Well, for those who might be interested in the semantics I mentioned: I
implemented a custom operator that seems to achieve what I want by mostly
ignoring the actual timestamps in the side stream's watermarks. However, it
relies on the fact that my main stream (the operator's first input) comes from
a previous window and is watermarked with "windowEnd - 1" (hence the
"timestamp1 + 1" below).

public class PrioritizedWatermarkStreamIntervalJoinOperator extends IntervalJoinOperator<...> {
    private static final long serialVersionUID = 1L;

    // Highest watermark seen so far on each input.
    private long maxTimestamp1 = Long.MIN_VALUE;
    private long maxTimestamp2 = Long.MIN_VALUE;

    public PrioritizedWatermarkStreamIntervalJoinOperator(...) {
        super(...);
    }

    @Override
    public void processWatermark1(Watermark mark) throws Exception {
        if (mark.getTimestamp() > maxTimestamp1) {
            maxTimestamp1 = mark.getTimestamp();
        }
        // The main input's watermark is forwarded unchanged.
        super.processWatermark1(mark);
        maybeProcessWatermark2(mark, mark.getTimestamp(), maxTimestamp2);
    }

    // Forwards a watermark for input 2 that is derived from input 1's progress
    // rather than from input 2's own timestamps.
    private void maybeProcessWatermark2(Watermark mark, long timestamp1, long maxTimestampForComparison)
            throws Exception {
        if (mark.equals(Watermark.MAX_WATERMARK)
                && maxTimestampForComparison == Watermark.MAX_WATERMARK.getTimestamp()) {
            // Both inputs have reached the end: let input 2 finish as well.
            super.processWatermark2(Watermark.MAX_WATERMARK);
        } else if (maxTimestamp2 > maxTimestamp1) {
            // Input 2 is ahead of input 1, so only advance it to just past input 1's watermark.
            if (timestamp1 == Long.MAX_VALUE) {
                LOG.warn("Trying to bump timestamp1 would result in overflow, skipping.");
                return;
            }
            super.processWatermark2(new Watermark(timestamp1 + 1L));
        }
    }

    @Override
    public void processWatermark2(Watermark mark) throws Exception {
        if (mark.getTimestamp() > maxTimestamp2) {
            maxTimestamp2 = mark.getTimestamp();
        }
        // Input 2's watermark is never forwarded as-is; only a value derived
        // from input 1 (or MAX_WATERMARK) is.
        maybeProcessWatermark2(mark, maxTimestamp1, maxTimestamp1);
    }
}
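To illustrate the rule the operator applies, here is a Flink-free model (the class name and the simplified "combine" step are assumptions, not Flink code). combine() approximates how a two-input operator takes the minimum of both inputs' watermarks and only emits when that minimum advances; the rest mirrors the operator above.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, Flink-free model of PrioritizedWatermarkStreamIntervalJoinOperator.
class PrioritizedWatermarkModel {
    private long input1 = Long.MIN_VALUE;   // watermark last handed to "super" for input 1
    private long input2 = Long.MIN_VALUE;   // watermark last handed to "super" for input 2
    private long combined = Long.MIN_VALUE; // combined (minimum) watermark emitted so far
    private long maxTimestamp1 = Long.MIN_VALUE;
    private long maxTimestamp2 = Long.MIN_VALUE;
    final List<Long> emitted = new ArrayList<>();

    void watermark1(long ts) {
        maxTimestamp1 = Math.max(maxTimestamp1, ts);
        forward1(ts);                                    // input 1 is forwarded unchanged
        maybeForward2(ts, ts, maxTimestamp2);
    }

    void watermark2(long ts) {
        maxTimestamp2 = Math.max(maxTimestamp2, ts);
        maybeForward2(ts, maxTimestamp1, maxTimestamp1); // input 2 is never forwarded as-is
    }

    private void maybeForward2(long mark, long timestamp1, long maxForComparison) {
        if (mark == Long.MAX_VALUE && maxForComparison == Long.MAX_VALUE) {
            forward2(Long.MAX_VALUE);                    // both inputs finished
        } else if (maxTimestamp2 > maxTimestamp1 && timestamp1 != Long.MAX_VALUE) {
            forward2(timestamp1 + 1);                    // input 2 ahead: pin it just past input 1
        }
    }

    private void forward1(long ts) { input1 = ts; combine(); }
    private void forward2(long ts) { input2 = ts; combine(); }

    // Simplified stand-in for the operator's combined-watermark bookkeeping.
    private void combine() {
        long min = Math.min(input1, input2);
        if (min > combined) {
            combined = min;
            emitted.add(min);
        }
    }
}
```

For example, after watermark1(99) and watermark2(500), the only combined watermark emitted is 99: the side input's 500 is ignored until the main input catches up.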

Regards,
Alexis.

From: Alexis Sarda-Espinosa <alexis.sarda-espin...@microfocus.com>
Sent: Saturday, January 29, 2022 13:47
To: Robert Metzger <metrob...@gmail.com>
Cc: user@flink.apache.org
Subject: RE: Determinism of interval joins

I think I spoke too soon when I said my watermark strategies were like the
included ones. My generators mark themselves as idle when they start, and stay
that way as long as they don't see any event at all. In the tests, I presume a
variable number of events (and watermarks) from stream1 were consumed before
anything from stream2 was, so by the time stream2 emitted a watermark to mark
itself as active, it was already too late and everything was dropped. I
debugged some of the operators and could see that many inputs were considered
late, since they were processed when the internal watermark service already
had Long.MAX_VALUE as its current watermark. If I change this idleness
behavior, I do see changes in the test's output.
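A tiny Flink-free model reproduces the effect. The class and its rules are my own simplification, not Flink's code: idle inputs are excluded from the combined (minimum) watermark, and an element counts as late once its timestamp is below that watermark. Feeding the same watermarks and event in two different orders changes whether the event survives.

```java
// Hypothetical, simplified model of a two-input operator whose inputs start idle.
class IdleInputModel {
    private final long[] watermark = {Long.MIN_VALUE, Long.MIN_VALUE};
    private final boolean[] idle = {true, true}; // both generators start out idle
    private long combined = Long.MIN_VALUE;
    int accepted = 0;
    int dropped = 0;

    // A watermark from input 0 or 1 also marks that input as active.
    void watermark(int input, long ts) {
        idle[input] = false;
        watermark[input] = Math.max(watermark[input], ts);
        long min = Long.MAX_VALUE;
        for (int i = 0; i < 2; i++) {
            if (!idle[i]) {
                min = Math.min(min, watermark[i]); // idle inputs don't hold the watermark back
            }
        }
        // At least one input is active here; the combined watermark never goes backwards.
        combined = Math.max(combined, min);
    }

    // An element is dropped if it is behind the combined watermark.
    void event(long ts) {
        if (combined != Long.MIN_VALUE && ts < combined) {
            dropped++;
        } else {
            accepted++;
        }
    }

    long combined() { return combined; }
}
```

If input 0 runs all the way to Long.MAX_VALUE before input 1 says anything, an event at 500 from input 1 is dropped; if input 1's watermark of 100 arrives first, the same event is accepted.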

When running in real time, I definitely need to mark some streams as idle after
some time, because I don't expect all of them to receive data constantly.
However, the non-real-time scenario is also relevant for me, and not just for
testing: if something crashes and the pipeline suddenly needs to process a
backlog, it would be nice if the semantics were well defined. Ideally, for
two-input operators in general I imagine, this would mean that when an
operator knows that all streams from one input have passed a certain watermark
(based on slide/tumble time), it would switch to consuming from the other
input to check whether it's idle or not. I suppose this wouldn't be a definite
guarantee either, since data from the different streams may take some time to
reach the different operators (latency and whatnot), but it would still be
useful.
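A rough sketch of the consumption rule I have in mind, assuming each input is already sorted by timestamp: always read next from the input that is furthest behind in event time, so the interleaving no longer depends on how fast each upstream chain runs. This is plain Java with a hypothetical class name, not anything Flink provides out of the box (Flink's `InputSelectable` operator interface is the closest hook I know of, and this sketch does not use it).

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical reader that aligns two timestamp-sorted inputs by event time.
class AlignedTwoInputReader {
    static List<String> read(List<Long> input1, List<Long> input2) {
        Deque<Long> q1 = new ArrayDeque<>(input1);
        Deque<Long> q2 = new ArrayDeque<>(input2);
        List<String> order = new ArrayList<>();
        while (!q1.isEmpty() || !q2.isEmpty()) {
            // Take from input 1 if input 2 is exhausted or input 1 is not ahead (ties go to input 1).
            boolean takeFirst = q2.isEmpty() || (!q1.isEmpty() && q1.peekFirst() <= q2.peekFirst());
            if (takeFirst) {
                order.add("1@" + q1.pollFirst());
            } else {
                order.add("2@" + q2.pollFirst());
            }
        }
        return order;
    }
}
```

With inputs [1, 5, 9] and [2, 3, 10], the reading order is always 1@1, 2@2, 2@3, 1@5, 1@9, 2@10, regardless of arrival pace.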

I imagine the details are more complex and I'm oversimplifying a bit (I don't
know how the network stack works), but I would think these kinds of semantics
are commonly expected when handling multiple streams that need joins and so on.
What do you think?

Regards,
Alexis.

From: Robert Metzger <metrob...@gmail.com>
Sent: Friday, January 28, 2022 14:49
To: Alexis Sarda-Espinosa <alexis.sarda-espin...@microfocus.com>
Cc: user@flink.apache.org
Subject: Re: Determinism of interval joins

Instead of using `reinterpretAsKeyedStream`, can you use `keyBy` and see if the
behavior becomes deterministic?

On Thu, Jan 27, 2022 at 9:49 PM Alexis Sarda-Espinosa
<alexis.sarda-espin...@microfocus.com> wrote:
I'm not sure whether the issue in [1] is relevant since it mentions the Table
API, but it could be. Since stream1 and stream2 in my example have long chains
of operators behind them, I presume they might "run" at very different paces.

Oh, and in the context of my unit tests, watermarks should be deterministic:
the input file is sorted, and the watermark strategies should essentially
behave like the monotonous-timestamps generator.
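For reference, my understanding of the built-in `WatermarkStrategy.forMonotonousTimestamps()` generator is that it tracks the highest timestamp seen and emits that value minus 1 as its watermark (I believe Flink implements it as a bounded-out-of-orderness generator with zero delay). A stdlib model of that rule, with a hypothetical class name and not Flink's code:

```java
// Hypothetical model of a "monotonous timestamps" watermark generator:
// track the highest event timestamp seen and emit it minus 1 as the watermark.
class MonotonousWatermarkModel {
    // Start so that the first watermark is Long.MIN_VALUE (no events seen yet).
    private long maxTimestamp = Long.MIN_VALUE + 1;

    void onEvent(long timestamp) {
        maxTimestamp = Math.max(maxTimestamp, timestamp);
    }

    long currentWatermark() {
        return maxTimestamp - 1;
    }
}
```

With sorted input the watermark values are a pure function of the file contents; as far as I know, though, when they are emitted still depends on the periodic emit interval, i.e. on wall-clock timing.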

[1] https://issues.apache.org/jira/browse/FLINK-24466

Regards,
Alexis.

________________________________
From: Alexis Sarda-Espinosa <alexis.sarda-espin...@microfocus.com>
Sent: Thursday, January 27, 2022 1:30 PM
To: user@flink.apache.org
Subject: Determinism of interval joins

Hi everyone,

I'm seeing a lack of determinism in unit tests when using an interval join. I
am testing with both Flink 1.11.3 and 1.14.3, and the relevant bits of my
pipeline look a bit like this:

keySelector1 = ...;
keySelector2 = ...;

rightStream = stream1
  .flatMap(...)
  .keyBy(keySelector1)
  // assignTimestampsAndWatermarks() returns a plain (non-keyed) DataStream
  .assignTimestampsAndWatermarks(strategy1);

leftStream = stream2
  .keyBy(keySelector2)
  .assignTimestampsAndWatermarks(strategy2);

// reinterpretAsKeyedStream() turns the results back into KeyedStreams without a new shuffle
joinedStream = DataStreamUtils.reinterpretAsKeyedStream(leftStream, keySelector2)
  .intervalJoin(DataStreamUtils.reinterpretAsKeyedStream(rightStream, keySelector1))
  .between(Time.minutes(-10L), Time.milliseconds(0L))
  .lowerBoundExclusive()
  .process(new IntervalJoinFunction(...));

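For the `between(Time.minutes(-10L), Time.milliseconds(0L))` plus `lowerBoundExclusive()` setup above, the join condition as I read the Flink docs is `left + lower < right <= left + upper`, with timestamps in milliseconds and "left" being the stream `intervalJoin` is called on. A small helper (hypothetical, not part of Flink) to sanity-check which pairs should match:

```java
// Hypothetical helper encoding the interval-join bounds used in the pipeline above.
class IntervalJoinBounds {
    static final long LOWER_MS = -10L * 60 * 1000; // Time.minutes(-10L)
    static final long UPPER_MS = 0L;               // Time.milliseconds(0L), inclusive by default

    // With lowerBoundExclusive(): left + lower < right <= left + upper.
    static boolean joins(long leftTs, long rightTs) {
        return rightTs > leftTs + LOWER_MS && rightTs <= leftTs + UPPER_MS;
    }
}
```

So a right element exactly 10 minutes older than the left one does not join, while one 10 minutes minus 1 ms older, or with the same timestamp, does.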
---

In my tests, I have a bounded source that loads demo data from a file to
simulate the stream, and a sink that collects results in memory. In the
specific case of my IntervalJoinFunction, I'm seeing that it's called a
different number of times on each run: sometimes I see 14 calls to its
processElement() method, sometimes 8, and sometimes none at all, in which case
my output is empty. I count the calls by checking my logs, which include some
tracing.

Does anyone know why this is? Maybe I'm doing something wrong, particularly
with reinterpretAsKeyedStream.

Regards,
Alexis.
