Dear all,

A recurring challenge we have with stream enrichment in Flink is finding a
robust mechanism to determine that all messages from the source(s) have been
consumed/processed before output is collected.

A simple example is two sources of catalogue metadata:
- source A delivers products,
- source B delivers product categories.

For a process function to enrich the categories with the number of products
in each category, we would use a KeyedCoProcessFunction (or a
RichCoFlatMapFunction), keyed by category ID, and put both the category and
its products in state. We then count all products in each keyed state and
collect the result.
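
To make the setup concrete, here is a minimal plain-Java sketch of the
per-category state described above. It is not Flink code: CategoryEnricher,
onProduct and onCategory are hypothetical names standing in for the keyed
state and the two processElement callbacks of a KeyedCoProcessFunction, and
the "id:name:count" output format is only an illustration.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Optional;
import java.util.Set;

/** Plain-Java model of the keyed state (hypothetical names, no Flink). */
final class CategoryEnricher {

    /** Stand-in for the state Flink would keep per category ID. */
    private static final class CategoryState {
        String categoryName;                             // from source B
        final Set<String> productIds = new HashSet<>();  // from source A
    }

    private final Map<String, CategoryState> stateByCategory = new HashMap<>();

    /** Analogue of processElement1: a product arrives from source A. */
    Optional<String> onProduct(String categoryId, String productId) {
        CategoryState s = stateByCategory.computeIfAbsent(
                categoryId, k -> new CategoryState());
        s.productIds.add(productId); // a set, so replays do not double count
        return enrich(categoryId, s);
    }

    /** Analogue of processElement2: a category arrives from source B. */
    Optional<String> onCategory(String categoryId, String categoryName) {
        CategoryState s = stateByCategory.computeIfAbsent(
                categoryId, k -> new CategoryState());
        s.categoryName = categoryName;
        return enrich(categoryId, s);
    }

    /** Emits "id:name:count" once the category itself is known. */
    private static Optional<String> enrich(String categoryId, CategoryState s) {
        if (s.categoryName == null) {
            return Optional.empty(); // products seen, category not yet
        }
        return Optional.of(
                categoryId + ":" + s.categoryName + ":" + s.productIds.size());
    }
}
```

Note that this naive version emits a new count on every input, so the first
results for a category are computed from whatever subset of products happens
to have arrived so far.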

Typically, however, we don't want to start counting before all products are
included in state (to avoid emitting incomplete aggregations downstream).
Therefore we use the event lag time (i.e. processing time - current
watermark) to decide whether the processor is in "ingest mode" (e.g. lag
time > 30 seconds). When in "ingest mode" we register a timer and return
without collecting. Finally, the timer fires when the watermark has advanced
sufficiently.
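
The decision logic can be sketched in plain Java as below. This is a model
under stated assumptions, not our actual operator: IngestModeGate and its
methods are hypothetical names, only one pending timer is tracked, and the
timer timestamp (arrival time minus the 30 second threshold) is an assumed
choice. In a real KeyedCoProcessFunction the two inputs would come from
ctx.timerService().currentProcessingTime() and currentWatermark().

```java
/** Plain-Java model of the "ingest mode" check (hypothetical names). */
final class IngestModeGate {

    static final long MAX_LAG_MS = 30_000L; // "lag time > 30 seconds"

    private boolean timerSet = false;
    private long timerTs = 0L; // event-time timestamp of the pending timer

    /**
     * Called per element. Returns true if the caller may collect now,
     * false if we are in ingest mode and a timer was registered instead.
     */
    boolean onElement(long processingTimeMs, long watermarkMs) {
        // Flink reports Long.MIN_VALUE before the first watermark arrives;
        // treat that as lagging and avoid overflow in the subtraction.
        boolean lagging = watermarkMs == Long.MIN_VALUE
                || processingTimeMs - watermarkMs > MAX_LAG_MS;
        if (lagging) {
            // Assumption: fire once the watermark is within MAX_LAG_MS of
            // the wall-clock time at which this element was seen.
            timerTs = processingTimeMs - MAX_LAG_MS;
            timerSet = true;
            return false;
        }
        return true;
    }

    /**
     * Called when the watermark advances. Returns true exactly once per
     * pending timer, when the watermark reaches the timer timestamp.
     */
    boolean onWatermark(long watermarkMs) {
        if (timerSet && watermarkMs >= timerTs) {
            timerSet = false;
            return true; // analogue of onTimer: safe to count and collect
        }
        return false;
    }
}
```

For example, an element seen at processing time 100 s with the watermark at
10 s has a lag of 90 s, so nothing is collected and the timer is set for
70 s; collection happens when the watermark reaches 70 s.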

This strategy of "ingest mode" (and timers) seems to become more complicated
when you have multiple process functions (with the same need for an ingest
mode) downstream of the first one. The reason seems to be that watermarks
are forwarded by the first process function even though no elements are
collected. Therefore, when elements finally arrive at the second process
function, the current watermark has already advanced, so the same
watermark-based strategy is less robust.

I'm curious how others in the community handle this "challenge" of initial
ingest. Any ideas are greatly appreciated.

Note: we use a custom watermark generator that emits watermarks derived
from event time, and advances the watermarks when the source is idle for a
longer period (e.g. 30 seconds).
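
For reference, a generator of this kind could look roughly like the
plain-Java sketch below. It only mirrors the shape of Flink's
WatermarkGenerator callbacks (onEvent / onPeriodicEmit) without depending on
Flink; IdleAwareWatermarks is a hypothetical name, and jumping the watermark
to the wall-clock time once the source has been idle for 30 seconds is an
assumed policy, one of several possible ones.

```java
/** Plain-Java model of an idle-aware watermark generator (hypothetical). */
final class IdleAwareWatermarks {

    static final long IDLE_TIMEOUT_MS = 30_000L; // "idle for ... 30 seconds"

    private long maxEventTs = Long.MIN_VALUE;      // highest event time seen
    private long lastEventSeenAt = Long.MIN_VALUE; // wall clock of last event
    private long currentWatermark = Long.MIN_VALUE;

    /** Analogue of onEvent: track the highest event timestamp. */
    void onEvent(long eventTsMs, long nowMs) {
        maxEventTs = Math.max(maxEventTs, eventTsMs);
        lastEventSeenAt = nowMs;
    }

    /** Analogue of onPeriodicEmit: returns the watermark to emit. */
    long onPeriodicEmit(long nowMs) {
        if (lastEventSeenAt == Long.MIN_VALUE) {
            return currentWatermark; // nothing seen yet, nothing to derive
        }
        long candidate = maxEventTs; // normal case: derived from event time
        if (nowMs - lastEventSeenAt >= IDLE_TIMEOUT_MS) {
            // Assumed idle policy: let the watermark catch up with the
            // wall clock so downstream lag checks and timers can resolve.
            candidate = Math.max(candidate, nowMs);
        }
        // A watermark must never move backwards.
        currentWatermark = Math.max(currentWatermark, candidate);
        return currentWatermark;
    }
}
```

With this policy, events that arrive after an idle jump and carry older
timestamps are behind the watermark, i.e. late.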

Thanks!

L
