Dear all, A reoccurring challenge we have with stream enrichment in Flink is a robust mechanism to estimate that all messages of the source(s) have been consumed/processed before output is collected.
A simple example is two sources of catalogue metadata: - source A delivers products, - source B delivers product categories, For a process function to enrich the categories with the number of products in each category, we would do a KeyedCoProcessFunction (or a RichCoFlatMap), keyed by category ID, and put both the category and products in state. Then count all products for each keyed state and collect the result. Typically, however, we don't want to start counting before all products are included in state (to avoid emitting incomplete aggregations downstream). Therefore we use the event lag time (i.e. processing time - current watermark) to indicate "ingest mode" of the processor (e.g. lag time > 30 seconds). When in "ingest mode" we will trigger a timer, and return without collecting. Finally, the timer fires when the watermark has advanced sufficiently. This strategy of "ingest mode" (and timers) seems to be more complicated when you have multiple process functions (with the same need of ingest mode) downstream of the first one processor. The reason seems to be that watermarks are passed from the first process function even though no elements are collected. Therefore, when elements finally arrive at the second process function, the current watermark has already advanced, so the same strategy of watermarks is less robust. I'm curious how others in the community handle this "challenge" of initial ingest. Any ideas are greatly appreciated. Note: we use a custom watermark generator that emits watermarks derived from event time, and advances the watermarks when the source is idle for a longer period (e.g. 30 seconds). Thanks ! L