Hi Alexis,

Thanks a lot for your reply & guidance, which makes a lot of sense to me
overall.

Regards,

Salva

On Thu, Dec 8, 2022 at 5:34 PM Alexis Sarda-Espinosa <
sarda.espin...@gmail.com> wrote:

> Hi Salva,
>
> Just to give you further thoughts from another user: I think the "temporal
> join" semantics are critical in this use case, and what you implement for
> them may not easily generalize to other cases. Because of that, I'm not
> sure you can really define best practices that apply in general.
> You also have to take idleness into account, given that using event time
> could leave you in a "frozen" state if you're not receiving events
> continuously.
>
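The idleness point can be shown with a minimal sketch. It assumes a two-input operator whose event-time clock is the minimum of its inputs' watermarks, which is how Flink combines watermarks from multiple inputs. The idle-timeout rule loosely mirrors what `WatermarkStrategy.withIdleness` does for sources. `InputStatus` and `combined_watermark` are hypothetical names, and this is a plain Python simulation, not Flink code.

```python
# Plain-Python simulation, not Flink API; all names are hypothetical.
from dataclasses import dataclass

LONG_MIN = -(2**63)  # value of Java's Long.MIN_VALUE


@dataclass
class InputStatus:
    watermark: int = LONG_MIN      # last watermark received on this input
    last_activity: float = 0.0     # wall-clock time of the last record/watermark


def combined_watermark(inputs, now, idle_timeout):
    """Minimum watermark over the inputs that are not idle.

    An input counts as idle when it has been silent for longer than
    idle_timeout (wall-clock), loosely like WatermarkStrategy.withIdleness.
    Returns None when every input is idle, i.e. the clock does not advance.
    """
    active = [s.watermark for s in inputs
              if now - s.last_activity <= idle_timeout]
    return min(active) if active else None
```

Excluding an idle control stream unfreezes the clock, but it has a cost. A control event that later arrives with an older timestamp is then effectively late. Idleness handling therefore partly reintroduces the race the temporal join is meant to avoid.
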
> I also doubt you can accurately estimate out-of-orderness in this scenario
> due to the particularities of Flink's network stack [1]. Even if you only
> have 2 sources and immediately connect them together, parallelism and the
> resulting shuffles can change from one execution to the next even if you
> don't change the logic at all, because scheduling is also non-deterministic,
> and the "distribution" of events across the parallel instances of your
> sources could vary a lot as well.
>
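Here is a plain Python simulation that makes the non-determinism concrete. It is not Flink code, and the record shapes are hypothetical. It models a naive join that applies whichever control value arrived most recently. Both interleavings keep each stream in order, as the original question assumes, yet they produce different results.

```python
# Plain-Python simulation, not Flink API; record shapes are hypothetical.


def naive_join(arrivals):
    """Join by *arrival* order instead of event time.

    arrivals: list of ("control", ts, value) or ("data", ts, payload).
    Each data record gets the last control value that happened to arrive
    before it, regardless of timestamps.
    """
    current = None
    out = []
    for kind, _ts, value in arrivals:
        if kind == "control":
            current = value
        else:
            out.append((value, current))
    return out


control = [("control", 1, "A"), ("control", 5, "B")]
data = [("data", 3, "x"), ("data", 6, "y")]

# Two interleavings a scheduler could produce for the very same inputs.
run1 = naive_join([control[0], data[0], control[1], data[1]])
run2 = naive_join([control[0], control[1], data[0], data[1]])
```

In event time, "x" (ts 3) should see "A". Only the first interleaving produces that.
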
> I think that others will tell you that you indeed need to find a way to
> buffer events for a while; I got the same advice in the past. Focusing very
> specifically on what you described (streams for data & control events +
> event time + temporal join), and assuming you can manage watermarks in a
> way that handles idleness without simply freezing the stream, I once tried
> a custom operator (not a function) to force the watermarks of one stream to
> wait for the other one; see PrioritizedWatermarkStreamIntervalJoinOperator
> in [2]. This still uses the idea of buffering; it just moves that
> responsibility to the operator that's already handling "window data" for
> the join. Also, it extends an internal class, so it's very much unofficial,
> and it's probably too specific to my use case, but maybe it gives you other
> ideas to consider.
>
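The buffering idea can be sketched minimally as follows. Data records are held until the control stream's watermark has passed their timestamp, then joined with the control value active at that time. This is a plain Python simulation of the general technique, not the PrioritizedWatermarkStreamIntervalJoinOperator from the linked thread. `BufferedTemporalJoin` and its methods are hypothetical. Treating a control signal with the same timestamp as already active is also an assumption.

```python
# Plain-Python sketch, not Flink API; class and method names are hypothetical.
import bisect
import heapq


class BufferedTemporalJoin:
    def __init__(self):
        self.control_ts = []   # sorted control timestamps
        self.control_val = []  # control values aligned with control_ts
        self.pending = []      # min-heap of (ts, seq, payload) awaiting release
        self.seq = 0           # tie-breaker so payloads are never compared

    def on_control(self, ts, value):
        i = bisect.bisect_right(self.control_ts, ts)
        self.control_ts.insert(i, ts)
        self.control_val.insert(i, value)

    def on_data(self, ts, payload):
        heapq.heappush(self.pending, (ts, self.seq, payload))
        self.seq += 1

    def on_control_watermark(self, wm):
        """Release buffered records with ts <= wm, in timestamp order.

        A control watermark wm promises no more control records with ts <= wm,
        so the control value active at each released timestamp is final.
        """
        out = []
        while self.pending and self.pending[0][0] <= wm:
            ts, _, payload = heapq.heappop(self.pending)
            i = bisect.bisect_right(self.control_ts, ts)  # controls at or before ts
            out.append((payload, self.control_val[i - 1] if i else None))
        return out
```

Nothing is emitted until the control watermark covers a record, so the output no longer depends on how the two inputs were interleaved. The price is latency plus buffered state. A frozen control watermark also stalls everything, which is where idleness comes back in.
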
> One last detail to further illustrate the complexity here: when I was
> testing my custom operator with event-time simulations in my IDE, I
> initially didn't consider that a watermark with Long.MAX_VALUE is sent at
> the end of the simulation. This was another source of non-determinism:
> sometimes the control stream was processed entirely (including the max
> watermark) before a single event from the data stream went through, which
> meant that all events from the data stream were considered late arrivals
> and silently dropped.
>
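A small simulation shows the end-of-input pitfall. It assumes custom logic that lets the control stream's watermark alone drive the operator's clock. A regular two-input operator uses the minimum of both inputs, so it would not drop these records. Names are hypothetical. Records count as late when their timestamp is at or behind the clock, consistent with the definition of a watermark; individual operators may compare slightly differently.

```python
# Plain-Python simulation, not Flink API; all names are hypothetical.
LONG_MIN = -(2**63)   # Java's Long.MIN_VALUE
LONG_MAX = 2**63 - 1  # Java's Long.MAX_VALUE


def run(arrivals):
    """Replay arrivals where only the control stream's watermark drives the clock.

    arrivals: ("wm", value) for control watermarks, ("data", ts) for data
    records. Returns (kept, dropped) lists of data timestamps.
    """
    clock = LONG_MIN
    kept, dropped = [], []
    for kind, x in arrivals:
        if kind == "wm":
            clock = max(clock, x)  # watermarks never move backwards
        elif x <= clock:
            dropped.append(x)      # at or behind the clock: treated as late
        else:
            kept.append(x)
    return kept, dropped
```

If the control stream, including its final Long.MAX_VALUE watermark, is consumed before any data record, every data record is dropped. If data records slip in earlier, they survive.
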
> [1] https://flink.apache.org/2019/06/05/flink-network-stack.html
> [2] https://lists.apache.org/thread/n79tsqd5y474n5th1wdhkrh7bvlts8bs
>
> Regards,
> Alexis.
>
> On Thu, Dec 8, 2022 at 3:20 PM Salva Alcántara <
> salcantara...@gmail.com> wrote:
>
>> Just adding some extra references:
>>
>> [5] https://stackoverflow.com/questions/50536364/apache-flink-coflatmapfunction-does-not-process-events-in-event-time-order
>> [6] https://stackoverflow.com/questions/61046062/controlling-order-of-processed-elements-within-coprocessfunction-using-custom-so
>> [7] https://stackoverflow.com/questions/48692658/how-to-concatenate-two-streams-in-apache-flink/48711260#48711260
>>
>> Salva
>>
>> On 2022/12/07 18:52:42 Salva Alcántara wrote:
>> > It's well known that Flink does not provide any guarantees on the order in
>> > which a CoProcessFunction (or one of its variants) processes its two
>> > inputs [1]. I wonder, then, what the current best practice/recommended
>> > approach is for cases where one needs deterministic results in the
>> > presence of:
>> >
>> > 1. A control stream
>> > 2. An event/data stream
>> >
>> > Let's consider event-time semantics; both streams have timestamps, and we
>> > want to implement "temporal join" semantics where the input events are
>> > controlled based on the latest control signals received before them,
>> > i.e., the ones "active" when the events happened. For simplicity, let's
>> > assume that all events are received in order, so that the only source of
>> > non-determinism is the one introduced by the CoProcessFunction itself.
>> >
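As a reference point, the temporal join semantics described above can be written as a batch computation over complete inputs. Any streaming implementation would need to reproduce this output regardless of arrival order. This is a plain Python sketch. Treating a control signal with the same timestamp as the event as already active ("at or before" rather than strictly "before") is an assumption.

```python
# Plain-Python reference definition, not Flink API; names are hypothetical.
import bisect


def temporal_join(controls, events):
    """controls: list of (ts, value); events: list of (ts, payload).

    Each event is paired with the latest control value whose timestamp is
    at or before the event's timestamp (None if there is none yet).
    """
    controls = sorted(controls, key=lambda c: c[0])
    ts_index = [ts for ts, _ in controls]
    out = []
    for ts, payload in sorted(events, key=lambda e: e[0]):
        i = bisect.bisect_right(ts_index, ts)  # number of controls with ts' <= ts
        out.append((ts, payload, controls[i - 1][1] if i else None))
    return out
```
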
>> > I'm considering the following options:
>> >
>> > 1. Buffer events inside the CoProcessFunction for some time, while saving
>> > all the control signals in state (indexed by time)
>> > 2. Same as before, but doing the pre-buffering of the event/data streams
>> > before the CoProcessFunction
>> > 3. Similar to before, but considering both streams together by
>> > multiplexing them into one heterogeneous stream, which would be pre-sorted
>> > in order to guarantee the global ordering of the events from the two
>> > different sources. Then, instead of a CoProcessFunction[IN1, IN2, OUT],
>> > use a ProcessFunction[Either[IN1, IN2], OUT], which by construction will
>> > process the data in order and hence produce deterministic results
>> >
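Option 3 can be shown in miniature as a plain Python sketch. `LEFT`/`RIGHT` tags stand in for Scala's `Either[IN1, IN2]`. The two inputs are merged into one timestamp-ordered stream, and a single sequential loop applies the latest control value. `heapq.merge` assumes each input is already sorted, matching the in-order assumption above. Breaking timestamp ties in favor of control records is an assumption. In a real job, producing this merged order would still require buffering until both inputs' watermarks have passed.

```python
# Plain-Python sketch of option 3, not Flink API; names are hypothetical.
import heapq

LEFT, RIGHT = 0, 1  # stand-ins for Either's Left (control) / Right (data)


def multiplex(controls, events):
    """Merge two timestamp-sorted inputs into one ordered, tagged stream.

    Assumption: on equal timestamps, control (LEFT) sorts before data (RIGHT).
    """
    lefts = ((ts, LEFT, v) for ts, v in controls)
    rights = ((ts, RIGHT, p) for ts, p in events)
    return heapq.merge(lefts, rights, key=lambda r: (r[0], r[1]))


def process(merged):
    """Single sequential pass, playing the ProcessFunction[Either[IN1, IN2], OUT] role."""
    current, out = None, []
    for ts, side, value in merged:
        if side == LEFT:
            current = value  # latest control signal seen so far in event time
        else:
            out.append((ts, value, current))
    return out
```
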
>> > Essentially, all the strategies amount to introducing a "minimum amount of
>> > delay" to guarantee deterministic processing, which brings me to the
>> > following question:
>> >
>> > * How can one estimate the out-of-orderness bound that a
>> > CoProcessFunction can introduce? Is that even possible in the first place?
>> > I guess this mostly depends on the sources of the two streams and the
>> > relative ratio of records read. For simplicity, we can consider a Kafka
>> > source for both input streams...
>> >
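One way to get an empirical number is to measure, for each record, how far its timestamp lags behind the largest timestamp seen so far on the merged input. This is a plain Python sketch with a hypothetical function name. As Alexis points out, such a measurement only describes the runs you measured and is not a guaranteed bound.

```python
# Plain-Python sketch, not Flink API; the function name is hypothetical.


def observed_out_of_orderness(arrival_timestamps):
    """Largest lag of any record behind the max timestamp seen before it.

    Describes one observed arrival order only; it is not a guaranteed bound.
    """
    max_seen = None
    worst = 0
    for ts in arrival_timestamps:
        if max_seen is None or ts > max_seen:
            max_seen = ts
        else:
            worst = max(worst, max_seen - ts)
    return worst
```

If such a measurement is fed into `WatermarkStrategy.forBoundedOutOfOrderness(Duration)`, a generous safety margin seems prudent, since a later run can exceed it.
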
>> > On a related note, the "temporal join" seems to be a well-documented and
>> > solved problem in the SQL API [2-3], but the problem is not even mentioned
>> > in the docs for the DataStream API. I guess I can copy/adapt the
>> > corresponding code. E.g., for the "point-in-time" part I can consider a
>> > TreeMap data structure as used in [3]. I have also come across a new
>> > (still WIP) BinarySortedState [4], which would improve performance w.r.t.
>> > the current RocksDB state backend.
>> >
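A sketch of versioned control state with point-in-time lookup and cleanup, the kind of structure Java's TreeMap provides through `floorEntry` and `headMap`. This is plain Python, and the class and method names are hypothetical. Pruning assumes that once the watermark reaches W, no data event with ts <= W will be joined any more. Under that assumption, only the newest version at or before W, plus all later versions, must be kept.

```python
# Plain-Python sketch, not Flink API; class and method names are hypothetical.
import bisect


class VersionedState:
    def __init__(self):
        self.ts = []    # sorted version timestamps
        self.vals = []  # values aligned with self.ts

    def put(self, ts, value):
        i = bisect.bisect_left(self.ts, ts)
        if i < len(self.ts) and self.ts[i] == ts:
            self.vals[i] = value  # same timestamp: overwrite that version
        else:
            self.ts.insert(i, ts)
            self.vals.insert(i, value)

    def floor(self, ts):
        """Value of the latest version at or before ts, else None."""
        i = bisect.bisect_right(self.ts, ts)
        return self.vals[i - 1] if i else None

    def prune(self, watermark):
        """Drop versions that can no longer be the floor of any future lookup.

        Assumption: no lookups for ts <= watermark will happen after this.
        """
        i = bisect.bisect_right(self.ts, watermark)
        keep_from = max(i - 1, 0)  # keep the newest version <= watermark
        del self.ts[:keep_from]
        del self.vals[:keep_from]
```
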
>> > References:
>> >
>> > [1] https://stackoverflow.com/questions/51628037/facing-race-condition-in-flink-connected-stream-in-apache-flink
>> > [2] https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/queries/joins/#event-time-temporal-join
>> > [3] https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/dev/datastream/event-time/generating_watermarks/#introduction-to-watermark-strategies
>> > [4] https://www.slideshare.net/FlinkForward/introducing-binarysortedmultimap-a-new-flink-state-primitive-to-boost-your-application-performance
>> >
>>
>
