[jira] [Commented] (FLINK-6264) Kafka consumer fails if can't find leader for partition
[ https://issues.apache.org/jira/browse/FLINK-6264?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15976253#comment-15976253 ]

Gyula Fora commented on FLINK-6264:
---

I am not sure what causes the exception; [~tzulitai] is probably right. In any case, I think we should recover from this problem without crashing the job, because it seems to occur relatively frequently. This probably means we have to retry fetching the metadata (for the affected partitions, or for all of them) a couple of times, perhaps with some backoff, to also give Kafka some time to recover (if that is what causes the problem).

In many cases we have noticed that after a problematic leader change or broker death, it takes some time (seconds or minutes) until Kafka is back in a state where it operates normally. We should try to bridge these gaps without crashing, because crashing is much worse.

> Kafka consumer fails if can't find leader for partition
> ---
>
> Key: FLINK-6264
> URL: https://issues.apache.org/jira/browse/FLINK-6264
> Project: Flink
> Issue Type: Bug
> Components: Kafka Connector
> Affects Versions: 1.2.0
> Reporter: Gyula Fora
>
> We have observed the following error many times when brokers failed/were
> restarted:
>
> java.lang.RuntimeException: Unable to find a leader for partitions:
> [Partition: KafkaTopicPartition{topic='mytopic', partition=10}, KafkaPartitionHandle=[mytopic,10], offset=-1]
>     at org.apache.flink.streaming.connectors.kafka.internals.Kafka08Fetcher.findLeaderForPartitions(Kafka08Fetcher.java:474)
>     at org.apache.flink.streaming.connectors.kafka.internals.Kafka08Fetcher.runFetchLoop(Kafka08Fetcher.java:194)
>     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:256)
>     at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:78)
>     at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:55)
>     at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:56)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:261)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:656)
>     at java.lang.Thread.run(Thread.java:745)

--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
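Gyula's proposal (retry the metadata lookup a few times with backoff before giving up) could look roughly like the Java sketch below. It is a minimal illustration under stated assumptions, not Flink code: `LeaderRetrySketch`, `findLeadersWithRetry`, the `fetchLeaders` function standing in for a Kafka metadata request, and the "topic-partition" string keys are all hypothetical. Each attempt asks only about partitions that still lack a leader, and the sleep doubles after every unsuccessful attempt.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;

/**
 * Hypothetical sketch (not Flink API): look up partition leaders, retrying only
 * the partitions whose leader is still unknown, with exponential backoff.
 */
public class LeaderRetrySketch {

    /**
     * @param partitions       partitions that need a leader, e.g. "mytopic-10"
     * @param fetchLeaders     assumed metadata call: returns partition -> leader
     *                         for the requested partitions that currently have one
     * @param maxAttempts      number of lookups before giving up
     * @param initialBackoffMs sleep after the first failed attempt; doubled each time
     */
    public static Map<String, String> findLeadersWithRetry(
            Set<String> partitions,
            Function<Set<String>, Map<String, String>> fetchLeaders,
            int maxAttempts,
            long initialBackoffMs) {
        Map<String, String> leaders = new HashMap<>();
        Set<String> unassigned = new HashSet<>(partitions);
        long backoffMs = initialBackoffMs;

        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            // Only ask about partitions that still lack a leader.
            Map<String, String> found = fetchLeaders.apply(new HashSet<>(unassigned));
            for (Map.Entry<String, String> e : found.entrySet()) {
                if (unassigned.remove(e.getKey())) {
                    leaders.put(e.getKey(), e.getValue());
                }
            }
            if (unassigned.isEmpty()) {
                return leaders;
            }
            if (attempt < maxAttempts) {
                // Give Kafka time to finish leader election before asking again.
                try {
                    Thread.sleep(backoffMs);
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt(); // keep the interrupt visible to the caller
                    throw new RuntimeException("Interrupted while waiting to retry leader lookup", ie);
                }
                backoffMs *= 2;
            }
        }
        // Same failure as today, but only once the retries are exhausted.
        throw new RuntimeException("Unable to find a leader for partitions: " + unassigned);
    }

    public static void main(String[] args) {
        // Simulated cluster: mytopic-10 has no leader for the first two lookups.
        int[] calls = {0};
        Function<Set<String>, Map<String, String>> flakyMetadata = requested -> {
            calls[0]++;
            Map<String, String> result = new HashMap<>();
            for (String p : requested) {
                if (!p.equals("mytopic-10") || calls[0] >= 3) {
                    result.put(p, "broker-1");
                }
            }
            return result;
        };
        Set<String> partitions = new HashSet<>(Set.of("mytopic-9", "mytopic-10"));
        Map<String, String> leaders = findLeadersWithRetry(partitions, flakyMetadata, 5, 10);
        // prints "2 leaders found after 3 lookups"
        System.out.println(leaders.size() + " leaders found after " + calls[0] + " lookups");
    }
}
```

Retrying only the still-unassigned partitions keeps each metadata request small, while the cap on attempts keeps the existing fail-fast behavior for a leader that never comes back, just later.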
[jira] [Commented] (FLINK-6264) Kafka consumer fails if can't find leader for partition
[ https://issues.apache.org/jira/browse/FLINK-6264?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15976105#comment-15976105 ]

Tzu-Li (Gordon) Tai commented on FLINK-6264:
---

[~autoaim800] I think the exception occurs when the leader of some partitions cannot be found, so it isn't the case you described. I think what [~gyfora] is suggesting is that those partitions should be retried instead of failing? Gyula, could you clarify?
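Tai's reading (keep retrying only the leaderless partitions while the others are consumed normally) is a different design from blocking the whole lookup. A minimal sketch of that idea, assuming a fetch loop that calls `assignWhatWeCan` on every pass with fresh metadata; `PartialAssignmentSketch`, its methods, and the string partition keys are hypothetical and not part of Flink's `Kafka08Fetcher`:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;

/**
 * Hypothetical sketch: partitions with a known leader are assigned right away,
 * leaderless ones are parked and re-checked on a later pass instead of failing.
 */
public class PartialAssignmentSketch {

    /** Partitions still waiting for a leader; re-checked on every pass. */
    private final Set<String> pending = new HashSet<>();

    /** leader broker -> partitions assigned to that broker's fetcher. */
    private final Map<String, List<String>> assigned = new HashMap<>();

    public void addPartitions(Set<String> partitions) {
        pending.addAll(partitions);
    }

    /**
     * One pass of an assumed fetch loop. {@code currentLeaders} stands in for a
     * metadata response (partition -> leader; leaderless partitions are absent).
     * Returns how many partitions are still waiting for a leader.
     */
    public int assignWhatWeCan(Map<String, String> currentLeaders) {
        Iterator<String> it = pending.iterator();
        while (it.hasNext()) {
            String partition = it.next();
            String leader = currentLeaders.get(partition);
            if (leader != null) {
                assigned.computeIfAbsent(leader, k -> new ArrayList<>()).add(partition);
                it.remove();
            }
            // No leader yet: keep the partition pending instead of throwing.
        }
        return pending.size();
    }

    public Map<String, List<String>> assignments() {
        return assigned;
    }

    public static void main(String[] args) {
        PartialAssignmentSketch s = new PartialAssignmentSketch();
        s.addPartitions(Set.of("mytopic-9", "mytopic-10"));

        // First pass: mytopic-10 is mid leader election, mytopic-9 is fine.
        int waiting = s.assignWhatWeCan(Map.of("mytopic-9", "broker-1"));
        System.out.println("waiting after pass 1: " + waiting); // prints "waiting after pass 1: 1"

        // Later pass: the new leader for mytopic-10 is known.
        waiting = s.assignWhatWeCan(Map.of("mytopic-9", "broker-1", "mytopic-10", "broker-2"));
        System.out.println("waiting after pass 2: " + waiting); // prints "waiting after pass 2: 0"
    }
}
```

The trade-off is that a partition whose leader never returns is silently not read, so a real version would probably still want a timeout, a metric, or a log warning for partitions that stay pending too long.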
[jira] [Commented] (FLINK-6264) Kafka consumer fails if can't find leader for partition
[ https://issues.apache.org/jira/browse/FLINK-6264?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15976012#comment-15976012 ]

mingleizhang commented on FLINK-6264:
---

Hi, I don't think it is a bug when brokers fail and are restarted. The exception is thrown because {code}unassignedPartitions.size() > 0{code}, i.e. some partitions are still left without a leader.

[~tzulitai] Hi, what is your opinion on this issue?
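The condition mingleizhang mentions can be illustrated with a simplified, hypothetical model of the leader lookup, loosely modeled on the `findLeaderForPartitions` frame in the stack trace: requested partitions are matched against leader metadata, and any that remain in `unassignedPartitions` make the whole call throw. Only the `unassignedPartitions.size() > 0` check and the exception message come from this thread; the class `UnassignedCheckSketch`, the method `findLeadersOrFail`, and the string-keyed metadata map are assumptions for illustration.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

/**
 * Hypothetical, simplified model (not the actual Flink source): a single
 * leaderless partition is enough to fail the whole lookup.
 */
public class UnassignedCheckSketch {

    public static Map<String, List<String>> findLeadersOrFail(
            List<String> partitionsToAssign, Map<String, String> leaderMetadata) {
        // Copy of the request, used to track which partitions are still unassigned.
        List<String> unassignedPartitions = new ArrayList<>(partitionsToAssign);
        Map<String, List<String>> leaderToPartitions = new HashMap<>();

        Iterator<String> it = unassignedPartitions.iterator();
        while (it.hasNext()) {
            String partition = it.next();
            String leader = leaderMetadata.get(partition); // null while no leader is known
            if (leader != null) {
                leaderToPartitions.computeIfAbsent(leader, k -> new ArrayList<>()).add(partition);
                it.remove(); // partition has been assigned
            }
        }

        // Any leftover partition aborts the lookup; the exception then propagates
        // out of the source task, as in the stack trace above.
        if (unassignedPartitions.size() > 0) {
            throw new RuntimeException("Unable to find a leader for partitions: " + unassignedPartitions);
        }
        return leaderToPartitions;
    }

    public static void main(String[] args) {
        Map<String, String> metadata = Map.of("mytopic-9", "broker-1"); // mytopic-10 has no leader
        try {
            findLeadersOrFail(List.of("mytopic-9", "mytopic-10"), metadata);
        } catch (RuntimeException e) {
            // prints "Unable to find a leader for partitions: [mytopic-10]"
            System.out.println(e.getMessage());
        }
    }
}
```

This is why a brief leader election during a broker restart is enough to trigger the failure reported in the issue: the check does not distinguish a temporarily missing leader from a permanently missing one.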