Just to add some extra references:

[5] https://stackoverflow.com/questions/50536364/apache-flink-coflatmapfunction-does-not-process-events-in-event-time-order
[6] https://stackoverflow.com/questions/61046062/controlling-order-of-processed-elements-within-coprocessfunction-using-custom-so
[7] https://stackoverflow.com/questions/48692658/how-to-concatenate-two-streams-in-apache-flink/48711260#48711260
Salva

On 2022/12/07 18:52:42 Salva Alcántara wrote:
> It's well-known that Flink does not provide any guarantees on the order in
> which a CoProcessFunction (or one of its multiple variations) processes its
> two inputs [1]. I wonder, then, what the current best practice/recommended
> approach is for cases where one needs deterministic results in the presence of:
>
> 1. A control stream
> 2. An event/data stream
>
> Let's consider event-time semantics; both streams have timestamps, and we
> want to implement "temporal join" semantics where the input events are
> controlled based on the latest control signals received before them, i.e.,
> the ones "active" when the events happened. For simplicity, let's assume
> that all events are received in order, so that the only source of
> non-determinism is the one introduced by the CoProcessFunction itself.
>
> I'm considering the following options:
>
> 1. Buffer events inside the CoProcessFunction for some time, while saving
> all the control signals in state (indexed by time)
> 2. Same as before, but doing the pre-buffering of the event/data streams
> before the CoProcessFunction
> 3. Similar to before, but considering both streams together by multiplexing
> them into one heterogeneous stream, which would be pre-sorted in order to
> guarantee the global ordering of the events from the two different sources.
> Then, instead of a CoProcessFunction[IN1, IN2, OUT], use a
> ProcessFunction[Either[IN1, IN2], OUT], which by construction will process
> the data in order and hence produce deterministic results
>
> Essentially, all the strategies amount to introducing a "minimum amount of
> delay" to guarantee deterministic processing, which brings me to the
> following question:
>
> * How can I get an estimate of the out-of-order-ness bound that a
> CoProcessFunction can introduce? Is that even possible in the first place?
> I guess this mostly depends on the sources of the two streams and the
> relative ratio of records read. For simplicity we can consider a Kafka
> source for both input streams...
>
> On a related note, the "temporal join" seems to be a well-documented and
> solved problem in the SQL API [2-3], but the problem is not even mentioned
> in the docs for the DataStream API. I guess I can copy/adapt the
> corresponding code. E.g., for the "point-in-time" part I can consider a
> TreeMap data structure as used in [3]. I have also come across a new (still
> WIP) BinarySortedState [4], which would improve performance w.r.t. the
> current RocksDB state backend.
>
> References:
>
> [1] https://stackoverflow.com/questions/51628037/facing-race-condition-in-flink-connected-stream-in-apache-flink
> [2] https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/queries/joins/#event-time-temporal-join
> [3] https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/dev/datastream/event-time/generating_watermarks/#introduction-to-watermark-strategies
> [4] https://www.slideshare.net/FlinkForward/introducing-binarysortedmultimap-a-new-flink-state-primitive-to-boost-your-application-performance
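To make option 1 concrete, here is a minimal, Flink-free sketch in Java under the assumptions stated in the question (in-order inputs, event-time timestamps, a single key). The class and method names (`PointInTimeJoin`, `onControl`, `onEvent`, `onWatermark`) are hypothetical. Data events are buffered, control signals are indexed by time in a `java.util.TreeMap`, and `floorEntry` performs the point-in-time lookup. One way to sidestep estimating a fixed delay is to let the combined watermark (in Flink, the minimum of the two input watermarks) decide when buffered events are released. In a real job the two maps would presumably live in keyed state, and `onWatermark` would correspond to an event-time timer firing, e.g. one registered with `ctx.timerService().registerEventTimeTimer(ts)`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical, Flink-free model of option 1: data events are buffered until the
// combined watermark passes them, and control signals are kept in a TreeMap keyed
// by event time, so each event is joined with the latest control at or before it.
class PointInTimeJoin {
    // Control signals indexed by timestamp (one signal per timestamp in this sketch).
    private final TreeMap<Long, String> controls = new TreeMap<>();
    // Buffered data events, grouped and sorted by timestamp.
    private final TreeMap<Long, List<String>> pendingEvents = new TreeMap<>();

    void onControl(long ts, String control) {
        controls.put(ts, control);
    }

    void onEvent(long ts, String event) {
        pendingEvents.computeIfAbsent(ts, k -> new ArrayList<>()).add(event);
    }

    // Called when the combined watermark (the minimum over both inputs) reaches
    // `watermark`: no control with ts <= watermark can still arrive, so every
    // buffered event with ts <= watermark can now be joined deterministically.
    List<String> onWatermark(long watermark) {
        List<String> out = new ArrayList<>();
        // headMap(watermark, true) is a live view of the entries with key <= watermark.
        Map<Long, List<String>> ready = pendingEvents.headMap(watermark, true);
        for (Map.Entry<Long, List<String>> entry : ready.entrySet()) {
            // Point-in-time lookup: the control "active" at the event's timestamp.
            Map.Entry<Long, String> active = controls.floorEntry(entry.getKey());
            String control = active == null ? "<none>" : active.getValue();
            for (String event : entry.getValue()) {
                out.add(event + "@" + entry.getKey() + " -> " + control);
            }
        }
        ready.clear(); // clearing the view removes the emitted events from pendingEvents
        // Controls older than the latest one at or before the watermark can no longer
        // be active for any future (on-time) event, so they are pruned.
        Long latest = controls.floorKey(watermark);
        if (latest != null) {
            controls.headMap(latest, false).clear();
        }
        return out;
    }
}

class PointInTimeJoinDemo {
    public static void main(String[] args) {
        PointInTimeJoin join = new PointInTimeJoin();
        // Arbitrary interleaving of the two inputs, as a CoProcessFunction may see it.
        join.onEvent(5, "e1");
        join.onControl(3, "c-A");
        join.onEvent(12, "e2");
        join.onControl(10, "c-B");
        System.out.println(join.onWatermark(11)); // [e1@5 -> c-A]
        System.out.println(join.onWatermark(20)); // [e2@12 -> c-B]
    }
}
```

Any interleaving of the four calls before `onWatermark(11)` yields the same output, which is the determinism the question is after. Events that arrive behind the watermark (late data) are not handled here and would need a separate policy.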
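Option 3 can be sketched in the same hedged, Flink-free way. Here `Tagged` is a hypothetical stand-in for `Either[IN1, IN2]` (a `control` or an `event` record), and the hypothetical `MultiplexedJoin` keeps the multiplexed stream in a priority queue until the watermark passes, then makes a single in-order pass that only needs to remember the latest control. The tie-breaking rule (controls before events at equal timestamps, then payload) is an assumption, chosen so that the order is total and the result does not depend on arrival order.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

// Hypothetical stand-in for Either[IN1, IN2]: a record is either a control
// signal or a data event, tagged with its event-time timestamp.
final class Tagged {
    final long ts;
    final boolean isControl;
    final String payload;

    private Tagged(long ts, boolean isControl, String payload) {
        this.ts = ts;
        this.isControl = isControl;
        this.payload = payload;
    }

    static Tagged control(long ts, String payload) { return new Tagged(ts, true, payload); }
    static Tagged event(long ts, String payload) { return new Tagged(ts, false, payload); }
}

// Option 3 in miniature: one sorting buffer over the multiplexed stream, then a
// single-input, strictly in-order pass that only needs the "current" control.
class MultiplexedJoin {
    // Total order: timestamp, then controls before events (so a control at time t
    // applies to events at time t), then payload to break any remaining ties.
    static final Comparator<Tagged> ORDER = (a, b) -> {
        int c = Long.compare(a.ts, b.ts);
        if (c != 0) return c;
        c = Boolean.compare(!a.isControl, !b.isControl); // controls (false) sort first
        if (c != 0) return c;
        return a.payload.compareTo(b.payload);
    };

    private final PriorityQueue<Tagged> buffer = new PriorityQueue<>(ORDER);
    private String currentControl = null; // latest control released so far

    void add(Tagged record) {
        buffer.add(record);
    }

    // Releases, in sorted order, every buffered record with ts <= watermark.
    List<String> onWatermark(long watermark) {
        List<String> out = new ArrayList<>();
        while (!buffer.isEmpty() && buffer.peek().ts <= watermark) {
            Tagged r = buffer.poll();
            if (r.isControl) {
                currentControl = r.payload;
            } else {
                String control = currentControl == null ? "<none>" : currentControl;
                out.add(r.payload + "@" + r.ts + " -> " + control);
            }
        }
        return out;
    }
}

class MultiplexedJoinDemo {
    public static void main(String[] args) {
        MultiplexedJoin join = new MultiplexedJoin();
        join.add(Tagged.event(12, "e2"));
        join.add(Tagged.control(10, "c-B"));
        join.add(Tagged.event(5, "e1"));
        join.add(Tagged.event(10, "e3"));
        join.add(Tagged.control(3, "c-A"));
        System.out.println(join.onWatermark(9));  // [e1@5 -> c-A]
        System.out.println(join.onWatermark(20)); // [e3@10 -> c-B, e2@12 -> c-B]
    }
}
```

This trades the per-event TreeMap lookup for a sorting buffer over both inputs; the amount of buffered data, and hence the state size, still grows with how far the watermark lags behind the records.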