Hi Alexis,

I tried looking into your example. So far I've spent only a limited amount of time on the WatermarkGenerator, and I have not fully understood how it works. First of all, though, I'd discourage assigning watermarks anywhere in the middle of your pipeline. This is considered an anti-pattern, and at some point there were thoughts of removing that possibility altogether.


Having said that, there is indeed a bug in the TimestampsAndWatermarksOperator [1]. WatermarkGenerators in the middle of a pipeline cut off upstream watermarks, but they do not cut off WatermarkStatuses. In your job you have a chain of generators G1, G2, G3, each with parallelism 2. Because of the constant key, only a single subtask receives records, so only a single subtask is expected to generate watermarks. Generator G3 therefore starts with one subtask IDLE and one ACTIVE, but because of the all-to-all connections (keyBy) between the operators, it also receives the ACTIVE status forwarded from the upstream generator G2. As a result both channels are marked ACTIVE, even though only one of them generates watermarks, so the downstream watermark (the minimum over the active channels) cannot advance.
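To make the failure mode concrete, here is a simplified model, not Flink code, of how a downstream subtask combines watermarks from its input channels, loosely following the idea behind Flink's internal StatusWatermarkValve: the output is the minimum watermark over ACTIVE channels, and IDLE channels are ignored. The class WatermarkValveModel and its methods are hypothetical, and the real valve does more bookkeeping (per-channel alignment, status propagation).

```java
/**
 * Simplified model (not Flink code): a downstream subtask's output watermark
 * is the minimum watermark over its ACTIVE input channels; IDLE channels are
 * ignored. If every channel is IDLE, this model reports NO_WATERMARK.
 */
class WatermarkValveModel {
    static final long NO_WATERMARK = Long.MIN_VALUE;

    static long combine(long[] channelWatermarks, boolean[] channelActive) {
        long min = Long.MAX_VALUE;
        boolean anyActive = false;
        for (int i = 0; i < channelWatermarks.length; i++) {
            if (channelActive[i]) {
                anyActive = true;
                min = Math.min(min, channelWatermarks[i]);
            }
        }
        return anyActive ? min : NO_WATERMARK;
    }

    public static void main(String[] args) {
        // Two input channels (one per upstream subtask, connected via keyBy).
        // Only channel 0 receives records, so channel 1 never sees a watermark.
        long[] watermarks = {1_000L, NO_WATERMARK};

        // Buggy case: a forwarded ACTIVE status keeps channel 1 ACTIVE.
        System.out.println(combine(watermarks, new boolean[] {true, true}));

        // Expected case: channel 1 is IDLE and excluded from the minimum.
        System.out.println(combine(watermarks, new boolean[] {true, false}));
    }
}
```

With the leaked ACTIVE status, the silent channel pins the minimum at Long.MIN_VALUE; once that channel is treated as IDLE, the combined watermark becomes 1000.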


As a recommendation, I'd suggest generating watermarks right after the source. If that is not possible, then as a workaround until this is fixed in Flink, you need to cut off WatermarkStatuses somehow, either in a custom operator or by modifying the TimestampsAndWatermarksOperator.
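The effect of cutting off statuses can be illustrated with a hypothetical model (not Flink's API): a mid-pipeline generator subtask either forwards the statuses it receives from upstream or drops them, and a downstream channel keeps the last status it received. MidPipelineGeneratorModel, its Status enum and the "last status wins" rule are simplifying assumptions; in a real job the equivalent change would live in a custom operator or in a patched TimestampsAndWatermarksOperator.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical model (not Flink code) of a mid-pipeline watermark generator
 * subtask. It records the statuses it sends downstream so that forwarding
 * and cutting off upstream statuses can be compared.
 */
class MidPipelineGeneratorModel {
    enum Status { ACTIVE, IDLE }

    private final boolean cutOffUpstreamStatus;
    final List<Status> emitted = new ArrayList<>();

    MidPipelineGeneratorModel(boolean cutOffUpstreamStatus) {
        this.cutOffUpstreamStatus = cutOffUpstreamStatus;
    }

    /** A status arriving from an upstream subtask (e.g. from generator G2). */
    void onUpstreamStatus(Status status) {
        if (!cutOffUpstreamStatus) {
            emitted.add(status); // leaks through to downstream (the buggy behaviour)
        }
        // With the workaround, upstream statuses are simply dropped.
    }

    /** This subtask's own generator decides it is idle (it gets no records). */
    void markIdle() {
        emitted.add(Status.IDLE);
    }

    /** Status a downstream channel ends up with: the last one it received. */
    Status downstreamChannelStatus() {
        // Channels are assumed to start ACTIVE until told otherwise.
        return emitted.isEmpty() ? Status.ACTIVE : emitted.get(emitted.size() - 1);
    }

    public static void main(String[] args) {
        for (boolean cutOff : new boolean[] {false, true}) {
            MidPipelineGeneratorModel g = new MidPipelineGeneratorModel(cutOff);
            g.markIdle();                      // own generator: no records, so idle
            g.onUpstreamStatus(Status.ACTIVE); // upstream subtask is active
            System.out.println("cutOff=" + cutOff + " -> " + g.downstreamChannelStatus());
        }
    }
}
```

Only when upstream statuses are dropped does the downstream channel stay IDLE, which lets the downstream minimum ignore the subtask that never emits watermarks.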


Best,

Dawid


[1] https://issues.apache.org/jira/browse/FLINK-26708



On 15/03/2022 23:47, Alexis Sarda-Espinosa wrote:
For completeness, this still happens with Flink 1.14.4.

Regards,
Alexis.

------------------------------------------------------------------------
*From:* Alexis Sarda-Espinosa <alexis.sarda-espin...@microfocus.com>
*Sent:* Friday, March 11, 2022 12:21 AM
*To:* user@flink.apache.org <user@flink.apache.org>
*Cc:* pnowoj...@apache.org <pnowoj...@apache.org>
*Subject:* Re: Interval join operator is not forwarding watermarks correctly

I think I managed to create a reproducible example [1]. I think the problem is due to the use of window + join + window. When I run the test, I never see the print output, but if I uncomment the part of the code in the watermark generator that marks it as idle more quickly, it starts working after a while.

[1] https://github.com/asardaes/flink-interval-join-test


Regards,
Alexis.

------------------------------------------------------------------------
*From:* Alexis Sarda-Espinosa <alexis.sarda-espin...@microfocus.com>
*Sent:* Thursday, March 10, 2022 7:47 PM
*To:* user@flink.apache.org <user@flink.apache.org>
*Cc:* pnowoj...@apache.org <pnowoj...@apache.org>
*Subject:* RE: Interval join operator is not forwarding watermarks correctly

I found [1] and [2], which are closed but could be related.

[1] https://issues.apache.org/jira/browse/FLINK-23698

[2] https://issues.apache.org/jira/browse/FLINK-18934

Regards,

Alexis.

*From:* Alexis Sarda-Espinosa <alexis.sarda-espin...@microfocus.com>
*Sent:* Thursday, March 10, 2022 7:27 PM
*To:* user@flink.apache.org
*Subject:* Interval join operator is not forwarding watermarks correctly

Hello,

I’m in the process of upgrading from Flink 1.11.3 to 1.14.3, and it seems the interval join in my pipeline no longer works. More specifically, I have a sliding window after the interval join, and the window isn’t firing. After many tests, I created a custom operator that extends IntervalJoinOperator and overrode processWatermark1() and processWatermark2() to add logs and check when they are called. I can see that processWatermark1() is never called.

For completeness, this is how I use my custom operator:

joinOperator = new CustomIntervalJoinOperator(…);

stream1.connect(stream2)
    .keyBy(selector1, selector2)
    .transform("Interval Join", TypeInformation.of(Pojo.class), joinOperator);
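For context, a two-input operator such as the interval join only forwards the minimum of its two input watermarks, so if processWatermark1() is never called, its output watermark stays at Long.MIN_VALUE and a window downstream never fires. The following is a simplified, hypothetical model of that combination rule (TwoInputWatermarkModel is not a Flink class), assuming both inputs start at Long.MIN_VALUE.

```java
/**
 * Simplified model (not Flink code) of a two-input operator's watermark
 * handling: remember the latest watermark per input and advance the output
 * to min(input1, input2) whenever that minimum increases.
 */
class TwoInputWatermarkModel {
    private long input1 = Long.MIN_VALUE;
    private long input2 = Long.MIN_VALUE;
    private long combined = Long.MIN_VALUE;

    void processWatermark1(long watermark) {
        input1 = watermark;
        updateCombined();
    }

    void processWatermark2(long watermark) {
        input2 = watermark;
        updateCombined();
    }

    private void updateCombined() {
        long candidate = Math.min(input1, input2);
        if (candidate > combined) {
            combined = candidate; // a real operator would emit this downstream
        }
    }

    long outputWatermark() {
        return combined;
    }

    public static void main(String[] args) {
        TwoInputWatermarkModel join = new TwoInputWatermarkModel();
        // Only the second input makes progress ...
        join.processWatermark2(5_000L);
        join.processWatermark2(10_000L);
        System.out.println(join.outputWatermark()); // still Long.MIN_VALUE
        // ... until the first input finally reports a watermark.
        join.processWatermark1(7_000L);
        System.out.println(join.outputWatermark());
    }
}
```

This matches the symptom described: as long as the first input never reports a watermark, nothing downstream of the join can advance, regardless of how far the second input progresses.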

---

Some more information in case it’s relevant:

- stream2 is obtained from a side output.

- both stream1 and stream2 have watermarks assigned by custom strategies. I also log watermark creation, and I can see that watermarks are indeed emitted as expected in both streams.

Strangely, my watermark strategies mark themselves idle if they don’t receive new events for 10 minutes, and if I send some events and then wait 10 minutes, processWatermark1() is called! On the other hand, if I send events continuously, it is never called.
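For reference, here is a minimal sketch of a generator that marks itself idle after a period without events, roughly as described above. SimpleWatermarkOutput, IdleAwareGenerator and RecordingOutput are hypothetical stand-ins loosely modeled on Flink's WatermarkGenerator and WatermarkOutput, with watermarks as plain longs and processing time passed in explicitly so the sketch is deterministic. Flink itself also provides WatermarkStrategy#withIdleness(Duration) for this purpose.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical sketch of a bounded-out-of-orderness generator that marks
 * itself idle when no event has arrived for idleTimeoutMs of processing time.
 */
class IdleAwareGenerator {
    private final long maxOutOfOrdernessMs;
    private final long idleTimeoutMs;
    private long maxTimestamp = Long.MIN_VALUE;
    private long lastEventProcessingTime;

    IdleAwareGenerator(long maxOutOfOrdernessMs, long idleTimeoutMs, long startProcessingTime) {
        this.maxOutOfOrdernessMs = maxOutOfOrdernessMs;
        this.idleTimeoutMs = idleTimeoutMs;
        this.lastEventProcessingTime = startProcessingTime;
    }

    void onEvent(long eventTimestamp, long processingTime) {
        maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
        lastEventProcessingTime = processingTime;
    }

    void onPeriodicEmit(long processingTime, SimpleWatermarkOutput out) {
        if (processingTime - lastEventProcessingTime >= idleTimeoutMs) {
            out.markIdle(); // no events for the whole timeout: declare idleness
        } else if (maxTimestamp != Long.MIN_VALUE) {
            // Same "- 1" convention as Flink's bounded-out-of-orderness watermarks.
            out.emitWatermark(maxTimestamp - maxOutOfOrdernessMs - 1);
        }
    }

    public static void main(String[] args) {
        RecordingOutput out = new RecordingOutput();
        // 5 s out-of-orderness, 10-minute idle timeout, clock starting at 0.
        IdleAwareGenerator gen = new IdleAwareGenerator(5_000L, 600_000L, 0L);
        gen.onEvent(100_000L, 1_000L);
        gen.onPeriodicEmit(2_000L, out);   // recent event: emits a watermark
        gen.onPeriodicEmit(601_000L, out); // 10 minutes without events: idle
        System.out.println(out.events);
    }
}

/** Hypothetical stand-in for Flink's WatermarkOutput, using plain longs. */
interface SimpleWatermarkOutput {
    void emitWatermark(long watermark);
    void markIdle();
}

/** Helper that records what the generator emitted. */
class RecordingOutput implements SimpleWatermarkOutput {
    final List<String> events = new ArrayList<>();

    @Override
    public void emitWatermark(long watermark) { events.add("wm:" + watermark); }

    @Override
    public void markIdle() { events.add("idle"); }
}
```

Note that with a continuous stream of events such a generator never reaches the idle branch, which is consistent with the observation that the join only makes progress after 10 minutes of silence.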

Is this a known issue?

Regards,

Alexis.
