Hi,

When you join multiple streams with different watermarks, the resulting stream's watermark will be the smallest of the input watermarks, as long as you don't explicitly assign a new watermark generator.


In your example, suppose small_topic has its watermark at time t1 and big_topic has its watermark at time t2, with t1 > t2 because small_topic is consumed faster. If you join the two streams into a single big_and_small_stream, then big_and_small_stream will have its watermark at time t2, so no message from big_topic will be lost due to lateness.
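
To make the rule concrete, here is a minimal plain-Java sketch of how the combined watermark follows the slowest input. It is only a simplified model, not Flink's actual implementation: the class name MinWatermarkSketch, the methods onWatermark and isLate, and the millisecond values are all made up for illustration. It also assumes each input stream carries its own watermarks, and it treats an event as late when its timestamp is at or before the combined watermark.

```java
import java.util.Arrays;

/** Simplified model (not Flink code) of combining watermarks from several inputs. */
final class MinWatermarkSketch {
    private final long[] inputWatermarks;
    private long combined = Long.MIN_VALUE;

    MinWatermarkSketch(int numInputs) {
        inputWatermarks = new long[numInputs];
        // Until an input has reported a watermark, it holds the combined one back.
        Arrays.fill(inputWatermarks, Long.MIN_VALUE);
    }

    /** Records a watermark from one input and returns the combined watermark. */
    long onWatermark(int input, long timestamp) {
        // A watermark of a single input never moves backwards.
        inputWatermarks[input] = Math.max(inputWatermarks[input], timestamp);
        long min = Long.MAX_VALUE;
        for (long w : inputWatermarks) {
            min = Math.min(min, w);
        }
        // The combined watermark is the minimum over all inputs.
        combined = Math.max(combined, min);
        return combined;
    }

    /** Simplifying assumption: an event is late if it is not after the combined watermark. */
    boolean isLate(long eventTimestamp) {
        return eventTimestamp <= combined;
    }

    public static void main(String[] args) {
        final int smallTopic = 0;
        final int bigTopic = 1;
        MinWatermarkSketch joined = new MinWatermarkSketch(2);

        joined.onWatermark(smallTopic, 4_000L); // t1: small_topic is far ahead
        joined.onWatermark(bigTopic, 1_000L);   // t2: big_topic is still catching up

        // The joined stream sits at t2, so a big_topic event between t2 and t1 is on time.
        System.out.println("late at 2000? " + joined.isLate(2_000L));
    }
}
```

In this model the watermark of small_topic (t1) has no effect on lateness until big_topic catches up, at which point the combined watermark advances to the new minimum.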


Regards,

Kien


On 11/22/2017 4:36 PM, Juho Autio wrote:
I would like to understand how FlinkKafkaConsumer treats "unbalanced" topics.

We're using FlinkKafkaConsumer010 with 2 topics, say "small_topic" & "big_topic".

After restoring from an old savepoint (taken 4 hours earlier), I checked the consumer offsets on Kafka (Flink commits offsets to Kafka for reference; checkpointing is enabled, so in reality Flink manages the offsets internally). It seems that the offsets for "small_topic" quickly catch up and are committed at the latest offset. However, catching up with "big_topic" takes much longer.

Is it possible that watermarks are determined based on "small_topic", and that messages read from "big_topic" are discarded (i.e. excluded from the triggered windows) if their event-time timestamps are too old?

Or how does FlinkKafkaConsumer handle this? Does it somehow synchronize reading based on the extracted timestamp:
- across partitions of a single topic?
- across topics?

Our code is basically:

        env
                .addSource(new FlinkKafkaConsumer010<>(
                        Arrays.asList("big_topic", "small_topic"),
                        new EventMapSchema(),
                        props))
                .assignTimestampsAndWatermarks(
                        new OufOfOrderTimestampExtractor("timestamp_field",
                                Time.seconds(cfg.getMaxOutOfOrdernessInSeconds())));

Finally, I don't think this problem is limited to restoring state and having to catch up; that just makes it more prominent. Could it also happen during normal streaming, if consuming from bigger topics is slow enough?

Thanks!
