[jira] [Commented] (FLINK-5479) Per-partition watermarks in FlinkKafkaConsumer should consider idle partitions
[ https://issues.apache.org/jira/browse/FLINK-5479?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17105478#comment-17105478 ]

Aljoscha Krettek commented on FLINK-5479:
-----------------------------------------

We're currently voting on FLIP-126, which will introduce idleness for all per-partition/per-shard sources: https://cwiki.apache.org/confluence/display/FLINK/FLIP-126%3A+Unify+%28and+separate%29+Watermark+Assigners.

There is also my branch that has the implementation, with idleness detection as an option: https://github.com/aljoscha/flink/tree/flink-xxx-wm-generators-rebased.

I'm hoping to still get this into Flink 1.11.

> Per-partition watermarks in FlinkKafkaConsumer should consider idle partitions
> -------------------------------------------------------------------------------
>
>                 Key: FLINK-5479
>                 URL: https://issues.apache.org/jira/browse/FLINK-5479
>             Project: Flink
>          Issue Type: Improvement
>          Components: Connectors / Kafka
>            Reporter: Tzu-Li (Gordon) Tai
>            Priority: Major
>
> Reported in ML:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Kafka-topic-partition-skewness-causes-watermark-not-being-emitted-td11008.html
> Similar to what's happening to idle sources blocking watermark progression in
> downstream operators (see FLINK-5017), the per-partition watermark mechanism
> in {{FlinkKafkaConsumer}} is also blocked from advancing watermarks
> when a partition is idle. The watermark of idle partitions is always
> {{Long.MIN_VALUE}}, therefore the overall min watermark across all partitions
> of a consumer subtask will never advance.
> It's normally not a common case to have Kafka partitions not producing any
> data, but it'll probably be good to handle this as well. I think we should
> have a localized solution similar to FLINK-5017 for the per-partition
> watermarks in {{AbstractFetcher}}.

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
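As a rough illustration of the stall described in the issue text above, and of what excluding idle partitions changes, here is a minimal plain-Java sketch. It is not Flink's `AbstractFetcher` code: the class `PartitionWatermarks` and both method names are made up for this example, and returning `Long.MIN_VALUE` when every partition is idle is an assumption rather than documented behavior.

```java
class PartitionWatermarks {

    /**
     * Minimum over all per-partition watermarks, with no notion of idleness.
     * A partition that never produced data still reports Long.MIN_VALUE,
     * so it pins the result there.
     */
    static long naiveMin(long[] watermarks) {
        long min = Long.MAX_VALUE;
        for (long wm : watermarks) {
            min = Math.min(min, wm);
        }
        return min;
    }

    /**
     * Minimum over the partitions that are not flagged idle.
     * Assumption for this sketch: if every partition is idle, there is
     * nothing to report, so Long.MIN_VALUE is returned.
     */
    static long idleAwareMin(long[] watermarks, boolean[] idle) {
        long min = Long.MAX_VALUE;
        boolean anyActive = false;
        for (int i = 0; i < watermarks.length; i++) {
            if (!idle[i]) {
                anyActive = true;
                min = Math.min(min, watermarks[i]);
            }
        }
        return anyActive ? min : Long.MIN_VALUE;
    }

    public static void main(String[] args) {
        // Partition 1 has never produced a record.
        long[] watermarks = {1000L, Long.MIN_VALUE, 1200L};
        boolean[] idle = {false, true, false};

        System.out.println(naiveMin(watermarks) == Long.MIN_VALUE); // true: stuck
        System.out.println(idleAwareMin(watermarks, idle));         // 1000
    }
}
```

How a partition comes to be flagged idle is the contested part of this ticket; the sketch takes the flag as given.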
[jira] [Commented] (FLINK-5479) Per-partition watermarks in FlinkKafkaConsumer should consider idle partitions
[ https://issues.apache.org/jira/browse/FLINK-5479?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17104286#comment-17104286 ]

Niels Basjes commented on FLINK-5479:
-------------------------------------

[~jaya.ananthram] At this point in time I would recommend creating a very simple separate application that periodically (say every second) inserts a fake/empty event into every Kafka partition of your topic with just the current timestamp.

From there the normal processing in your application will pick up those events and create the watermarks. Your application then only needs to drop those events to avoid polluting the data stream it is actually processing.
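The consuming side of the workaround described above might look like the following plain-Java sketch. The `Event` class and the convention that a null payload marks a heartbeat are assumptions made for this example; a real job would use its own event type, and the drop would have to happen after timestamps and watermarks have been assigned.

```java
import java.util.ArrayList;
import java.util.List;

class HeartbeatWorkaround {

    /** Hypothetical event shape; a null payload marks a heartbeat. */
    static final class Event {
        final long timestamp;
        final String payload;

        Event(long timestamp, String payload) {
            this.timestamp = timestamp;
            this.payload = payload;
        }

        boolean isHeartbeat() {
            return payload == null;
        }
    }

    /**
     * Highest timestamp seen on a partition, heartbeats included.
     * The heartbeats are what keep this value moving on a quiet partition.
     */
    static long maxTimestamp(List<Event> partitionEvents) {
        long max = Long.MIN_VALUE;
        for (Event e : partitionEvents) {
            max = Math.max(max, e.timestamp);
        }
        return max;
    }

    /** Removes heartbeats so they do not reach the actual processing logic. */
    static List<Event> dropHeartbeats(List<Event> events) {
        List<Event> real = new ArrayList<>();
        for (Event e : events) {
            if (!e.isHeartbeat()) {
                real.add(e);
            }
        }
        return real;
    }

    public static void main(String[] args) {
        List<Event> quietPartition = new ArrayList<>();
        quietPartition.add(new Event(1_000L, "order-1"));
        quietPartition.add(new Event(2_000L, null)); // heartbeat
        quietPartition.add(new Event(3_000L, null)); // heartbeat

        System.out.println(maxTimestamp(quietPartition));          // 3000
        System.out.println(dropHeartbeats(quietPartition).size()); // 1
    }
}
```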
[jira] [Commented] (FLINK-5479) Per-partition watermarks in FlinkKafkaConsumer should consider idle partitions
[ https://issues.apache.org/jira/browse/FLINK-5479?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17101035#comment-17101035 ]

Jaya Ananthram commented on FLINK-5479:
---------------------------------------

Hi All - Is there any temporary workaround for this ticket? I see that FLINK-10740 is in progress, but I'm not sure when it will be available in a release. In the meantime, if anyone is able to share a workaround, that would be great.
[jira] [Commented] (FLINK-5479) Per-partition watermarks in FlinkKafkaConsumer should consider idle partitions
[ https://issues.apache.org/jira/browse/FLINK-5479?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17019051#comment-17019051 ]

Stephan Ewen commented on FLINK-5479:
-------------------------------------

This should be fixed with the introduction of the new Source API:
  * https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface
  * https://issues.apache.org/jira/browse/FLINK-10740
[jira] [Commented] (FLINK-5479) Per-partition watermarks in FlinkKafkaConsumer should consider idle partitions
[ https://issues.apache.org/jira/browse/FLINK-5479?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16951861#comment-16951861 ]

AT-Fieldless commented on FLINK-5479:
-------------------------------------

Hi all - Has this bug been fixed? I use Flink 1.6.2, and my job can't generate correct watermarks when only one partition has data.
[jira] [Commented] (FLINK-5479) Per-partition watermarks in FlinkKafkaConsumer should consider idle partitions
[ https://issues.apache.org/jira/browse/FLINK-5479?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16787991#comment-16787991 ]

Padarn Wilson commented on FLINK-5479:
--------------------------------------

Hi all - Is this still an open issue that anyone is working on? Was thinking of taking a crack at it, but thought I'd check here first.

> Per-partition watermarks in FlinkKafkaConsumer should consider idle partitions
> -------------------------------------------------------------------------------
>
>                 Key: FLINK-5479
>                 URL: https://issues.apache.org/jira/browse/FLINK-5479
>             Project: Flink
>          Issue Type: Improvement
>          Components: Connectors / Kafka
>            Reporter: Tzu-Li (Gordon) Tai
>            Priority: Major
>             Fix For: 1.8.0

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
[jira] [Commented] (FLINK-5479) Per-partition watermarks in FlinkKafkaConsumer should consider idle partitions
[ https://issues.apache.org/jira/browse/FLINK-5479?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16437175#comment-16437175 ]

ASF GitHub Bot commented on FLINK-5479:
---------------------------------------

Github user tzulitai commented on the issue:

    https://github.com/apache/flink/pull/5634

@juhoautio Your concerns about this fix are quite correct, and they are why this PR was closed in the first place: it introduces a lot of ill-defined semantics.

Regarding your thought on relating idleness to empty results returned from Kafka: I think that seems like a good approach, and it should also capture Eron's comment quite well.

> Per-partition watermarks in FlinkKafkaConsumer should consider idle partitions
> -------------------------------------------------------------------------------
>
>                 Key: FLINK-5479
>                 URL: https://issues.apache.org/jira/browse/FLINK-5479
>             Project: Flink
>          Issue Type: Improvement
>          Components: Kafka Connector
>            Reporter: Tzu-Li (Gordon) Tai
>            Priority: Blocker
>             Fix For: 1.6.0
[jira] [Commented] (FLINK-5479) Per-partition watermarks in FlinkKafkaConsumer should consider idle partitions
[ https://issues.apache.org/jira/browse/FLINK-5479?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16437021#comment-16437021 ]

ASF GitHub Bot commented on FLINK-5479:
---------------------------------------

Github user juhoautio commented on the issue:

    https://github.com/apache/flink/pull/5634

@tzulitai did you ever test your code? I tried it and it allowed watermarks to proceed, but apparently too aggressively, as it caused a lot of data loss. I'm looking for a quick fix for this issue, as it seems that FLINK-5479 won't be fixed anytime soon. So I would very much like to hear if you have been able to fix this in some lighter way.

My understanding of your PR is that it doesn't work reliably because it just adds an internal timeout, which could be exceeded whenever the consumer is, for example, busy consuming other partitions. Please comment if this perception is wrong.

I'm thinking that it should instead get the information that a partition was idle from the Kafka client, and only in that case (empty result from the client) create a newer watermark for that partition. It shouldn't mark the partition as being in some idle state, and it shouldn't create newer watermarks periodically without any connection to another empty result from the client. New watermarks should only be generated as a callback of the Kafka client result?
[jira] [Commented] (FLINK-5479) Per-partition watermarks in FlinkKafkaConsumer should consider idle partitions
[ https://issues.apache.org/jira/browse/FLINK-5479?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16435048#comment-16435048 ]

Juho Autio commented on FLINK-5479:
-----------------------------------

[~tzulitai]:
{quote}Raised this to blocker for 1.5.0, as we are seeing more users with this issue on the mailing lists.
{quote}
Is a fix for this going to make it to 1.5.0, or has it been postponed to 1.6.0?

This is a big problem for us in a job where we need to consume many topics with a catch-all topic name pattern, and many topics/partitions can remain idle for a long time, even forever.
[jira] [Commented] (FLINK-5479) Per-partition watermarks in FlinkKafkaConsumer should consider idle partitions
[ https://issues.apache.org/jira/browse/FLINK-5479?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16391304#comment-16391304 ]

ASF GitHub Bot commented on FLINK-5479:
---------------------------------------

Github user StephanEwen commented on the issue:

    https://github.com/apache/flink/pull/5634

Just saw a good comment from @EronWright

> I think the ideal would be that idleness would occur only for tail reads, i.e. due to a timeout from `kafkaConsumer.poll(pollTimeout)`. In other words, an intermittent connection issue would ideally not trigger idleness.

Let's see if we can get that into the design somehow, without having too specific logic inside the Kafka Consumer (making the Kafka Consumer more complex is my personal pet peeve).
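One way to picture the distinction drawn above, between an empty tail read and a connection problem, is the plain-Java sketch below. It does not call the Kafka client; the `PollOutcome` enum, the `TailReadIdleness` class and the threshold of consecutive empty polls are all assumptions introduced for this example, and mapping real `poll` results and exceptions onto the three outcomes is left open.

```java
class TailReadIdleness {

    /** Hypothetical classification of one poll attempt for one partition. */
    enum PollOutcome {
        RECORDS,             // the poll returned at least one record
        EMPTY_AFTER_TIMEOUT, // the poll timed out with nothing to read (tail read)
        ERROR                // the poll failed, e.g. because of a connection issue
    }

    private final int emptyPollsBeforeIdle;
    private int consecutiveEmptyPolls;

    TailReadIdleness(int emptyPollsBeforeIdle) {
        if (emptyPollsBeforeIdle < 1) {
            throw new IllegalArgumentException("threshold must be at least 1");
        }
        this.emptyPollsBeforeIdle = emptyPollsBeforeIdle;
    }

    /** Records the outcome of one poll attempt. */
    void onPoll(PollOutcome outcome) {
        switch (outcome) {
            case EMPTY_AFTER_TIMEOUT:
                // Only a genuine empty tail read counts as evidence of idleness.
                consecutiveEmptyPolls++;
                break;
            case RECORDS:
                consecutiveEmptyPolls = 0;
                break;
            case ERROR:
                // Design choice of this sketch: a failed poll says nothing about
                // the partition, so earlier evidence is discarded as well.
                consecutiveEmptyPolls = 0;
                break;
        }
    }

    boolean isIdle() {
        return consecutiveEmptyPolls >= emptyPollsBeforeIdle;
    }

    public static void main(String[] args) {
        TailReadIdleness tracker = new TailReadIdleness(2);
        tracker.onPoll(PollOutcome.EMPTY_AFTER_TIMEOUT);
        tracker.onPoll(PollOutcome.ERROR);
        tracker.onPoll(PollOutcome.EMPTY_AFTER_TIMEOUT);
        System.out.println(tracker.isIdle()); // false: the error reset the count
        tracker.onPoll(PollOutcome.EMPTY_AFTER_TIMEOUT);
        System.out.println(tracker.isIdle()); // true: two empty tail reads in a row
    }
}
```

Resetting the count on an error is the conservative choice; leaving it unchanged would also keep a connection issue from triggering idleness on its own.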
[jira] [Commented] (FLINK-5479) Per-partition watermarks in FlinkKafkaConsumer should consider idle partitions
[ https://issues.apache.org/jira/browse/FLINK-5479?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16390362#comment-16390362 ]

ASF GitHub Bot commented on FLINK-5479:
---------------------------------------

Github user tweise commented on the issue:

    https://github.com/apache/flink/pull/5634

This is a good proposal; it should also survive a general connector refactor that will be necessary to address other code duplication. The Kinesis ticket is https://issues.apache.org/jira/browse/FLINK-5697 and I will add a reference back to this thread. I would be happy to add the watermark support based on the revised generator.

Perhaps it would be good to recognize the special "idle" watermark in SourceContext also?
[jira] [Commented] (FLINK-5479) Per-partition watermarks in FlinkKafkaConsumer should consider idle partitions
[ https://issues.apache.org/jira/browse/FLINK-5479?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16390245#comment-16390245 ]

ASF GitHub Bot commented on FLINK-5479:
---------------------------------------

Github user StephanEwen commented on the issue:

    https://github.com/apache/flink/pull/5634

@tweise @tzulitai I would suggest solving this in the following way, which should be both simple and cover our cases:

  - We extend the current periodic watermark generators for idleness. We can do that, for example, by maintaining a record counter and remembering the last counter value and a `System.nanoTime()` timestamp each time the generator is asked whether to emit a watermark. If no record came for too long, return a special watermark object that indicates "idle". Or change the return type to return either 'none', 'idle', or 'watermark'.

  - The Kinesis Consumer needs per-shard watermarks, the same way the Kafka Consumer does. That part needs to be added to the Kinesis consumer anyway.

That way, we automatically get per-shard idleness in Kinesis and per-partition idleness in Kafka without doing anything specific for the source connectors.

We can then also remove the idleness logic from the source context, since it would be duplicated there.
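The first point of the proposal above (a record counter plus a `System.nanoTime()` reading, with a three-way result) can be sketched in plain Java as follows. This is only an illustration under assumptions, not the generator that was eventually implemented: the class and method names are invented here, the clock is injected as a `LongSupplier` so the example can be driven by a fake clock, and using the maximum seen timestamp as the watermark is a simplification.

```java
import java.util.function.LongSupplier;

class IdleAwarePeriodicGenerator {

    enum Kind { NONE, IDLE, WATERMARK }

    /** Three-way result of a periodic check; timestamp is only meaningful for WATERMARK. */
    static final class Result {
        final Kind kind;
        final long timestamp;

        private Result(Kind kind, long timestamp) {
            this.kind = kind;
            this.timestamp = timestamp;
        }
    }

    private final long idleTimeoutNanos;
    private final LongSupplier nanoClock; // System::nanoTime in real use

    private long recordCount;
    private long countAtLastCheck;
    private long lastProgressNanos;
    private long maxTimestamp = Long.MIN_VALUE;
    private long lastEmitted = Long.MIN_VALUE;

    IdleAwarePeriodicGenerator(long idleTimeoutNanos, LongSupplier nanoClock) {
        this.idleTimeoutNanos = idleTimeoutNanos;
        this.nanoClock = nanoClock;
        this.lastProgressNanos = nanoClock.getAsLong();
    }

    /** Called for every record of the partition or shard. */
    void onRecord(long eventTimestamp) {
        recordCount++;
        maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
    }

    /** Called periodically to ask whether a watermark should be emitted. */
    Result onPeriodicCheck() {
        long now = nanoClock.getAsLong();
        if (recordCount != countAtLastCheck) {
            // Records arrived since the last check: remember counter and time.
            countAtLastCheck = recordCount;
            lastProgressNanos = now;
            if (maxTimestamp > lastEmitted) {
                lastEmitted = maxTimestamp;
                return new Result(Kind.WATERMARK, maxTimestamp);
            }
            return new Result(Kind.NONE, Long.MIN_VALUE);
        }
        // No new records: report idle once the timeout has elapsed.
        if (now - lastProgressNanos >= idleTimeoutNanos) {
            return new Result(Kind.IDLE, Long.MIN_VALUE);
        }
        return new Result(Kind.NONE, Long.MIN_VALUE);
    }
}
```

Because the timeout is measured between periodic checks rather than from record arrival, this sketch shares the weakness raised elsewhere in the thread: a consumer that is merely busy with other partitions could still see a partition reported as idle.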
[jira] [Commented] (FLINK-5479) Per-partition watermarks in FlinkKafkaConsumer should consider idle partitions
[ https://issues.apache.org/jira/browse/FLINK-5479?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16389940#comment-16389940 ]

ASF GitHub Bot commented on FLINK-5479:
---------------------------------------

Github user tweise commented on the issue:

    https://github.com/apache/flink/pull/5634

@tzulitai @StephanEwen the current idleness detection in the source context isn't a replacement for what is required to deal with an inactive partition (or Kinesis shard). When a connector subtask consumes multiple partitions and one is idle, then it should be possible to still generate a watermark. This can only be solved outside of the connector when the multiple source partitions are visible (like it would be for an operator with multiple input streams).
[jira] [Commented] (FLINK-5479) Per-partition watermarks in FlinkKafkaConsumer should consider idle partitions
[ https://issues.apache.org/jira/browse/FLINK-5479?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16389426#comment-16389426 ]

Tzu-Li (Gordon) Tai commented on FLINK-5479:
--------------------------------------------

After some discussion with [~aljoscha] offline, we decided to not touch the Kafka connector code any more for Flink 1.5.

Let's move this to 1.6.0, where we are planning a new implementation for the Kafka / Kinesis connectors. This feature can be implemented commonly for these connectors.
[jira] [Commented] (FLINK-5479) Per-partition watermarks in FlinkKafkaConsumer should consider idle partitions
[ https://issues.apache.org/jira/browse/FLINK-5479?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16389422#comment-16389422 ]

ASF GitHub Bot commented on FLINK-5479:
---------------------------------------

Github user tzulitai closed the pull request at:

    https://github.com/apache/flink/pull/5634

> Per-partition watermarks in FlinkKafkaConsumer should consider idle partitions
> -------------------------------------------------------------------------------
>
>                 Key: FLINK-5479
>                 URL: https://issues.apache.org/jira/browse/FLINK-5479
>             Project: Flink
>          Issue Type: Improvement
>          Components: Kafka Connector
>            Reporter: Tzu-Li (Gordon) Tai
>            Priority: Blocker
>             Fix For: 1.5.0
[jira] [Commented] (FLINK-5479) Per-partition watermarks in FlinkKafkaConsumer should consider idle partitions
[ https://issues.apache.org/jira/browse/FLINK-5479?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16389421#comment-16389421 ]

ASF GitHub Bot commented on FLINK-5479:
---------------------------------------

Github user tzulitai commented on the issue:

    https://github.com/apache/flink/pull/5634

I'm closing this PR now, as it seems the overall agreement is that we want to implement this differently, or at least not touch the Kafka connector code any more now.
[jira] [Commented] (FLINK-5479) Per-partition watermarks in FlinkKafkaConsumer should consider idle partitions
[ https://issues.apache.org/jira/browse/FLINK-5479?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16389051#comment-16389051 ]

ASF GitHub Bot commented on FLINK-5479:
---------------------------------------

Github user tzulitai commented on the issue:

    https://github.com/apache/flink/pull/5634

A few points to mention:

1. @StephanEwen there is already common idleness detection implemented within `SourceContext`s, see `StreamSourceContexts`. The idleness detection, however, is currently always disabled and we do not allow users to configure it.

2. I agree that we shouldn't add anything more to the connector; I have also discussed this offline with @aljoscha. We should maybe do this only as part of the new connector rework that @tweise and I were talking about, in 1.6.
[jira] [Commented] (FLINK-5479) Per-partition watermarks in FlinkKafkaConsumer should consider idle partitions
[ https://issues.apache.org/jira/browse/FLINK-5479?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16388123#comment-16388123 ]

ASF GitHub Bot commented on FLINK-5479:
---

Github user tweise commented on the issue:

    https://github.com/apache/flink/pull/5634

There was a related discussion on the mailing list; this and several other features could be provided by a common connector framework. Such an initiative is a much larger effort, though, and it is not clear to me that users can wait. The Kinesis consumer has virtually identical requirements and we have already written custom code for it.

> Per-partition watermarks in FlinkKafkaConsumer should consider idle partitions
> --
>
> Key: FLINK-5479
> URL: https://issues.apache.org/jira/browse/FLINK-5479
> Project: Flink
> Issue Type: Improvement
> Components: Kafka Connector
> Reporter: Tzu-Li (Gordon) Tai
> Priority: Blocker
> Fix For: 1.5.0
[jira] [Commented] (FLINK-5479) Per-partition watermarks in FlinkKafkaConsumer should consider idle partitions
[ https://issues.apache.org/jira/browse/FLINK-5479?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16387700#comment-16387700 ]

ASF GitHub Bot commented on FLINK-5479:
---

Github user StephanEwen commented on the issue:

    https://github.com/apache/flink/pull/5634

I would suggest approaching this in a different way.

1. Idleness detection is something that watermark generation benefits from in general, not just in Kafka.
2. Unless there is a very strong reason, I would not want to add anything more to the Kafka Connector. This connector implementation is so big already. We saw multiple issues in the past where the Kafka Connector's complexity was the cause of problems.

> Per-partition watermarks in FlinkKafkaConsumer should consider idle partitions
> --
>
> Key: FLINK-5479
> URL: https://issues.apache.org/jira/browse/FLINK-5479
> Project: Flink
> Issue Type: Improvement
> Components: Kafka Connector
> Reporter: Tzu-Li (Gordon) Tai
> Priority: Blocker
> Fix For: 1.5.0
[jira] [Commented] (FLINK-5479) Per-partition watermarks in FlinkKafkaConsumer should consider idle partitions
[ https://issues.apache.org/jira/browse/FLINK-5479?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16386676#comment-16386676 ]

ASF GitHub Bot commented on FLINK-5479:
---

Github user EronWright commented on the issue:

    https://github.com/apache/flink/pull/5634

I think the ideal would be for idleness to be declared only for tail reads, i.e. due to a timeout from `kafkaConsumer.poll(pollTimeout)`. In other words, an intermittent connection issue would ideally not trigger idleness.

> Per-partition watermarks in FlinkKafkaConsumer should consider idle partitions
> --
>
> Key: FLINK-5479
> URL: https://issues.apache.org/jira/browse/FLINK-5479
> Project: Flink
> Issue Type: Improvement
> Components: Kafka Connector
> Reporter: Tzu-Li (Gordon) Tai
> Priority: Blocker
> Fix For: 1.5.0
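To make the tail-read suggestion above concrete, here is a minimal sketch of one way it could be tracked. Everything in it is hypothetical: the class `TailReadIdlenessTracker` and its methods are not part of Flink or of the Kafka client, and it assumes the caller can tell a poll that returned normally with zero records apart from a poll that failed. An empty poll is only an approximation of "reading at the tail".

```java
/**
 * Hypothetical sketch: counts idle time only across polls that completed
 * successfully but returned no records. Not Flink or Kafka client code.
 */
final class TailReadIdlenessTracker {
    private final long idleTimeoutMillis;
    // Time spent in consecutive successful polls that returned nothing.
    private long emptyPollMillis = 0L;

    TailReadIdlenessTracker(long idleTimeoutMillis) {
        this.idleTimeoutMillis = idleTimeoutMillis;
    }

    /** Call after a poll that returned normally. */
    void onSuccessfulPoll(int recordCount, long pollDurationMillis) {
        if (recordCount > 0) {
            emptyPollMillis = 0L;                  // data arrived, so not idle
        } else {
            emptyPollMillis += pollDurationMillis; // timed out at the tail
        }
    }

    /** Call when a poll failed, e.g. because of a connection problem. */
    void onPollFailure() {
        // A failure says nothing about whether data exists, so be
        // conservative and start counting again from zero.
        emptyPollMillis = 0L;
    }

    boolean isIdle() {
        return emptyPollMillis >= idleTimeoutMillis;
    }
}
```

Resetting the counter on failure is a deliberately conservative choice: with a flaky connection the partition is never declared idle, which can hold the watermark back but does not risk advancing it past data that has not been read yet.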
[jira] [Commented] (FLINK-5479) Per-partition watermarks in FlinkKafkaConsumer should consider idle partitions
[ https://issues.apache.org/jira/browse/FLINK-5479?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16385863#comment-16385863 ]

ASF GitHub Bot commented on FLINK-5479:
---

GitHub user tzulitai opened a pull request:

    https://github.com/apache/flink/pull/5634

    [FLINK-5479] [kafka] Idleness detection for periodic per-partition watermarks in FlinkKafkaConsumer

## What is the purpose of the change

This commit adds the capability to detect idle partitions in the `FlinkKafkaConsumer` when using periodic per-partition watermark generation. Users set the partition idle timeout using `setPartitionIdleTimeout(long)`. The value of the timeout determines how long an idle partition may block watermark advancement downstream.

## Brief change log

- Adds a `setPartitionIdleTimeout(long)` configuration method.
- Modifies `KafkaTopicPartitionStateWithPeriodicWatermarks` to keep track of the information needed to determine partition idleness.
- Adds idleness detection logic to `AbstractFetcher.PeriodicWatermarkEmitter`.

## Does this pull request potentially affect one of the following parts:

- Dependencies (does it add or upgrade a dependency): (yes / **no**)
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / **no**)
- The serializers: (yes / **no** / don't know)
- The runtime per-record code paths (performance sensitive): (yes / **no** / don't know)
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know)
- The S3 file system connector: (yes / **no** / don't know)

## Documentation

- Does this pull request introduce a new feature? (**yes** / no)
- If yes, how is the feature documented? (not applicable / docs / **JavaDocs** / not documented)

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/tzulitai/flink FLINK-5479

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/5634.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #5634

commit d7b95ca1c8bb85f77dd6b83becf8fc1d5cccb810
Author: Tzu-Li (Gordon) Tai
Date: 2018-03-05T09:35:02Z

    [FLINK-5479] [kafka] Idleness detection for periodic per-partition watermarks

    This commit adds the capability to detect idle partitions in the FlinkKafkaConsumer when using periodic per-partition watermark generation. Users set the partition idle timeout using `setPartitionIdleTimeout(long)`. The value of the timeout determines how long an idle partition may block watermark advancement downstream.

> Per-partition watermarks in FlinkKafkaConsumer should consider idle partitions
> --
>
> Key: FLINK-5479
> URL: https://issues.apache.org/jira/browse/FLINK-5479
> Project: Flink
> Issue Type: Improvement
> Components: Kafka Connector
> Reporter: Tzu-Li (Gordon) Tai
> Priority: Blocker
> Fix For: 1.5.0
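For readers who want to see the idea behind the pull request in isolation, the following is a rough sketch of an idle-aware minimum watermark. It is not the code from the PR: the class `IdleAwareMinWatermark`, its methods, and the use of string partition names are all made up for illustration, and it assumes that a partition counts as idle once no record has arrived for the configured timeout (measured in processing time).

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Hypothetical sketch of an idle-aware per-partition minimum watermark.
 * It does not mirror Flink's AbstractFetcher implementation.
 */
final class IdleAwareMinWatermark {

    private static final class PartitionState {
        long watermark = Long.MIN_VALUE; // no record seen yet
        long lastRecordTimeMillis;       // processing time of the last record

        PartitionState(long nowMillis) {
            this.lastRecordTimeMillis = nowMillis;
        }
    }

    private final long idleTimeoutMillis;
    private final Map<String, PartitionState> partitions = new HashMap<>();

    IdleAwareMinWatermark(long idleTimeoutMillis) {
        this.idleTimeoutMillis = idleTimeoutMillis;
    }

    /** Registers a partition; its idle clock starts at nowMillis. */
    void addPartition(String partition, long nowMillis) {
        partitions.put(partition, new PartitionState(nowMillis));
    }

    /** Records an event timestamp for a partition and marks it as active. */
    void onRecord(String partition, long eventTimestamp, long nowMillis) {
        PartitionState state =
                partitions.computeIfAbsent(partition, p -> new PartitionState(nowMillis));
        state.watermark = Math.max(state.watermark, eventTimestamp);
        state.lastRecordTimeMillis = nowMillis;
    }

    /**
     * Minimum watermark over the partitions that are not idle.
     * Returns Long.MIN_VALUE while every partition is idle.
     */
    long currentWatermark(long nowMillis) {
        long min = Long.MAX_VALUE;
        boolean anyActive = false;
        for (PartitionState state : partitions.values()) {
            if (nowMillis - state.lastRecordTimeMillis >= idleTimeoutMillis) {
                continue; // idle partitions no longer hold the minimum back
            }
            anyActive = true;
            min = Math.min(min, state.watermark);
        }
        return anyActive ? min : Long.MIN_VALUE;
    }
}
```

With a timeout of 1000 ms and two partitions of which only one ever receives data, the result stays at `Long.MIN_VALUE` until the silent partition has been quiet for the full timeout, and then follows the active partition. This is the sense in which the timeout bounds how long an idle partition can block watermark advancement.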
[jira] [Commented] (FLINK-5479) Per-partition watermarks in FlinkKafkaConsumer should consider idle partitions
[ https://issues.apache.org/jira/browse/FLINK-5479?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16343115#comment-16343115 ]

yanxiaobin commented on FLINK-5479:
---

I'm also running into this problem at the moment.

> Per-partition watermarks in FlinkKafkaConsumer should consider idle partitions
> --
>
> Key: FLINK-5479
> URL: https://issues.apache.org/jira/browse/FLINK-5479
> Project: Flink
> Issue Type: Improvement
> Components: Kafka Connector
> Reporter: Tzu-Li (Gordon) Tai
> Priority: Blocker
> Fix For: 1.5.0
[jira] [Commented] (FLINK-5479) Per-partition watermarks in FlinkKafkaConsumer should consider idle partitions
[ https://issues.apache.org/jira/browse/FLINK-5479?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16335116#comment-16335116 ]

Eron Wright commented on FLINK-5479:
-

To elaborate on my earlier comment about `max.message.time.difference.ms`, let's consider the ideal watermark for the two types of timestamps supported by Kafka (as per KIP-32), CreateTime and LogAppendTime.

In LogAppendTime, the timestamp is monotonically increasing with each message, and corresponds to the wall clock time of the broker at append time. The per-partition watermark could simply track the message time. The complication is how to advance the watermark when the partition is idle; an in-band heartbeat from the broker (informing the client about the progression of its wall clock) would be ideal.

In CreateTime, the timestamp is supplied by the producer, but the broker may enforce an upper bound ("max difference") on the delta between the message timestamp and the broker's current time. The ideal per-partition watermark would be the broker's current time minus the max difference.

> Per-partition watermarks in FlinkKafkaConsumer should consider idle partitions
> --
>
> Key: FLINK-5479
> URL: https://issues.apache.org/jira/browse/FLINK-5479
> Project: Flink
> Issue Type: Improvement
> Components: Kafka Connector
> Reporter: Tzu-Li (Gordon) Tai
> Priority: Blocker
> Fix For: 1.5.0
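The two "ideal" watermarks described in the comment above reduce to very little arithmetic, sketched here purely for illustration. The class and method names are hypothetical, and the sketch assumes the client somehow knows the broker's current wall-clock time, which is exactly the piece the comment notes is missing today (hence the wish for an in-band heartbeat).

```java
/**
 * Hypothetical helpers for the two Kafka timestamp types from KIP-32.
 * They only restate the arithmetic from the discussion; they are not
 * part of Flink or of the Kafka client.
 */
final class KafkaTimestampWatermarks {

    private KafkaTimestampWatermarks() {}

    /**
     * LogAppendTime: timestamps follow the broker's clock at append time,
     * so the watermark can simply track the latest message timestamp.
     */
    static long logAppendTimeWatermark(long latestMessageTimestampMillis) {
        return latestMessageTimestampMillis;
    }

    /**
     * CreateTime: the broker bounds how far a producer timestamp may differ
     * from its own clock, so the watermark is the broker time minus that bound.
     * brokerNowMillis is an assumed input; the client has no direct way to read it.
     */
    static long createTimeWatermark(long brokerNowMillis, long maxDifferenceMillis) {
        return brokerNowMillis - maxDifferenceMillis;
    }
}
```

For example, with a broker clock at 10 000 ms and a max difference of 3 000 ms, the CreateTime watermark would be 7 000 ms, regardless of whether the partition has produced anything recently. That independence from traffic is what makes this bound attractive for idle partitions.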
[jira] [Commented] (FLINK-5479) Per-partition watermarks in FlinkKafkaConsumer should consider idle partitions
[ https://issues.apache.org/jira/browse/FLINK-5479?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16334671#comment-16334671 ]

Tzu-Li (Gordon) Tai commented on FLINK-5479:

[~eronwright] regarding your suggestion to use {{max.message.time.difference.ms}} as the basis: we'll need to think about how this interplays with the configuration we're planning to expose with FLINK-5018.

> Per-partition watermarks in FlinkKafkaConsumer should consider idle partitions
> --
>
> Key: FLINK-5479
> URL: https://issues.apache.org/jira/browse/FLINK-5479
> Project: Flink
> Issue Type: Improvement
> Components: Kafka Connector
> Reporter: Tzu-Li (Gordon) Tai
> Priority: Blocker
> Fix For: 1.5.0
[jira] [Commented] (FLINK-5479) Per-partition watermarks in FlinkKafkaConsumer should consider idle partitions
[ https://issues.apache.org/jira/browse/FLINK-5479?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16288928#comment-16288928 ]

Tzu-Li (Gordon) Tai commented on FLINK-5479:

Raised this to blocker for 1.5.0, as we are seeing more users with this issue on the mailing lists.

> Per-partition watermarks in FlinkKafkaConsumer should consider idle partitions
> --
>
> Key: FLINK-5479
> URL: https://issues.apache.org/jira/browse/FLINK-5479
> Project: Flink
> Issue Type: Improvement
> Components: Kafka Connector
> Reporter: Tzu-Li (Gordon) Tai
> Priority: Blocker
> Fix For: 1.5.0
[jira] [Commented] (FLINK-5479) Per-partition watermarks in FlinkKafkaConsumer should consider idle partitions
[ https://issues.apache.org/jira/browse/FLINK-5479?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16141068#comment-16141068 ]

Eron Wright commented on FLINK-5479:
-

Thinking out loud here: consider using Kafka's `max.message.time.difference.ms` as the basis for the idle timeout.

> Per-partition watermarks in FlinkKafkaConsumer should consider idle partitions
> --
>
> Key: FLINK-5479
> URL: https://issues.apache.org/jira/browse/FLINK-5479
> Project: Flink
> Issue Type: Improvement
> Components: Kafka Connector
> Reporter: Tzu-Li (Gordon) Tai
> Fix For: 1.4.0