No problem, I totally understand and can relate. As mentioned, I think you 
pointed me in the right direction and it makes sense to me on paper; I’m just 
not sure how to connect some of the dots syntactically.

Greatly appreciate it!

> On May 4, 2020, at 5:11 PM, Luke Cwik <lc...@google.com> wrote:
> 
> 
> I have a lot of work stuff going on so I'll try to provide a response but it 
> might take days. Also, if you find an answer to one of your questions or have 
> additional questions while you investigate, feel free to update this thread.
> 
>> On Mon, May 4, 2020 at 2:58 PM Rion Williams <rionmons...@gmail.com> wrote:
>> Hi Luke,
>> 
>> Thanks for the detailed response, it sounds like an avenue that I'd like to 
>> explore a bit further, although forgive me as I'm still quite new to Beam in 
>> general. I haven't written any stateful DoFns previously, but I'd imagine 
>> it'd look something like this (what you are proposing, that is):
>> 
>> ```
>> val pipeline = Pipeline.create(options)
>> 
>> // Users
>> val users = pipeline.apply("Read Users from Kafka", KafkaIO.read<User>(options.usersTopic, options))
>> 
>> // Additional entities omitted for brevity here
>> 
>> pipeline
>>     .apply("Read Events from Kafka", KafkaIO.read<Event>(options.identifiedEventsTopic, options))
>>     .apply(Window.into(/* Unsure what to put here? */ ))
>>     .apply("Enrich Event for Users", ParDo.of(
>>             object: DoFn<KV<String, Event>, KV<String, Event>>(){
>>                 @StateId("user")
>>                 private val state: SomeObjectToStoreStateAboutUsersAndOrEvents()
>> 
>>                 @ProcessElement()
>>                 fun processElement(context: ProcessContext, @StateId("user") userStateObject: SomeObjectToStoreStateAboutUsersAndOrEvents) {
>>                     // Evaluate the incoming event
>> 
>>                     // Enrich if we have that user
>> 
>>                     // If enriched then output
>> 
>>                     // Otherwise, store in state
>>                 }
>>             }
>>     ))
>>     .apply("Eventually Write Enriched Events to Kafka", KafkaIO.write<Event>(options.enrichedEventsTopic, options))
>> ```
>> 
>> I'm not totally sure on the windowing usage yet or how/when the other 
>> streams come into play, so any advice there would be useful. Additionally - 
>> I have a few other questions if you have the time:
>> 
>> - In this particular pipeline, users is a single entity, however I may 
>> potentially have multiple others. I'm assuming this would just require an 
>> additional stateful function per entity?
>> - Some events may contain multiple user instances and thus need to be 
>> enriched multiple times from a single source. Is this a problem using this 
>> approach?
>> - The current plan for this in a production environment would be to rely on 
>> Flink. I noticed that Flink has "partial" support for handling state, would 
>> this fall under that supported umbrella?
>> 
>> Thanks so much for the advice Luke, I greatly appreciate it. Sorry for such 
>> a long-winded question. I'm really excited about working with Beam, but the 
>> learning curve has been pretty steep thus far (coming from an all-Kafka, 
>> Kafka Streams world).
>> 
>> Rion
>> 
>> On 2020/05/04 16:11:59, Luke Cwik <lc...@google.com> wrote: 
>> > You can shard the side input based upon some prefix of the key (e.g. the
>> > first byte of the key) into X shards, allowing each side input to be smaller
>> > for runners that don't work well with map/multimap side inputs. You should
>> > also take a look at the side input patterns[1] since they cover slowly
>> > changing side inputs. I believe your use case has come up in the past, so
>> > going through these mailing list threads[2][3] might help as well.
>> > 
>> > Finally, you could also use a stateful DoFn instead of the side input.
>> > 
>> > The stateful DoFn graph would create a single "object" type that contained
>> > either an "update" message or a "needsEnrichment" message. Something like:
>> > MainKafkaStream -------> Flatten -> Window.into(???) -> StatefulDoFn(EnrichFn) -> ...
>> > AdditionalInfoStream -/
>> > 
>> > You need to ensure that the key you use for the stateful DoFn is the same
>> > key for both the "update" and "needsEnrichment" message that you would join
>> > on. This might require multiple enrichment fns if there isn't a single key
>> > you can join on.
>> > 
>> > The Window.into that is before the stateful DoFn would control when
>> > "events" are allowed to progress. If you want to have them "synchronized"
>> > on event time then you could use the default event time trigger which would
>> > mean that update messages wouldn't be allowed to fall behind in processing
>> > when compared to the needsEnrichment messages (you might want to look into
>> > @RequiresTimeSortedInput here as well). You could also use an after-count-1
>> > trigger that would allow both streams to go through the EnrichFn without
>> > one blocking on the other, meaning that the needsEnrichment messages might
>> > get "stale" data.
>> > 
>> > Using the stateful DoFn would mean that you would only be
>> > retrieving/writing as much data as is associated with the key that
>> > you use, and you would have good parallelism if you have a lot of keys.
>> > 
>> > 1: https://beam.apache.org/documentation/patterns/side-inputs/
>> > 2:
>> > https://lists.apache.org/list.html?user@beam.apache.org:lte=99M:slowly+changing+side+inputs
>> > 3:
>> > https://lists.apache.org/list.html?d...@beam.apache.org:lte=99M:slowly+changing+side+inputs
>> > 
>> > 
>> > On Sun, May 3, 2020 at 1:00 PM Rion Williams <rionmons...@gmail.com> wrote:
>> > 
>> > > Hi all,
>> > >
>> > > I'm in the process of migrating an existing pipeline over from the Kafka
>> > > Streams framework to Apache Beam and I'm reaching out in hopes of finding a
>> > > pattern that could address the use-case I'm currently dealing with.
>> > >
>> > > In the most trivial sense, the scenario just involves an incoming message
>> > > (an event) within the pipeline that may contain multiple different entities
>> > > (users, endpoints, IP addresses, etc.) and I'm trying to take the
>> > > identifiers from each of those entities and enrich the event from a source
>> > > (Kafka) that has more information on them.
>> > >
>> > > The workflow might go something like this:
>> > >  - Read in incoming event containing a single user entity from a Kafka
>> > > topic
>> > >  - Perform the equivalent of a LEFT JOIN on the user identifier against
>> > > another Kafka topic that contains additional user information
>> > >  - If the join succeeds, mutate the existing user instance on the event
>> > > with any additional information from the source topic
>> > >  - Write the "enriched" event to a destination (e.g. Kafka, Elastic, etc.)
>> > >
>> > > What would be the best pattern to try and tackle this problem? I know that
>> > > side inputs are an option; however, the source user topic could potentially
>> > > contain millions of distinct users (with any "new" users being added to
>> > > that source topic in near-real time, albeit upstream from this process).
>> > >
>> > > Any information or recommendations would be greatly appreciated!
>> > >
>> > > Thanks,
>> > >
>> > > Rion
>> > >
>> > >
>> > 
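
For anyone following along, the per-key logic that the stateful DoFn suggestion in the thread above describes might be sketched roughly as follows. This is a plain-Kotlin simulation only, not Beam code: the `User`, `Event`, `Message`, `KeyState`, and `EnrichSimulator` names are all made up for illustration, and a mutable map stands in for the state that a runner would manage per key. It assumes both streams have already been flattened into one "update or needsEnrichment" type keyed by user id.

```kotlin
// Hypothetical types for illustration; none of these names come from Beam.
data class User(val id: String, val name: String)
data class Event(val userId: String, val payload: String, val userName: String? = null)

// A single "object" type that is either an update or an event awaiting enrichment.
sealed class Message {
    abstract val key: String

    data class Update(val user: User) : Message() {
        override val key: String get() = user.id
    }

    data class NeedsEnrichment(val event: Event) : Message() {
        override val key: String get() = event.userId
    }
}

// Per-key state, standing in for what a stateful DoFn would keep for each key.
class KeyState {
    var user: User? = null
    val pending = mutableListOf<Event>()
}

class EnrichSimulator {
    private val stateByKey = mutableMapOf<String, KeyState>()

    // Processes one message and returns the events that are ready to be output.
    fun process(message: Message): List<Event> {
        val state = stateByKey.getOrPut(message.key) { KeyState() }
        return when (message) {
            is Message.Update -> {
                state.user = message.user
                // Enrich and release anything that was waiting on this user.
                val ready = state.pending.map { it.copy(userName = message.user.name) }
                state.pending.clear()
                ready
            }
            is Message.NeedsEnrichment -> {
                val user = state.user
                if (user != null) {
                    // We already have this user: enrich and output immediately.
                    listOf(message.event.copy(userName = user.name))
                } else {
                    // Otherwise, store the event in state until an update arrives.
                    state.pending.add(message.event)
                    emptyList()
                }
            }
        }
    }
}

fun main() {
    val sim = EnrichSimulator()
    println(sim.process(Message.NeedsEnrichment(Event("u1", "login"))))  // []
    println(sim.process(Message.Update(User("u1", "Ada"))))
    println(sim.process(Message.NeedsEnrichment(Event("u1", "logout"))))
}
```

Note that this sketch holds an event forever if its user never shows up, which is not quite the LEFT JOIN behaviour from the original question. In a real pipeline a timer would probably be needed to eventually emit such events unenriched.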
