Hi Juho,

the partitions of both topics are independently consumed, i.e., at their
own speed without coordination. With the configuration that Gordon linked,
watermarks are generated per partition.
Each source task maintains the latest (and highest) watermark per partition
and propagates the smallest watermark. The same mechanism is applied for
watermarks across tasks (this is what Kien referred to).
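
To make the "highest per partition, smallest overall" rule concrete, here is a
toy model in Python. It is only a sketch of the idea and not Flink's actual
code: the class name, the method name, and the partition labels are made up
for illustration.

```python
class PartitionWatermarkTracker:
    """Toy model (hypothetical, not Flink code): keep the highest watermark
    seen per partition and emit the minimum across all partitions."""

    def __init__(self, partitions):
        # No partition has produced a watermark yet.
        self._per_partition = {p: float("-inf") for p in partitions}
        self._last_emitted = float("-inf")

    def on_watermark(self, partition, watermark):
        # Remember only the latest (and highest) watermark of this partition.
        if watermark > self._per_partition[partition]:
            self._per_partition[partition] = watermark
        # The task-level watermark is the smallest per-partition watermark.
        candidate = min(self._per_partition.values())
        if candidate > self._last_emitted:
            self._last_emitted = candidate
            return candidate  # a new watermark is propagated downstream
        return None  # nothing new to emit


tracker = PartitionWatermarkTracker(["small-0", "big-0"])
emitted = [
    tracker.on_watermark("small-0", 100),  # big-0 has no watermark yet
    tracker.on_watermark("small-0", 200),  # still held back by big-0
    tracker.on_watermark("big-0", 50),     # minimum becomes 50
    tracker.on_watermark("big-0", 40),     # lower than before, ignored
    tracker.on_watermark("big-0", 150),    # minimum becomes 150
]
print(emitted)
```

In this sketch the fast "small-0" partition never advances the emitted
watermark on its own; only progress on "big-0" does.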

In the case that you are describing, the partitions of the smaller topic
are consumed faster (hence their offsets catch up sooner), but watermarks
are emitted "at the speed" of the bigger topic.
Therefore, the timestamps of records from the smaller topic can be much
ahead of the watermark.
In principle, that does not pose a problem. Stateful operators (such as
windows) remember the "early" records and process them once they receive a
watermark that passes the timestamps of the early records.
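
As a rough illustration of how a window operator holds on to "early" records,
here is a toy tumbling window in Python. Again, this is a simplified sketch
under my own assumptions (integer timestamps, a window fires once the
watermark reaches its last timestamp), with hypothetical names, and not the
real Flink implementation.

```python
from collections import defaultdict


class ToyTumblingWindow:
    """Toy model (hypothetical): buffer records per window and fire a window
    only when the watermark has passed the end of that window."""

    def __init__(self, size):
        self.size = size
        self.buffers = defaultdict(list)  # window start -> buffered values

    def on_record(self, timestamp, value):
        # Assign the record to the window [start, start + size).
        start = timestamp - (timestamp % self.size)
        self.buffers[start].append(value)

    def on_watermark(self, watermark):
        # Fire every window whose last timestamp is covered by the watermark.
        fired = []
        for start in sorted(self.buffers):
            if start + self.size - 1 <= watermark:
                fired.append((start, self.buffers.pop(start)))
        return fired


window = ToyTumblingWindow(size=100)
window.on_record(950, "small-a")  # far ahead of the watermark: kept in state
window.on_record(120, "big-a")

first = window.on_watermark(199)   # fires only the window starting at 100
second = window.on_watermark(500)  # the early record is still buffered
third = window.on_watermark(999)   # now the window starting at 900 fires
print(first, second, third)
```

The early record from the small topic is neither dropped nor processed
prematurely; it simply waits in state until the watermark catches up.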

Regarding your question "Are they committed to Kafka before their watermark
has passed on Flink's side?":
Yes, that can happen. The offsets of the smaller topic might be checkpointed
once all of its partitions have been read to the "end", while the bigger
topic is still catching up.
The watermarks are moving at the speed of the bigger topic, but all "early"
events of the smaller topic are stored in stateful operators and are
checkpointed as well.

So, you lose neither early nor late data.

Best, Fabian



2017-12-01 13:43 GMT+01:00 Juho Autio <juho.au...@rovio.com>:

> Thanks for the answers, I still don't understand why I can see the offsets
> being quickly committed to Kafka for the "small topic"? Are they committed
> to Kafka before their watermark has passed on Flink's side? That would be
> quite confusing.. Indeed when Flink handles the state/offsets internally,
> the consumer offsets are committed to Kafka just for reference.
>
> Otherwise, what you're saying sounds very good to me. The documentation
> just doesn't explicitly say anything about how it works across topics.
>
> On Kien's answer: "When you join multiple stream with different
> watermarks", note that I'm not joining any topics myself, I get them as a
> single stream from the Flink kafka consumer based on the list of topics
> that I asked for.
>
> Thanks,
> Juho
>
> On Wed, Nov 22, 2017 at 2:57 PM, Tzu-Li (Gordon) Tai <tzuli...@apache.org>
> wrote:
>
>> Hi!
>>
>> The FlinkKafkaConsumer can handle watermark advancement with
>> per-Kafka-partition awareness (across partitions of different topics).
>> You can see an example of how to do that here [1].
>>
>> Basically what this does is that it generates watermarks within the Kafka
>> consumer individually for each Kafka partition, and the per-partition
>> watermarks are aggregated and emitted from the consumer in the same way
>> that watermarks are aggregated on a stream shuffle; only when the low
>> watermark advances across all partitions, should a watermark be emitted
>> from the consumer.
>>
>> Therefore, this helps avoid the problem that you described, in which a
>> "big_topic" has subscribed partitions that lag behind others. In this
>> case, and when the above feature is used, the event time would advance
>> along with the lagging "big_topic" partitions and would not result in
>> messages being recognized as late and discarded.
>>
>> Cheers,
>> Gordon
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/event_timestamps_watermarks.html#timestamps-per-kafka-partition
>>
>>
>>
>> --
>> Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.
>> nabble.com/
>>
>
>
