You can shard the side input based upon some prefix of the key (e.g., the first byte of the key) into X shards, allowing each side input to be smaller for runners that don't work well with map/multimap side inputs. You should also take a look at the side input patterns [1], since they cover slowly changing side inputs, and I believe your use case has come up in the past, so going through these mailing threads [2][3] might help as well.
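As a sketch of the prefix-sharding idea, the shard assignment could be as simple as taking the first byte of the key; the constant and function names here are hypothetical, and the pipeline wiring is only described in comments:

```python
# Hypothetical sketch: assign each key to one of NUM_SHARDS side inputs
# based on the first byte of the key, so each individual side input
# stays small enough for the runner to handle comfortably.
NUM_SHARDS = 16

def shard_for_key(key: str) -> int:
    """Map a key to a shard index using its first byte."""
    if not key:
        return 0
    return key.encode("utf-8")[0] % NUM_SHARDS

# In the pipeline you would build NUM_SHARDS separate side inputs and,
# inside the enrichment DoFn, look up the element's key only in
# side_inputs[shard_for_key(key)].
```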
Finally, you could also use a stateful DoFn instead of the side input. The stateful DoFn graph would create a single "object" type that contains either an "update" message or a "needsEnrichment" message. Something like:

  MainKafkaStream -------\
                          -> Flatten -> Window.into(???) -> StatefulDoFn(EnrichFn) -> ...
  AdditionalInfoStream --/

You need to ensure that the key you use for the stateful DoFn is the same key for both the "update" and the "needsEnrichment" messages that you would join on. This might require multiple enrichment fns if there isn't a single key you can join on.

The Window.into that is before the stateful DoFn controls when "events" are allowed to progress. If you want to have them "synchronized" on event time, then you could use the default event-time trigger, which would mean that update messages wouldn't be allowed to fall behind in processing when compared to the needsEnrichment messages (you might want to look into @RequiresTimeSortedInput here as well). You could also use an after-count-1 trigger, which would allow both streams to go through the EnrichFn without one blocking on the other, meaning that the needsEnrichment messages might get "stale" data.

Using the stateful DoFn would mean that you would only be retrieving/writing as much data as is ever associated with the key that you use, and you would get good parallelism if you have a lot of keys.

1: https://beam.apache.org/documentation/patterns/side-inputs/
2: https://lists.apache.org/list.html?user@beam.apache.org:lte=99M:slowly+changing+side+inputs
3: https://lists.apache.org/list.html?d...@beam.apache.org:lte=99M:slowly+changing+side+inputs

On Sun, May 3, 2020 at 1:00 PM Rion Williams <rionmons...@gmail.com> wrote:
> Hi all,
>
> I'm in the process of migrating an existing pipeline over from the Kafka
> Streams framework to Apache Beam and I'm reaching out in hopes of finding a
> pattern that could address the use-case I'm currently dealing with.
>
> In the most trivial sense, the scenario just involves an incoming message
> (an event) within the pipeline that may contain multiple different entities
> (users, endpoints, ip addresses, etc.) and I'm trying to take the
> identifiers from each of those entities and enrich the event from a source
> (Kafka) that has more information on them.
>
> The workflow might go something like this:
> - Read in incoming event containing a single user entity from a Kafka
>   topic
> - Perform the equivalent of a LEFT JOIN on the user identifier against
>   another Kafka topic that contains additional user information
> - If the join succeeds, mutate the existing user instance on the event
>   with any additional information from the source topic
> - Write the "enriched" event to a destination (e.g. Kafka, Elastic, etc.)
>
> What would be the best pattern to try and tackle this problem? I know that
> side inputs are an option, however the source user topic could potentially
> contain millions of distinct users (with any "new" users being added to
> that source topic in near-real time, albeit upstream from this process).
>
> Any information or recommendations would be greatly appreciated!
>
> Thanks,
>
> Rion
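To make the stateful DoFn suggestion above a bit more concrete, here is a plain-Python sketch of the per-key join logic the EnrichFn would run. The class and message shapes are illustrative, not the Beam API: in an actual pipeline the dictionary below would be a Beam ValueState/MapState scoped to the key, and process() would be the annotated @ProcessElement method.

```python
# Illustrative sketch of the per-key logic inside a stateful EnrichFn.
# "update" messages store enrichment data in per-key state;
# "needsEnrichment" messages read that state and emit an enriched event.
class EnrichFn:
    def __init__(self):
        # Stand-in for Beam per-key state; Beam would manage this for you,
        # partitioned by the join key.
        self.state = {}

    def process(self, key, message):
        kind, payload = message
        if kind == "update":
            # Remember the latest enrichment info for this key.
            self.state[key] = payload
            return []
        if kind == "needsEnrichment":
            # LEFT JOIN against whatever info we have; the lookup may
            # miss (or be stale) if the update hasn't arrived yet.
            info = self.state.get(key)
            enriched = dict(payload)
            if info is not None:
                enriched.update(info)
            return [enriched]
        return []
```

Note the left-join behavior: a needsEnrichment message whose key has no stored update still passes through, just without the extra fields, which matches the after-count-1 trigger trade-off described above.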