On Mon, 4 May 2020 at 23:58, Rion Williams <rionmons...@gmail.com> wrote:

> Hi Luke,
>
> Thanks for the detailed response, it sounds like an avenue that I'd like
> to explore a bit further, although forgive me as I'm still quite new to
> Beam in general. I haven't written any stateful DoFns previously but I'd
> imagine it'd look something like this (what you are proposing, that is):
>
> ```
> val pipeline = Pipeline.create(options)
>
> // Users
> val users = pipeline.apply("Read Users from Kafka",
>     KafkaIO.read<User>(options.usersTopic, options))
>
> // Additional entities omitted for brevity here
>
> pipeline
>     .apply("Read Events from Kafka",
>         KafkaIO.read<Event>(options.identifiedEventsTopic, options))
>     .apply(Window.into(/* Unsure what to put here? */ ))
>     .apply("Enrich Event for Users", ParDo.of(
>         object : DoFn<KV<String, Event>, KV<String, Event>>() {
>             // State is declared as a StateSpec field and accessed via the
>             // matching @StateId parameter in processElement below.
>             @StateId("user")
>             private val userStateSpec:
>                 StateSpec<ValueState<SomeObjectToStoreStateAboutUsersAndOrEvents>> =
>                 StateSpecs.value()
>
>             @ProcessElement
>             fun processElement(
>                 context: ProcessContext,
>                 @StateId("user")
>                 userState: ValueState<SomeObjectToStoreStateAboutUsersAndOrEvents>
>             ) {
>                 // Evaluate the incoming event
>
>                 // Enrich if we have that user
>
>                 // If enriched then output
>
>                 // Otherwise, store in state
>             }
>         }
>     ))
>     .apply("Eventually Write Enriched Events to Kafka",
>         KafkaIO.write<Event>(options.enrichedEventsTopic, options))
> ```
>
>
The stateful DoFn you presented is oversimplified. You need state for the
users, but also state for events whose related user has not been observed yet
(alternatively, you could window the events before the stateful DoFn). Without
windowing (e.g. in the global window), the stateful DoFn also requires some
trigger to clean up the state.
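
To answer the "Unsure what to put here?" comment in your snippet: one option
is the global window with an element-count trigger. A minimal sketch (Event
stands in for your event type; this is one choice, not the only one):

```
import org.apache.beam.sdk.transforms.windowing.AfterPane
import org.apache.beam.sdk.transforms.windowing.GlobalWindows
import org.apache.beam.sdk.transforms.windowing.Repeatedly
import org.apache.beam.sdk.transforms.windowing.Window
import org.apache.beam.sdk.values.KV
import org.joda.time.Duration

// A global window that fires on every arriving element instead of waiting
// for the (never-closing) window to end. Discarding mode keeps panes from
// accumulating; the per-key caching happens in the stateful DoFn instead.
val windowing = Window.into<KV<String, Event>>(GlobalWindows())
    .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(1)))
    .discardingFiredPanes()
    .withAllowedLateness(Duration.ZERO)
```

Once you specify a non-default trigger, Beam requires the accumulation mode
and allowed lateness to be set as well, hence the last two calls.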

Please look at this example:
https://github.com/mkuthan/beam-examples/blob/master/src/main/scala/org/mkuthan/beam/examples/ScreenGlobalWindowWithLookupCacheEnricher.scala

The stream of screen events is similar to your event stream, and the
publication stream is similar to your user stream. Screens and publications
are joined in the global window (with an element-count-at-least-one trigger)
using a stateful DoFn:
https://github.com/mkuthan/beam-examples/blob/master/src/main/scala/org/mkuthan/beam/examples/LookupCacheDoFn.scala
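
Stripped to its core, the idea looks roughly like the sketch below. All names
here are mine for illustration (not taken from the linked example), and a real
pipeline would also need coders registered for the state and union types:

```
import org.apache.beam.sdk.state.BagState
import org.apache.beam.sdk.state.StateSpec
import org.apache.beam.sdk.state.StateSpecs
import org.apache.beam.sdk.state.ValueState
import org.apache.beam.sdk.transforms.DoFn
import org.apache.beam.sdk.transforms.DoFn.ProcessElement
import org.apache.beam.sdk.transforms.DoFn.StateId
import org.apache.beam.sdk.values.KV

// Placeholder types just for this sketch.
data class User(val id: String, val name: String)
data class Event(val userId: String, val userName: String? = null)

// Union-style element: exactly one side is set, keyed upstream by user id.
data class UserOrEvent(val user: User? = null, val event: Event? = null)

class EnrichFn : DoFn<KV<String, UserOrEvent>, KV<String, Event>>() {

    // Last seen user for this key (the "lookup cache").
    @StateId("user")
    private val userSpec: StateSpec<ValueState<User>> = StateSpecs.value()

    // Events that arrived before their user; flushed once the user shows up.
    @StateId("pending")
    private val pendingSpec: StateSpec<BagState<Event>> = StateSpecs.bag()

    @ProcessElement
    fun processElement(
        context: ProcessContext,
        @StateId("user") userState: ValueState<User>,
        @StateId("pending") pending: BagState<Event>
    ) {
        val key = context.element().key
        val update = context.element().value.user
        val event = context.element().value.event
        if (update != null) {
            userState.write(update)
            // Flush any events that were waiting for this user.
            pending.read().forEach { context.output(KV.of(key, enrich(it, update))) }
            pending.clear()
        } else if (event != null) {
            val cached = userState.read()
            if (cached != null) {
                context.output(KV.of(key, enrich(event, cached)))
            } else {
                pending.add(event) // buffer until the user arrives
            }
        }
    }

    private fun enrich(event: Event, user: User): Event =
        event.copy(userName = user.name)
}
```

Because the global window never expires, you would also set a timer
(@TimerId/@OnTimer) to eventually clear both state cells; that is the
state-cleanup concern mentioned above.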

I hope this helps :)


> I'm not totally sure on the windowing usage yet or how/when the other
> streams come into play, so any advice there would be useful. Additionally,
> I have a few other questions if you have the time:
>
> - In this particular pipeline, users is a single entity, however I may
> potentially have multiple others. I'm assuming this would just require an
> additional stateful function per entity?
>

Look at the LookupCacheDoFn mentioned above; the implementation is fully
generic and can be reused for different entities. A separate stateful DoFn
instance for each entity type is required, though.
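
Concretely, the wiring for a single entity type could look something like
this, reusing the UserOrEvent and EnrichFn sketches from above (again, all
names are illustrative); you would write one such stage per entity type,
re-keyed by that entity's identifier:

```
import org.apache.beam.sdk.transforms.Flatten
import org.apache.beam.sdk.transforms.MapElements
import org.apache.beam.sdk.transforms.ParDo
import org.apache.beam.sdk.transforms.SerializableFunction
import org.apache.beam.sdk.values.KV
import org.apache.beam.sdk.values.PCollection
import org.apache.beam.sdk.values.PCollectionList
import org.apache.beam.sdk.values.TypeDescriptor
import org.apache.beam.sdk.values.TypeDescriptors

// Wrap both streams into the union type, flatten them, then enrich statefully.
fun enrichWithUsers(
    events: PCollection<KV<String, Event>>, // keyed by user id
    users: PCollection<KV<String, User>>    // keyed by user id
): PCollection<KV<String, Event>> {
    val unionType = TypeDescriptors.kvs(
        TypeDescriptors.strings(), TypeDescriptor.of(UserOrEvent::class.java))

    val wrappedEvents = events.apply("Wrap events", MapElements.into(unionType)
        .via(SerializableFunction<KV<String, Event>, KV<String, UserOrEvent>> {
            KV.of(it.key, UserOrEvent(event = it.value))
        }))

    val wrappedUsers = users.apply("Wrap users", MapElements.into(unionType)
        .via(SerializableFunction<KV<String, User>, KV<String, UserOrEvent>> {
            KV.of(it.key, UserOrEvent(user = it.value))
        }))

    return PCollectionList.of(wrappedEvents).and(wrappedUsers)
        .apply("Flatten streams", Flatten.pCollections())
        .apply("Enrich events with users", ParDo.of(EnrichFn()))
}
```

Chaining such stages, one per entity type, then enriches the event with each
entity in turn.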


> - Some events may contain multiple user instances and thus need to be
> enriched multiple times from a single source. Is this a problem using this
> approach?
>
> - The current plan for this in a production environment would be to rely on
> Flink. I noticed that Flink has "partial" support for handling state, would
> this fall under that supported umbrella?
>
> Thanks so much for the advice Luke, I greatly appreciate it. Sorry for
> such a long-winded question, I'm really excited about working with Beam,
> but the learning curve has been pretty steep (coming from an all-Kafka
> Kafka Streams world) thus far.
>
> Rion
>
> On 2020/05/04 16:11:59, Luke Cwik <lc...@google.com> wrote:
> > You can shard the side input based upon some prefix of the key (e.g. the
> > first byte of the key) into X shards, allowing each side input to be
> > smaller for runners that don't work well with map/multimap side inputs.
> > You should also take a look at the side input patterns[1] since they
> > cover slowly changing side inputs, and I believe your use case has come
> > up in the past, so going through these mailing threads[2] might help as
> > well.
> >
> > Finally, you could also use a stateful DoFn instead of the side input.
> >
> > The stateful DoFn graph would create a single "object" type that
> > contained either an "update" message or a "needsEnrichment" message.
> > Something like:
> >
> > MainKafkaStream -------> Flatten -> Window.into(???) -> StatefulDoFn(EnrichFn) -> ...
> > AdditionalInfoStream -/
> >
> > You need to ensure that the key you use for the stateful DoFn is the
> > same key for both the "update" and "needsEnrichment" message that you
> > would join on. This might require multiple enrichment fns if there isn't
> > a single key you can join on.
> >
> > The Window.into that is before the stateful DoFn would control when
> > "events" are allowed to progress. If you want to have them "synchronized"
> > on event time then you could use the default event time trigger, which
> > would mean that update messages wouldn't be allowed to fall behind in
> > processing when compared to the needsEnrichment messages (you might want
> > to look into @RequiresTimeSortedInput here as well). You could also use
> > an after-count-1 trigger that would allow both streams to go through the
> > EnrichFn without one blocking on the other, meaning that the
> > needsEnrichment messages might get "stale" data.
> >
> > Using the stateful DoFn would mean that you would only be retrieving and
> > writing as much data as is ever associated with the key that you use, and
> > you would have good parallelism if you have a lot of keys.
> >
> > 1: https://beam.apache.org/documentation/patterns/side-inputs/
> > 2: https://lists.apache.org/list.html?user@beam.apache.org:lte=99M:slowly+changing+side+inputs
> > 3: https://lists.apache.org/list.html?d...@beam.apache.org:lte=99M:slowly+changing+side+inputs
> >
> >
> > On Sun, May 3, 2020 at 1:00 PM Rion Williams <rionmons...@gmail.com> wrote:
> >
> > > Hi all,
> > >
> > > I'm in the process of migrating an existing pipeline over from the
> > > Kafka Streams framework to Apache Beam and I'm reaching out in hopes
> > > of finding a pattern that could address the use-case I'm currently
> > > dealing with.
> > >
> > > In the most trivial sense, the scenario just involves an incoming
> > > message (an event) within the pipeline that may contain multiple
> > > different entities (users, endpoints, ip addresses, etc.) and I'm
> > > trying to take the identifiers from each of those entities and enrich
> > > the event from a source (Kafka) that has more information on them.
> > >
> > > The workflow might go something like this:
> > >  - Read in incoming event containing a single user entity from a Kafka
> > >    topic
> > >  - Perform the equivalent of a LEFT JOIN on the user identifier against
> > >    another Kafka topic that contains additional user information
> > >  - If the join succeeds, mutate the existing user instance on the event
> > >    with any additional information from the source topic
> > >  - Write the "enriched" event to a destination (e.g. Kafka, Elastic,
> > >    etc.)
> > >
> > > What would be the best pattern to try and tackle this problem? I know
> > > that side inputs are an option, however the source user topic could
> > > potentially contain millions of distinct users (with any "new" users
> > > being added to that source topic in near-real time, albeit upstream
> > > from this process).
> > >
> > > Any information or recommendations would be greatly appreciated!
> > >
> > > Thanks,
> > >
> > > Rion
> > >
> > >
> >
>
