[jira] [Commented] (FLINK-4015) FlinkKafkaProducer08 fails when partition leader changes
[ https://issues.apache.org/jira/browse/FLINK-4015?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17336865#comment-17336865 ] Flink Jira Bot commented on FLINK-4015: --- This issue was labeled "stale-major" 7 ago and has not received any updates so it is being deprioritized. If this ticket is actually Major, please raise the priority and ask a committer to assign you the issue or revive the public discussion. > FlinkKafkaProducer08 fails when partition leader changes > > > Key: FLINK-4015 > URL: https://issues.apache.org/jira/browse/FLINK-4015 > Project: Flink > Issue Type: Bug > Components: Connectors / Kafka >Affects Versions: 1.0.2 >Reporter: Sebastian Klemke >Priority: Major > Labels: stale-major > > When leader for a partition changes, producer fails with the following > exception: > {code} > 06:34:50,813 INFO org.apache.flink.yarn.YarnJobManager >- Status of job b323f5de3d32504651e861d5ecb27e7c (JOB_NAME) changed to > FAILING. > java.lang.RuntimeException: Could not forward element to next operator > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:354) > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:337) > at > org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51) > at OPERATOR.flatMap2(OPERATOR.java:82) > at OPERATOR.flatMap2(OPERATOR.java:16) > at > org.apache.flink.streaming.api.operators.co.CoStreamFlatMap.processElement2(CoStreamFlatMap.java:63) > at > org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processInput(StreamTwoInputProcessor.java:207) > at > org.apache.flink.streaming.runtime.tasks.TwoInputStreamTask.run(TwoInputStreamTask.java:89) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:225) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559) > at java.lang.Thread.run(Thread.java:745) > Caused by: java.lang.RuntimeException: Could not forward element to next > operator > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:354) > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:337) > at > org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:39) > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:351) > ... 10 more > Caused by: java.lang.RuntimeException: Could not forward element to next > operator > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:354) > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:337) > at > org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:39) > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:351) > ... 13 more > Caused by: java.lang.Exception: Failed to send data to Kafka: This server is > not the leader for that topic-partition. > at > org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase.checkErroneous(FlinkKafkaProducerBase.java:282) > at > org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase.invoke(FlinkKafkaProducerBase.java:249) > at > org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:39) > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:351) > ... 16 more > Caused by: org.apache.kafka.common.errors.NotLeaderForPartitionException: > This server is not the leader for that topic-partition. > {code} -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (FLINK-4015) FlinkKafkaProducer08 fails when partition leader changes
[ https://issues.apache.org/jira/browse/FLINK-4015?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17328920#comment-17328920 ] Flink Jira Bot commented on FLINK-4015: --- This major issue is unassigned and itself and all of its Sub-Tasks have not been updated for 30 days. So, it has been labeled "stale-major". If this ticket is indeed "major", please either assign yourself or give an update. Afterwards, please remove the label. In 7 days the issue will be deprioritized. > FlinkKafkaProducer08 fails when partition leader changes > > > Key: FLINK-4015 > URL: https://issues.apache.org/jira/browse/FLINK-4015 > Project: Flink > Issue Type: Bug > Components: Connectors / Kafka >Affects Versions: 1.0.2 >Reporter: Sebastian Klemke >Priority: Major > Labels: stale-major > > When leader for a partition changes, producer fails with the following > exception: > {code} > 06:34:50,813 INFO org.apache.flink.yarn.YarnJobManager >- Status of job b323f5de3d32504651e861d5ecb27e7c (JOB_NAME) changed to > FAILING. > java.lang.RuntimeException: Could not forward element to next operator > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:354) > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:337) > at > org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51) > at OPERATOR.flatMap2(OPERATOR.java:82) > at OPERATOR.flatMap2(OPERATOR.java:16) > at > org.apache.flink.streaming.api.operators.co.CoStreamFlatMap.processElement2(CoStreamFlatMap.java:63) > at > org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processInput(StreamTwoInputProcessor.java:207) > at > org.apache.flink.streaming.runtime.tasks.TwoInputStreamTask.run(TwoInputStreamTask.java:89) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:225) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559) > at java.lang.Thread.run(Thread.java:745) > Caused by: java.lang.RuntimeException: Could not forward element to next > operator > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:354) > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:337) > at > org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:39) > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:351) > ... 10 more > Caused by: java.lang.RuntimeException: Could not forward element to next > operator > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:354) > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:337) > at > org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:39) > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:351) > ... 13 more > Caused by: java.lang.Exception: Failed to send data to Kafka: This server is > not the leader for that topic-partition. > at > org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase.checkErroneous(FlinkKafkaProducerBase.java:282) > at > org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase.invoke(FlinkKafkaProducerBase.java:249) > at > org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:39) > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:351) > ... 16 more > Caused by: org.apache.kafka.common.errors.NotLeaderForPartitionException: > This server is not the leader for that topic-partition. > {code} -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (FLINK-4015) FlinkKafkaProducer08 fails when partition leader changes
[ https://issues.apache.org/jira/browse/FLINK-4015?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15372665#comment-15372665 ] Sebastian Klemke commented on FLINK-4015: - No, synchronous wouldn't be fast enough: We have ~11k records produced per second on each sink node, if this number drops significantly, our replay procedure will take much longer. > FlinkKafkaProducer08 fails when partition leader changes > > > Key: FLINK-4015 > URL: https://issues.apache.org/jira/browse/FLINK-4015 > Project: Flink > Issue Type: Bug > Components: Kafka Connector >Affects Versions: 1.0.2 >Reporter: Sebastian Klemke > > When leader for a partition changes, producer fails with the following > exception: > {code} > 06:34:50,813 INFO org.apache.flink.yarn.YarnJobManager >- Status of job b323f5de3d32504651e861d5ecb27e7c (JOB_NAME) changed to > FAILING. > java.lang.RuntimeException: Could not forward element to next operator > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:354) > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:337) > at > org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51) > at OPERATOR.flatMap2(OPERATOR.java:82) > at OPERATOR.flatMap2(OPERATOR.java:16) > at > org.apache.flink.streaming.api.operators.co.CoStreamFlatMap.processElement2(CoStreamFlatMap.java:63) > at > org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processInput(StreamTwoInputProcessor.java:207) > at > org.apache.flink.streaming.runtime.tasks.TwoInputStreamTask.run(TwoInputStreamTask.java:89) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:225) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559) > at java.lang.Thread.run(Thread.java:745) > Caused by: java.lang.RuntimeException: Could not forward element to next > operator > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:354) > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:337) > at > org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:39) > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:351) > ... 10 more > Caused by: java.lang.RuntimeException: Could not forward element to next > operator > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:354) > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:337) > at > org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:39) > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:351) > ... 13 more > Caused by: java.lang.Exception: Failed to send data to Kafka: This server is > not the leader for that topic-partition. > at > org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase.checkErroneous(FlinkKafkaProducerBase.java:282) > at > org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase.invoke(FlinkKafkaProducerBase.java:249) > at > org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:39) > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:351) > ... 16 more > Caused by: org.apache.kafka.common.errors.NotLeaderForPartitionException: > This server is not the leader for that topic-partition. > {code} -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-4015) FlinkKafkaProducer08 fails when partition leader changes
[ https://issues.apache.org/jira/browse/FLINK-4015?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15372606#comment-15372606 ] Robert Metzger commented on FLINK-4015: --- What's the throughput of the Kafka producer? Do you think the synchronous producer would be fast enough? I'm asking because the synchronous variant is probably easier to implement and also to operate. With the buffering Kafka producer, you need to maintain all the unconfirmed records in memory, so you may run into garbage collection issues. > FlinkKafkaProducer08 fails when partition leader changes > > > Key: FLINK-4015 > URL: https://issues.apache.org/jira/browse/FLINK-4015 > Project: Flink > Issue Type: Bug > Components: Kafka Connector >Affects Versions: 1.0.2 >Reporter: Sebastian Klemke > > When leader for a partition changes, producer fails with the following > exception: > {code} > 06:34:50,813 INFO org.apache.flink.yarn.YarnJobManager >- Status of job b323f5de3d32504651e861d5ecb27e7c (JOB_NAME) changed to > FAILING. > java.lang.RuntimeException: Could not forward element to next operator > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:354) > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:337) > at > org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51) > at OPERATOR.flatMap2(OPERATOR.java:82) > at OPERATOR.flatMap2(OPERATOR.java:16) > at > org.apache.flink.streaming.api.operators.co.CoStreamFlatMap.processElement2(CoStreamFlatMap.java:63) > at > org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processInput(StreamTwoInputProcessor.java:207) > at > org.apache.flink.streaming.runtime.tasks.TwoInputStreamTask.run(TwoInputStreamTask.java:89) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:225) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559) > at java.lang.Thread.run(Thread.java:745) > Caused by: java.lang.RuntimeException: Could not forward element to next > operator > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:354) > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:337) > at > org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:39) > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:351) > ... 10 more > Caused by: java.lang.RuntimeException: Could not forward element to next > operator > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:354) > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:337) > at > org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:39) > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:351) > ... 13 more > Caused by: java.lang.Exception: Failed to send data to Kafka: This server is > not the leader for that topic-partition. > at > org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase.checkErroneous(FlinkKafkaProducerBase.java:282) > at > org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase.invoke(FlinkKafkaProducerBase.java:249) > at > org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:39) > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:351) > ... 16 more > Caused by: org.apache.kafka.common.errors.NotLeaderForPartitionException: > This server is not the leader for that topic-partition. > {code} -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-4015) FlinkKafkaProducer08 fails when partition leader changes
[ https://issues.apache.org/jira/browse/FLINK-4015?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15371216#comment-15371216 ] Sebastian Klemke commented on FLINK-4015: - For our use case, buffering the in-flight records and replaying the whole buffer in order from first failed message would be optimal. > FlinkKafkaProducer08 fails when partition leader changes > > > Key: FLINK-4015 > URL: https://issues.apache.org/jira/browse/FLINK-4015 > Project: Flink > Issue Type: Bug > Components: Kafka Connector >Affects Versions: 1.0.2 >Reporter: Sebastian Klemke > > When leader for a partition changes, producer fails with the following > exception: > {code} > 06:34:50,813 INFO org.apache.flink.yarn.YarnJobManager >- Status of job b323f5de3d32504651e861d5ecb27e7c (JOB_NAME) changed to > FAILING. > java.lang.RuntimeException: Could not forward element to next operator > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:354) > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:337) > at > org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51) > at OPERATOR.flatMap2(OPERATOR.java:82) > at OPERATOR.flatMap2(OPERATOR.java:16) > at > org.apache.flink.streaming.api.operators.co.CoStreamFlatMap.processElement2(CoStreamFlatMap.java:63) > at > org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processInput(StreamTwoInputProcessor.java:207) > at > org.apache.flink.streaming.runtime.tasks.TwoInputStreamTask.run(TwoInputStreamTask.java:89) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:225) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559) > at java.lang.Thread.run(Thread.java:745) > Caused by: java.lang.RuntimeException: Could not forward element to next > operator > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:354) > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:337) > at > org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:39) > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:351) > ... 10 more > Caused by: java.lang.RuntimeException: Could not forward element to next > operator > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:354) > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:337) > at > org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:39) > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:351) > ... 13 more > Caused by: java.lang.Exception: Failed to send data to Kafka: This server is > not the leader for that topic-partition. > at > org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase.checkErroneous(FlinkKafkaProducerBase.java:282) > at > org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase.invoke(FlinkKafkaProducerBase.java:249) > at > org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:39) > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:351) > ... 16 more > Caused by: org.apache.kafka.common.errors.NotLeaderForPartitionException: > This server is not the leader for that topic-partition. > {code} -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-4015) FlinkKafkaProducer08 fails when partition leader changes
[ https://issues.apache.org/jira/browse/FLINK-4015?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15370450#comment-15370450 ] Stephan Ewen commented on FLINK-4015: - [~packet] There are two ways to achieve this: 1. Having a synchronous producer that sends one record at a time (no batching) 2. Buffering a certain amount of records in the FlinkKafkaProducer, and re-sending them all upon a failure. > FlinkKafkaProducer08 fails when partition leader changes > > > Key: FLINK-4015 > URL: https://issues.apache.org/jira/browse/FLINK-4015 > Project: Flink > Issue Type: Bug > Components: Kafka Connector >Affects Versions: 1.0.2 >Reporter: Sebastian Klemke > > When leader for a partition changes, producer fails with the following > exception: > {code} > 06:34:50,813 INFO org.apache.flink.yarn.YarnJobManager >- Status of job b323f5de3d32504651e861d5ecb27e7c (JOB_NAME) changed to > FAILING. > java.lang.RuntimeException: Could not forward element to next operator > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:354) > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:337) > at > org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51) > at OPERATOR.flatMap2(OPERATOR.java:82) > at OPERATOR.flatMap2(OPERATOR.java:16) > at > org.apache.flink.streaming.api.operators.co.CoStreamFlatMap.processElement2(CoStreamFlatMap.java:63) > at > org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processInput(StreamTwoInputProcessor.java:207) > at > org.apache.flink.streaming.runtime.tasks.TwoInputStreamTask.run(TwoInputStreamTask.java:89) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:225) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559) > at java.lang.Thread.run(Thread.java:745) > Caused by: java.lang.RuntimeException: Could not forward element to next > operator > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:354) > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:337) > at > org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:39) > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:351) > ... 10 more > Caused by: java.lang.RuntimeException: Could not forward element to next > operator > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:354) > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:337) > at > org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:39) > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:351) > ... 13 more > Caused by: java.lang.Exception: Failed to send data to Kafka: This server is > not the leader for that topic-partition. > at > org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase.checkErroneous(FlinkKafkaProducerBase.java:282) > at > org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase.invoke(FlinkKafkaProducerBase.java:249) > at > org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:39) > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:351) > ... 16 more > Caused by: org.apache.kafka.common.errors.NotLeaderForPartitionException: > This server is not the leader for that topic-partition. > {code} -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-4015) FlinkKafkaProducer08 fails when partition leader changes
[ https://issues.apache.org/jira/browse/FLINK-4015?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15329371#comment-15329371 ] Sebastian Klemke commented on FLINK-4015: - For our application, order is more important than duplicates: We can cope with duplicates but not with out-of-order records. > FlinkKafkaProducer08 fails when partition leader changes > > > Key: FLINK-4015 > URL: https://issues.apache.org/jira/browse/FLINK-4015 > Project: Flink > Issue Type: Bug > Components: Kafka Connector >Affects Versions: 1.0.2 >Reporter: Sebastian Klemke > > When leader for a partition changes, producer fails with the following > exception: > {code} > 06:34:50,813 INFO org.apache.flink.yarn.YarnJobManager >- Status of job b323f5de3d32504651e861d5ecb27e7c (JOB_NAME) changed to > FAILING. > java.lang.RuntimeException: Could not forward element to next operator > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:354) > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:337) > at > org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51) > at OPERATOR.flatMap2(OPERATOR.java:82) > at OPERATOR.flatMap2(OPERATOR.java:16) > at > org.apache.flink.streaming.api.operators.co.CoStreamFlatMap.processElement2(CoStreamFlatMap.java:63) > at > org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processInput(StreamTwoInputProcessor.java:207) > at > org.apache.flink.streaming.runtime.tasks.TwoInputStreamTask.run(TwoInputStreamTask.java:89) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:225) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559) > at java.lang.Thread.run(Thread.java:745) > Caused by: java.lang.RuntimeException: Could not forward element to next > operator > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:354) > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:337) > at > org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:39) > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:351) > ... 10 more > Caused by: java.lang.RuntimeException: Could not forward element to next > operator > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:354) > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:337) > at > org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:39) > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:351) > ... 13 more > Caused by: java.lang.Exception: Failed to send data to Kafka: This server is > not the leader for that topic-partition. > at > org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase.checkErroneous(FlinkKafkaProducerBase.java:282) > at > org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase.invoke(FlinkKafkaProducerBase.java:249) > at > org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:39) > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:351) > ... 16 more > Caused by: org.apache.kafka.common.errors.NotLeaderForPartitionException: > This server is not the leader for that topic-partition. > {code} -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-4015) FlinkKafkaProducer08 fails when partition leader changes
[ https://issues.apache.org/jira/browse/FLINK-4015?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15329250#comment-15329250 ] Robert Metzger commented on FLINK-4015: --- I don't think we can reliably guarantee that there are no duplicate or out of order messages in a Kafka topic, due to the limitations of Kafka. The Kafka community is working on fixing it (https://cwiki.apache.org/confluence/display/KAFKA/KIP-27+-+Conditional+Publish), but until then, we have to life with the limitations. > FlinkKafkaProducer08 fails when partition leader changes > > > Key: FLINK-4015 > URL: https://issues.apache.org/jira/browse/FLINK-4015 > Project: Flink > Issue Type: Bug > Components: Kafka Connector >Affects Versions: 1.0.2 >Reporter: Sebastian Klemke > > When leader for a partition changes, producer fails with the following > exception: > {code} > 06:34:50,813 INFO org.apache.flink.yarn.YarnJobManager >- Status of job b323f5de3d32504651e861d5ecb27e7c (JOB_NAME) changed to > FAILING. > java.lang.RuntimeException: Could not forward element to next operator > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:354) > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:337) > at > org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51) > at OPERATOR.flatMap2(OPERATOR.java:82) > at OPERATOR.flatMap2(OPERATOR.java:16) > at > org.apache.flink.streaming.api.operators.co.CoStreamFlatMap.processElement2(CoStreamFlatMap.java:63) > at > org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processInput(StreamTwoInputProcessor.java:207) > at > org.apache.flink.streaming.runtime.tasks.TwoInputStreamTask.run(TwoInputStreamTask.java:89) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:225) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559) > at java.lang.Thread.run(Thread.java:745) > Caused by: java.lang.RuntimeException: Could not forward element to next > operator > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:354) > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:337) > at > org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:39) > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:351) > ... 10 more > Caused by: java.lang.RuntimeException: Could not forward element to next > operator > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:354) > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:337) > at > org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:39) > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:351) > ... 13 more > Caused by: java.lang.Exception: Failed to send data to Kafka: This server is > not the leader for that topic-partition. > at > org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase.checkErroneous(FlinkKafkaProducerBase.java:282) > at > org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase.invoke(FlinkKafkaProducerBase.java:249) > at > org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:39) > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:351) > ... 16 more > Caused by: org.apache.kafka.common.errors.NotLeaderForPartitionException: > This server is not the leader for that topic-partition. > {code} -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-4015) FlinkKafkaProducer08 fails when partition leader changes
[ https://issues.apache.org/jira/browse/FLINK-4015?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15318949#comment-15318949 ] Sebastian Klemke commented on FLINK-4015: - For our application, it should never drop records. Instead, it should be determined which record failed and this record and the following records produced to that partition should be repeated in order. This would require keeping a list of sent but not acknowledged records per partition in the producer. But for other applications, dropping might be suitable. > FlinkKafkaProducer08 fails when partition leader changes > > > Key: FLINK-4015 > URL: https://issues.apache.org/jira/browse/FLINK-4015 > Project: Flink > Issue Type: Bug > Components: Kafka Connector >Affects Versions: 1.0.2 >Reporter: Sebastian Klemke > > When leader for a partition changes, producer fails with the following > exception: > {code} > 06:34:50,813 INFO org.apache.flink.yarn.YarnJobManager >- Status of job b323f5de3d32504651e861d5ecb27e7c (JOB_NAME) changed to > FAILING. > java.lang.RuntimeException: Could not forward element to next operator > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:354) > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:337) > at > org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51) > at OPERATOR.flatMap2(OPERATOR.java:82) > at OPERATOR.flatMap2(OPERATOR.java:16) > at > org.apache.flink.streaming.api.operators.co.CoStreamFlatMap.processElement2(CoStreamFlatMap.java:63) > at > org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processInput(StreamTwoInputProcessor.java:207) > at > org.apache.flink.streaming.runtime.tasks.TwoInputStreamTask.run(TwoInputStreamTask.java:89) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:225) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559) > at java.lang.Thread.run(Thread.java:745) > Caused by: java.lang.RuntimeException: Could not forward element to next > operator > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:354) > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:337) > at > org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:39) > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:351) > ... 10 more > Caused by: java.lang.RuntimeException: Could not forward element to next > operator > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:354) > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:337) > at > org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:39) > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:351) > ... 13 more > Caused by: java.lang.Exception: Failed to send data to Kafka: This server is > not the leader for that topic-partition. > at > org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase.checkErroneous(FlinkKafkaProducerBase.java:282) > at > org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase.invoke(FlinkKafkaProducerBase.java:249) > at > org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:39) > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:351) > ... 16 more > Caused by: org.apache.kafka.common.errors.NotLeaderForPartitionException: > This server is not the leader for that topic-partition. > {code} -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-4015) FlinkKafkaProducer08 fails when partition leader changes
[ https://issues.apache.org/jira/browse/FLINK-4015?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15318920#comment-15318920 ] Stephan Ewen commented on FLINK-4015: - What kind of behavior should the sink have? If a retry fails, should it simply drop the record? > FlinkKafkaProducer08 fails when partition leader changes > > > Key: FLINK-4015 > URL: https://issues.apache.org/jira/browse/FLINK-4015 > Project: Flink > Issue Type: Bug > Components: Kafka Connector >Affects Versions: 1.0.2 >Reporter: Sebastian Klemke > > When leader for a partition changes, producer fails with the following > exception: > {code} > 06:34:50,813 INFO org.apache.flink.yarn.YarnJobManager >- Status of job b323f5de3d32504651e861d5ecb27e7c (JOB_NAME) changed to > FAILING. > java.lang.RuntimeException: Could not forward element to next operator > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:354) > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:337) > at > org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51) > at OPERATOR.flatMap2(OPERATOR.java:82) > at OPERATOR.flatMap2(OPERATOR.java:16) > at > org.apache.flink.streaming.api.operators.co.CoStreamFlatMap.processElement2(CoStreamFlatMap.java:63) > at > org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processInput(StreamTwoInputProcessor.java:207) > at > org.apache.flink.streaming.runtime.tasks.TwoInputStreamTask.run(TwoInputStreamTask.java:89) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:225) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559) > at java.lang.Thread.run(Thread.java:745) > Caused by: java.lang.RuntimeException: Could not forward element to next > operator > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:354) > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:337) > at > org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:39) > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:351) > ... 10 more > Caused by: java.lang.RuntimeException: Could not forward element to next > operator > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:354) > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:337) > at > org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:39) > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:351) > ... 13 more > Caused by: java.lang.Exception: Failed to send data to Kafka: This server is > not the leader for that topic-partition. > at > org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase.checkErroneous(FlinkKafkaProducerBase.java:282) > at > org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase.invoke(FlinkKafkaProducerBase.java:249) > at > org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:39) > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:351) > ... 16 more > Caused by: org.apache.kafka.common.errors.NotLeaderForPartitionException: > This server is not the leader for that topic-partition. > {code} -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-4015) FlinkKafkaProducer08 fails when partition leader changes
[ https://issues.apache.org/jira/browse/FLINK-4015?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15314042#comment-15314042 ] Sebastian Klemke commented on FLINK-4015: - We use default retries: 0. Also, we can't set retries to a higher value, bc kafka documentation says "Allowing retries will potentially change the ordering of records because if two records are sent to a single partition, and the first fails and is retried but the second succeeds, then the second record may appear first." For our application, this must not happen. Repeating whole batches in-order would be okay. > FlinkKafkaProducer08 fails when partition leader changes > > > Key: FLINK-4015 > URL: https://issues.apache.org/jira/browse/FLINK-4015 > Project: Flink > Issue Type: Bug > Components: Kafka Connector >Affects Versions: 1.0.2 >Reporter: Sebastian Klemke > > When leader for a partition changes, producer fails with the following > exception: > {code} > 06:34:50,813 INFO org.apache.flink.yarn.YarnJobManager >- Status of job b323f5de3d32504651e861d5ecb27e7c (JOB_NAME) changed to > FAILING. > java.lang.RuntimeException: Could not forward element to next operator > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:354) > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:337) > at > org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51) > at OPERATOR.flatMap2(OPERATOR.java:82) > at OPERATOR.flatMap2(OPERATOR.java:16) > at > org.apache.flink.streaming.api.operators.co.CoStreamFlatMap.processElement2(CoStreamFlatMap.java:63) > at > org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processInput(StreamTwoInputProcessor.java:207) > at > org.apache.flink.streaming.runtime.tasks.TwoInputStreamTask.run(TwoInputStreamTask.java:89) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:225) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559) > at java.lang.Thread.run(Thread.java:745) > Caused by: java.lang.RuntimeException: Could not forward element to next > operator > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:354) > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:337) > at > org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:39) > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:351) > ... 10 more > Caused by: java.lang.RuntimeException: Could not forward element to next > operator > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:354) > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:337) > at > org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:39) > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:351) > ... 13 more > Caused by: java.lang.Exception: Failed to send data to Kafka: This server is > not the leader for that topic-partition. > at > org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase.checkErroneous(FlinkKafkaProducerBase.java:282) > at > org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase.invoke(FlinkKafkaProducerBase.java:249) > at > org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:39) > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:351) > ... 16 more > Caused by: org.apache.kafka.common.errors.NotLeaderForPartitionException: > This server is not the leader for that topic-partition. > {code} -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-4015) FlinkKafkaProducer08 fails when partition leader changes
[ https://issues.apache.org/jira/browse/FLINK-4015?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15314026#comment-15314026 ] Robert Metzger commented on FLINK-4015: --- Hi Sebastian, do you have set the number of retires set to a value higher than 0 ? By default, its set to 0 so the producer will not retry in cause such an error happens. I would recommend to allow more retries. The reason why we don't set the number to something higher by default is, that retries can cause duplicate records in kafka. > FlinkKafkaProducer08 fails when partition leader changes > > > Key: FLINK-4015 > URL: https://issues.apache.org/jira/browse/FLINK-4015 > Project: Flink > Issue Type: Bug > Components: Kafka Connector >Affects Versions: 1.0.2 >Reporter: Sebastian Klemke > > When leader for a partition changes, producer fails with the following > exception: > {code} > 06:34:50,813 INFO org.apache.flink.yarn.YarnJobManager >- Status of job b323f5de3d32504651e861d5ecb27e7c (JOB_NAME) changed to > FAILING. > java.lang.RuntimeException: Could not forward element to next operator > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:354) > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:337) > at > org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51) > at OPERATOR.flatMap2(OPERATOR.java:82) > at OPERATOR.flatMap2(OPERATOR.java:16) > at > org.apache.flink.streaming.api.operators.co.CoStreamFlatMap.processElement2(CoStreamFlatMap.java:63) > at > org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processInput(StreamTwoInputProcessor.java:207) > at > org.apache.flink.streaming.runtime.tasks.TwoInputStreamTask.run(TwoInputStreamTask.java:89) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:225) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559) > at java.lang.Thread.run(Thread.java:745) > Caused by: java.lang.RuntimeException: Could not forward element to next > operator > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:354) > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:337) > at > org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:39) > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:351) > ... 10 more > Caused by: java.lang.RuntimeException: Could not forward element to next > operator > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:354) > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:337) > at > org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:39) > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:351) > ... 13 more > Caused by: java.lang.Exception: Failed to send data to Kafka: This server is > not the leader for that topic-partition. > at > org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase.checkErroneous(FlinkKafkaProducerBase.java:282) > at > org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase.invoke(FlinkKafkaProducerBase.java:249) > at > org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:39) > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:351) > ... 16 more > Caused by: org.apache.kafka.common.errors.NotLeaderForPartitionException: > This server is not the leader for that topic-partition. > {code} -- This message was sent by Atlassian JIRA (v6.3.4#6332)