Just adding a few extra references:

[5] https://stackoverflow.com/questions/50536364/apache-flink-coflatmapfunction-does-not-process-events-in-event-time-order
[6] https://stackoverflow.com/questions/61046062/controlling-order-of-processed-elements-within-coprocessfunction-using-custom-so
[7] https://stackoverflow.com/questions/48692658/how-to-concatenate-two-streams-in-apache-flink/48711260#48711260

Salva

On 2022/12/07 18:52:42 Salva Alcántara wrote:
> It's well known that Flink does not provide any guarantees on the order in
> which a CoProcessFunction (or one of its multiple variations) processes its
> two inputs [1]. I wonder, then, what the current best practice/recommended
> approach is for cases where one needs deterministic results in the presence of:
>
> 1. A control stream
> 2. An event/data stream
>
> Let's consider event-time semantics; both streams have timestamps, and we
> want to implement "temporal join" semantics where the input events are
> controlled based on the latest control signals received before them, i.e.,
> the ones "active" when the events happened. For simplicity, let's assume
> that all events are received in order, so that the only source of
> non-determinism is the one introduced by the CoProcessFunction itself.
>
> I'm considering the following options:
>
> 1. Buffer events inside the CoProcessFunction for some time, while saving
> all the control signals in state (indexed by time)
> 2. Same as before but doing the pre-buffering of the event/data streams
> before the CoProcessFunction
> 3. Similar to the above, but considering both streams together by multiplexing
> them into one heterogeneous stream which would be pre-sorted in order to
> guarantee the global ordering of the events from the two different sources.
> Then, instead of a CoProcessFunction[IN1, IN2, OUT], use a
> ProcessFunction[Either[IN1, IN2], OUT] which by construction will process
> the data in order and hence produce deterministic results
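
To make option 3 a bit more concrete, here is a minimal sketch of the pre-sorting step in plain Java, with no Flink dependency. Everything in it is hypothetical and only for illustration: the names OrderedMerge, Tagged and onElement, the String payloads, and the choice to put a control signal ahead of an event with the same timestamp. It relies on the assumption stated above that each input arrives in timestamp order, and it only releases elements whose timestamp is strictly below the latest timestamp seen on both inputs.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

// Hypothetical sketch: merges two in-order inputs into one globally ordered stream.
class OrderedMerge {
    // One element of the multiplexed stream (stand-in for Either[IN1, IN2]).
    static final class Tagged {
        final long timestamp;
        final boolean isControl;
        final String payload;
        long seq; // arrival order, set by OrderedMerge to keep ties stable

        private Tagged(long timestamp, boolean isControl, String payload) {
            this.timestamp = timestamp;
            this.isControl = isControl;
            this.payload = payload;
        }

        static Tagged control(long ts, String payload) { return new Tagged(ts, true, payload); }
        static Tagged event(long ts, String payload) { return new Tagged(ts, false, payload); }

        @Override
        public String toString() { return payload + "@" + timestamp; }
    }

    // Order by timestamp, then control before event (an assumption), then arrival order.
    private final PriorityQueue<Tagged> buffer = new PriorityQueue<>((a, b) -> {
        int byTime = Long.compare(a.timestamp, b.timestamp);
        if (byTime != 0) return byTime;
        int byKind = Boolean.compare(b.isControl, a.isControl);
        if (byKind != 0) return byKind;
        return Long.compare(a.seq, b.seq);
    });

    private long lastControlTs = Long.MIN_VALUE;
    private long lastEventTs = Long.MIN_VALUE;
    private long nextSeq = 0;

    // Buffers the element and returns everything that is now safe to emit, in order.
    List<Tagged> onElement(Tagged element) {
        element.seq = nextSeq++;
        if (element.isControl) {
            lastControlTs = element.timestamp;
        } else {
            lastEventTs = element.timestamp;
        }
        buffer.add(element);

        // Neither input can still deliver anything older than this bound.
        long safe = Math.min(lastControlTs, lastEventTs);
        List<Tagged> out = new ArrayList<>();
        while (!buffer.isEmpty() && buffer.peek().timestamp < safe) {
            out.add(buffer.poll());
        }
        return out;
    }

    public static void main(String[] args) {
        OrderedMerge merge = new OrderedMerge();
        System.out.println(merge.onElement(Tagged.event(5, "e1")));
        System.out.println(merge.onElement(Tagged.control(3, "c1")));
        System.out.println(merge.onElement(Tagged.control(7, "c2")));
        System.out.println(merge.onElement(Tagged.event(9, "e2")));
    }
}
```

Note that nothing is released while one of the two inputs is silent, which is the "minimum amount of delay" trade-off in its simplest form. In an actual job I would expect the buffer to live in keyed state and the release to be driven by watermarks/timers rather than by the per-input timestamps tracked here.
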
>
> Essentially, all the strategies amount to introducing a "minimum amount of
> delay" to guarantee the deterministic processing, which brings me to the
> following question:
>
> * How to get an estimate for the out-of-order-ness bound that a
> CoProcessFunction can introduce? Is that even possible in the first place?
> I guess this mostly depends on the sources of the two streams and the
> relative ratio of records read. For simplicity we can consider a Kafka
> source for both input streams...
>
> On a related note, the "temporal join" seems to be a well-documented and
> solved problem in the SQL API [2-3], but the problem is not even mentioned
> within the docs for the DataStream API. I guess I can copy/adapt the
> corresponding code. E.g., for the "point-in-time" part I can consider a
> TreeMap data structure as used in [3]. I have also come across a new (still
> WIP) BinarySortedState [4] which would improve performance w.r.t. the
> current RocksDB state backend.
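
For the "point-in-time" part, this is roughly what I have in mind with the TreeMap: a minimal sketch in plain Java, not taken from the Flink code base. ControlTimeline, activeAt and pruneBefore are hypothetical names. It also assumes that a control signal carrying exactly the same timestamp as an event counts as active for it; if "before" should be strict, lowerEntry would replace floorEntry.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical helper: control signals indexed by event time.
class ControlTimeline<C> {
    private final TreeMap<Long, C> signalsByTime = new TreeMap<>();

    void add(long timestamp, C signal) {
        signalsByTime.put(timestamp, signal);
    }

    // Latest signal with timestamp <= eventTime, or null if there is none yet.
    C activeAt(long eventTime) {
        Map.Entry<Long, C> entry = signalsByTime.floorEntry(eventTime);
        return entry == null ? null : entry.getValue();
    }

    // Drops signals that can no longer be the answer for any event at or after
    // the given time, keeping the most recent one at or before it.
    void pruneBefore(long time) {
        Long keep = signalsByTime.floorKey(time);
        if (keep != null) {
            signalsByTime.headMap(keep, false).clear();
        }
    }

    int size() {
        return signalsByTime.size();
    }

    public static void main(String[] args) {
        ControlTimeline<String> timeline = new ControlTimeline<>();
        timeline.add(10L, "A");
        timeline.add(20L, "B");
        System.out.println(timeline.activeAt(15L));
        timeline.pruneBefore(25L);
        System.out.println(timeline.size());
    }
}
```

The pruning step is what keeps the state bounded: once no event older than a given time can arrive, only the last signal at or before that time is still needed.
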
>
> References:
>
> [1] https://stackoverflow.com/questions/51628037/facing-race-condition-in-flink-connected-stream-in-apache-flink
> [2] https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/queries/joins/#event-time-temporal-join
> [3] https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/dev/datastream/event-time/generating_watermarks/#introduction-to-watermark-strategies
> [4] https://www.slideshare.net/FlinkForward/introducing-binarysortedmultimap-a-new-flink-state-primitive-to-boost-your-application-performance
>
