Hello, You’re right, one of our main use cases consist of adding missing fields, stored in a “small” reference table, periodically refreshed, to a stream. Using a broadcast stream and flink join was not the choice we made, because we didn’t want to add tricky watermarks and hold one stream (it may build a huge state using a window, and you don’t always have control on the source function to wait before emitting) until everything is broadcasted.
So, we developed tools that load a static RAM hashmap cache from the reference table in the open() method of our enrichment operator, without using flink streams, and launch a thread to periodically refresh the hashmap. We also use the same hashing mechanism as flink to load on each task manager only the part of the table which is relevant to the keyed stream. IMHO this stuff should be part of the framework, it‘s easier to do with Spark Streaming… :-) Best regards, Arnaud De : Lars Skjærven <lar...@gmail.com> Envoyé : mercredi 14 février 2024 08:12 À : user <user@flink.apache.org> Objet : Stream enrichment with ingest mode Dear all, A reoccurring challenge we have with stream enrichment in Flink is a robust mechanism to estimate that all messages of the source(s) have been consumed/processed before output is collected. A simple example is two sources of catalogue metadata: - source A delivers products, - source B delivers product categories, For a process function to enrich the categories with the number of products in each category, we would do a KeyedCoProcessFunction (or a RichCoFlatMap), keyed by category ID, and put both the category and products in state. Then count all products for each keyed state and collect the result. Typically, however, we don't want to start counting before all products are included in state (to avoid emitting incomplete aggregations downstream). Therefore we use the event lag time (i.e. processing time - current watermark) to indicate "ingest mode" of the processor (e.g. lag time > 30 seconds). When in "ingest mode" we will trigger a timer, and return without collecting. Finally, the timer fires when the watermark has advanced sufficiently. This strategy of "ingest mode" (and timers) seems to be more complicated when you have multiple process functions (with the same need of ingest mode) downstream of the first one processor. The reason seems to be that watermarks are passed from the first process function even though no elements are collected. Therefore, when elements finally arrive at the second process function, the current watermark has already advanced, so the same strategy of watermarks is less robust. I'm curious how others in the community handle this "challenge" of initial ingest. Any ideas are greatly appreciated. Note: we use a custom watermark generator that emits watermarks derived from event time, and advances the watermarks when the source is idle for a longer period (e.g. 30 seconds). Thanks ! L ________________________________ L'intégrité de ce message n'étant pas assurée sur internet, la société expéditrice ne peut être tenue responsable de son contenu ni de ses pièces jointes. Toute utilisation ou diffusion non autorisée est interdite. Si vous n'êtes pas destinataire de ce message, merci de le détruire et d'avertir l'expéditeur. The integrity of this message cannot be guaranteed on the Internet. The company that sent this message cannot therefore be held liable for its content nor attachments. Any unauthorized use or dissemination is prohibited. If you are not the intended recipient of this message, then please delete it and notify the sender.