Hi Alexis,

Thanks a lot for your reply and guidance, which makes a lot of sense to me overall.
Regards,
Salva

On Thu, Dec 8, 2022 at 5:34 PM Alexis Sarda-Espinosa <sarda.espin...@gmail.com> wrote:

> Hi Salva,
>
> Just to give you further thoughts from another user: I think the "temporal join" semantics are very critical in this use case, and what you implement for that may not easily generalize to other cases. Because of that, I'm not sure you can really define best practices that apply in general. Additionally, you also have to take idleness into account, given that using event time could leave you in a "frozen" state if you're not receiving events continuously.
>
> I also doubt you can accurately estimate out-of-orderness in this scenario, due to the particularities of Flink's network stack [1]. Even if you only have 2 sources and immediately connect them together, parallelism and the resulting shuffles can change from one execution to the next even if you don't change the logic at all. Scheduling is also non-deterministic, and the "distribution" of events across the parallel instances of your sources could vary a lot as well.
>
> I think others will tell you that you indeed need to find a way to buffer events for a while; I got the same advice in the past. Focusing very specifically on what you described (streams for data and control events + event time + temporal join), and maybe also assuming you can manage watermarks in a way that handles idleness without simply freezing the stream, I once tried a custom operator (not a function) to force the watermarks of one stream to wait for the other one; see PrioritizedWatermarkStreamIntervalJoinOperator in [2]. This still uses the idea of buffering; it just moves that responsibility to the operator that is already handling "window data" for the join. Also, it extends an internal class, so it's very much unofficial, and it's probably too specific to my use case, but maybe it gives you other ideas to consider.
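The buffering idea above holds data events back until the other input's watermark has caught up. It can be sketched in plain Python as follows. This is not Flink code. `WatermarkGatedBuffer`, `on_data`, `on_watermark` and `mark_idle` are hypothetical names.

The sketch only roughly mimics two rules Flink applies to two-input operators:

- The operator's event time is the minimum of its inputs' watermarks.
- An input marked idle stops holding that minimum back.

```python
import heapq

# Plays the role of Long.MIN_VALUE, the initial watermark of every input.
MIN_WATERMARK = -(2**63)


class WatermarkGatedBuffer:
    """Hypothetical helper: holds data events until the combined watermark passes them."""

    def __init__(self):
        self.watermarks = {"data": MIN_WATERMARK, "control": MIN_WATERMARK}
        self.idle = {"data": False, "control": False}
        self.current = MIN_WATERMARK  # combined watermark of the operator
        self._heap = []  # entries are (timestamp, sequence number, event)
        self._seq = 0    # tie-breaker so events themselves are never compared

    def _advance(self):
        # Combined watermark = min over inputs that are not idle; if every input
        # is idle, the combined watermark simply stays where it is.
        active = [wm for name, wm in self.watermarks.items() if not self.idle[name]]
        if active:
            # Never move event time backwards.
            self.current = max(self.current, min(active))
        released = []
        while self._heap and self._heap[0][0] <= self.current:
            ts, _, event = heapq.heappop(self._heap)
            released.append((ts, event))
        return released

    def on_data(self, ts, event):
        # A late event (ts <= current watermark) is released right away in this
        # sketch; a real operator might drop it or send it to a side output.
        heapq.heappush(self._heap, (ts, self._seq, event))
        self._seq += 1
        return self._advance()

    def on_watermark(self, name, wm):
        self.watermarks[name] = max(self.watermarks[name], wm)
        self.idle[name] = False  # in this sketch, a watermark re-activates the input
        return self._advance()

    def mark_idle(self, name):
        self.idle[name] = True
        return self._advance()
```

While the control input is silent, data events stay buffered even if the data watermark advances. This is the "frozen" state mentioned above. The events are released once the control watermark catches up, or once the control input is marked idle.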
>
> And one last detail to further illustrate the complexity here: when I was testing my custom operator with event-time simulations in my IDE, I initially didn't think about the fact that a watermark with Long.MAX_VALUE is sent at the end of the simulation. That was another source of non-determinism. Sometimes the control stream was processed entirely (including the max watermark) before a single event from the data stream went through. All events from the data stream were then considered late arrivals and silently dropped.
>
> [1] https://flink.apache.org/2019/06/05/flink-network-stack.html
> [2] https://lists.apache.org/thread/n79tsqd5y474n5th1wdhkrh7bvlts8bs
>
> Regards,
> Alexis.
>
> On Thu, Dec 8, 2022 at 15:20, Salva Alcántara <salcantara...@gmail.com> wrote:
>
>> Just to add some extra references:
>>
>> [5] https://stackoverflow.com/questions/50536364/apache-flink-coflatmapfunction-does-not-process-events-in-event-time-order
>> [6] https://stackoverflow.com/questions/61046062/controlling-order-of-processed-elements-within-coprocessfunction-using-custom-so
>> [7] https://stackoverflow.com/questions/48692658/how-to-concatenate-two-streams-in-apache-flink/48711260#48711260
>>
>> Salva
>>
>> On 2022/12/07 18:52:42 Salva Alcántara wrote:
>> > It's well known that Flink does not provide any guarantees on the order in which a CoProcessFunction (or one of its multiple variations) processes its two inputs [1]. I wonder, then, what the current best practice/recommended approach is for cases where one needs deterministic results in the presence of:
>> >
>> > 1. A control stream
>> > 2. An event/data stream
>> >
>> > Let's consider event-time semantics: both streams have timestamps, and we want to implement "temporal join" semantics where the input events are controlled based on the latest control signals received before them, i.e., the ones "active" when the events happened.
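The "latest control signal before the event" rule above is a floor query on control signals indexed by event time. Python has no TreeMap, so this sketch keeps sorted timestamps and uses `bisect`, analogous to `java.util.TreeMap.floorEntry`. `ControlHistory`, `put`, `floor` and `evict_up_to` are hypothetical names.

Two further choices are assumptions of the sketch, not Flink semantics:

- **Inclusive lookup:** the comparison is `<=`, so a signal stamped at the same instant as an event counts as already active.
- **Cleanup hook:** `evict_up_to` is a cleanup step one might call when the watermark advances.

```python
import bisect


class ControlHistory:
    """Hypothetical stand-in for control signals kept in state, indexed by event time."""

    def __init__(self):
        self._times = []   # sorted control timestamps
        self._values = []  # control values, parallel to self._times

    def put(self, ts, value):
        i = bisect.bisect_left(self._times, ts)
        if i < len(self._times) and self._times[i] == ts:
            self._values[i] = value  # same timestamp: the last write wins
        else:
            self._times.insert(i, ts)
            self._values.insert(i, value)

    def floor(self, ts):
        """Latest control value with timestamp <= ts, or None if there is none yet."""
        i = bisect.bisect_right(self._times, ts)
        return self._values[i - 1] if i > 0 else None

    def evict_up_to(self, watermark):
        """Forget entries that can no longer be the answer once event time reached `watermark`.

        The newest entry at or before the watermark is kept, because events
        with later timestamps still resolve to it.
        """
        keep_from = max(bisect.bisect_right(self._times, watermark) - 1, 0)
        del self._times[:keep_from]
        del self._values[:keep_from]
```

Control signals may be inserted out of order; the lookup only depends on their timestamps. That is what makes the result independent of arrival order, as long as events are only looked up once the watermark guarantees no earlier signal can still arrive.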
>> > For simplicity, let's assume that all events are received in order, so that the only source of non-determinism is the one introduced by the CoProcessFunction itself.
>> >
>> > I'm considering the following options:
>> >
>> > 1. Buffer events inside the CoProcessFunction for some time, while saving all the control signals in state (indexed by time).
>> > 2. Same as before, but doing the pre-buffering of the event/data streams before the CoProcessFunction.
>> > 3. Similar to before, but considering both streams together by multiplexing them into one heterogeneous stream, which would be pre-sorted to guarantee the global ordering of the events from the two different sources. Then, instead of a CoProcessFunction[IN1, IN2, OUT], use a ProcessFunction[Either[IN1, IN2], OUT], which by construction will process the data in order and hence produce deterministic results.
>> >
>> > Essentially, all the strategies amount to introducing a "minimum amount of delay" to guarantee deterministic processing, which brings me to the following question:
>> >
>> > * How can one estimate the out-of-orderness bound that a CoProcessFunction can introduce? Is that even possible in the first place? I guess this mostly depends on the sources of the two streams and the relative ratio of records read. For simplicity, we can consider a Kafka source for both input streams...
>> >
>> > On a related note, the "temporal join" seems to be a well-documented and solved problem in the SQL API [2-3], but the problem is not even mentioned in the docs for the DataStream API. I guess I can copy/adapt the corresponding code. E.g., for the "point-in-time" part I can consider a TreeMap data structure as used in [3]. I have also come across a new (still WIP) BinarySortedState [4], which would improve performance w.r.t. the current RocksDB state backend.
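Option 3 above (one pre-sorted heterogeneous stream processed by a single-input function) can be sketched in plain Python. This sketch makes three assumptions beyond the message:

- **Tagged tuples:** `("control", ts, value)` / `("data", ts, value)` tuples stand in for `Either[IN1, IN2]`.
- **Sorted inputs:** each input is already sorted by timestamp, as assumed in the message.
- **Tie-break:** control signals win ties at equal timestamps.

`multiplex` and `temporal_join` are hypothetical names. The merge uses the standard `heapq.merge` with a `key` function.

```python
import heapq


def multiplex(controls, data):
    """Merge two timestamp-sorted inputs into one ordered, tagged stream.

    Each element is (tag, timestamp, value), a poor man's Either[IN1, IN2].
    At equal timestamps control signals come first (an assumption of this
    sketch), so an event sees a signal stamped at the same instant.
    """
    tagged_controls = (("control", ts, v) for ts, v in controls)
    tagged_data = (("data", ts, v) for ts, v in data)
    return heapq.merge(
        tagged_controls,
        tagged_data,
        key=lambda e: (e[1], 0 if e[0] == "control" else 1),
    )


def temporal_join(events):
    """Single-input processing: remember the latest control and attach it to each data event."""
    current = None
    for tag, ts, value in events:
        if tag == "control":
            current = value
        else:
            yield ts, value, current
```

The merged order depends only on timestamps, so the output no longer depends on how the two inputs happen to interleave at runtime. In a real Flink job, the hard part remains deciding when it is safe to emit merged elements. That brings back the question of buffering until the watermarks of both inputs have passed.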
>> >
>> > References:
>> >
>> > [1] https://stackoverflow.com/questions/51628037/facing-race-condition-in-flink-connected-stream-in-apache-flink
>> > [2] https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/queries/joins/#event-time-temporal-join
>> > [3] https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/dev/datastream/event-time/generating_watermarks/#introduction-to-watermark-strategies
>> > [4] https://www.slideshare.net/FlinkForward/introducing-binarysortedmultimap-a-new-flink-state-primitive-to-boost-your-application-performance