[jira] [Commented] (FLINK-4015) FlinkKafkaProducer08 fails when partition leader changes

2021-04-29 Thread Flink Jira Bot (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-4015?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17336865#comment-17336865
 ] 

Flink Jira Bot commented on FLINK-4015:
---

This issue was labeled "stale-major" 7 days ago and has not received any updates so 
it is being deprioritized. If this ticket is actually Major, please raise the 
priority and ask a committer to assign you the issue or revive the public 
discussion.


> FlinkKafkaProducer08 fails when partition leader changes
> --------------------------------------------------------
>
>                 Key: FLINK-4015
>                 URL: https://issues.apache.org/jira/browse/FLINK-4015
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / Kafka
>    Affects Versions: 1.0.2
>            Reporter: Sebastian Klemke
>            Priority: Major
>              Labels: stale-major
>
> When leader for a partition changes, producer fails with the following 
> exception:
> {code}
> 06:34:50,813 INFO  org.apache.flink.yarn.YarnJobManager - Status of job b323f5de3d32504651e861d5ecb27e7c (JOB_NAME) changed to FAILING.
> java.lang.RuntimeException: Could not forward element to next operator
>   at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:354)
>   at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:337)
>   at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
>   at OPERATOR.flatMap2(OPERATOR.java:82)
>   at OPERATOR.flatMap2(OPERATOR.java:16)
>   at org.apache.flink.streaming.api.operators.co.CoStreamFlatMap.processElement2(CoStreamFlatMap.java:63)
>   at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processInput(StreamTwoInputProcessor.java:207)
>   at org.apache.flink.streaming.runtime.tasks.TwoInputStreamTask.run(TwoInputStreamTask.java:89)
>   at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:225)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>   at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.RuntimeException: Could not forward element to next operator
>   at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:354)
>   at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:337)
>   at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:39)
>   at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:351)
>   ... 10 more
> Caused by: java.lang.RuntimeException: Could not forward element to next operator
>   at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:354)
>   at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:337)
>   at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:39)
>   at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:351)
>   ... 13 more
> Caused by: java.lang.Exception: Failed to send data to Kafka: This server is not the leader for that topic-partition.
>   at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase.checkErroneous(FlinkKafkaProducerBase.java:282)
>   at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase.invoke(FlinkKafkaProducerBase.java:249)
>   at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:39)
>   at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:351)
>   ... 16 more
> Caused by: org.apache.kafka.common.errors.NotLeaderForPartitionException: This server is not the leader for that topic-partition.
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-4015) FlinkKafkaProducer08 fails when partition leader changes

2021-04-22 Thread Flink Jira Bot (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-4015?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17328920#comment-17328920
 ] 

Flink Jira Bot commented on FLINK-4015:
---

This major issue is unassigned, and neither it nor any of its Sub-Tasks has 
been updated for 30 days, so it has been labeled "stale-major". If this ticket 
is indeed "major", please either assign yourself or give an update. Afterwards, 
please remove the label. In 7 days the issue will be deprioritized.



[jira] [Commented] (FLINK-4015) FlinkKafkaProducer08 fails when partition leader changes

2016-07-12 Thread Sebastian Klemke (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4015?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15372665#comment-15372665
 ] 

Sebastian Klemke commented on FLINK-4015:
-

No, synchronous wouldn't be fast enough: We have ~11k records produced per 
second on each sink node. If this number drops significantly, our replay 
procedure will take much longer.




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4015) FlinkKafkaProducer08 fails when partition leader changes

2016-07-12 Thread Robert Metzger (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4015?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15372606#comment-15372606
 ] 

Robert Metzger commented on FLINK-4015:
---

What's the throughput of the Kafka producer? Do you think the synchronous 
producer would be fast enough?
I'm asking because the synchronous variant is probably easier to implement and 
also to operate. With the buffering Kafka producer, you need to maintain all 
the unconfirmed records in memory, so you may run into garbage collection 
issues.



[jira] [Commented] (FLINK-4015) FlinkKafkaProducer08 fails when partition leader changes

2016-07-11 Thread Sebastian Klemke (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4015?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15371216#comment-15371216
 ] 

Sebastian Klemke commented on FLINK-4015:
-

For our use case, buffering the in-flight records and replaying the whole 
buffer in order, starting from the first failed message, would be optimal.



[jira] [Commented] (FLINK-4015) FlinkKafkaProducer08 fails when partition leader changes

2016-07-11 Thread Stephan Ewen (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4015?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15370450#comment-15370450
 ] 

Stephan Ewen commented on FLINK-4015:
-

[~packet] There are two ways to achieve this:

1. Having a synchronous producer that sends one record at a time (no batching)

2. Buffering a certain amount of records in the FlinkKafkaProducer, and 
re-sending them all upon a failure.
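To illustrate option 1, here is a minimal sketch of a synchronous, one-record-at-a-time send loop. It does not use the Kafka or Flink APIs: `BlockingSender`, `sendInOrder` and `maxRetries` are hypothetical names introduced for the example, and the sender is assumed to return only after the broker has acknowledged the record. Because the next record is not sent until the current one is acknowledged, a retry cannot reorder records.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class SynchronousSendSketch {

    /** Hypothetical stand-in for a blocking send: returns only once the record is acknowledged. */
    public interface BlockingSender {
        void send(String record);
    }

    /** Sends records strictly one at a time, retrying the same record before moving on. */
    public static void sendInOrder(List<String> records, BlockingSender sender, int maxRetries) {
        for (String record : records) {
            int attempt = 0;
            while (true) {
                try {
                    sender.send(record);
                    break; // acknowledged, continue with the next record
                } catch (RuntimeException e) {
                    attempt++;
                    if (attempt > maxRetries) {
                        throw e; // give up and let the caller fail the job
                    }
                }
            }
        }
    }

    /** Demo: the first attempt for record "b" fails, as if the partition leader had just moved. */
    public static List<String> demo() {
        List<String> acknowledged = new ArrayList<>();
        boolean[] failedOnce = {false};
        BlockingSender flaky = record -> {
            if (record.equals("b") && !failedOnce[0]) {
                failedOnce[0] = true;
                throw new IllegalStateException("This server is not the leader for that topic-partition.");
            }
            acknowledged.add(record);
        };
        sendInOrder(Arrays.asList("a", "b", "c"), flaky, 3);
        return acknowledged;
    }
}
```

The cost of this approach is one round trip per record, which is where the throughput question comes from.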



[jira] [Commented] (FLINK-4015) FlinkKafkaProducer08 fails when partition leader changes

2016-06-14 Thread Sebastian Klemke (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4015?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15329371#comment-15329371
 ] 

Sebastian Klemke commented on FLINK-4015:
-

For our application, order is more important than duplicates: We can cope with 
duplicates but not with out-of-order records.
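For a requirement like this (duplicates tolerated, reordering not), the Kafka client's own retry settings may be worth trying first. The sketch below only builds a `java.util.Properties` object. The keys `retries`, `max.in.flight.requests.per.connection` and `acks` are producer configuration names from the Kafka client documentation; whether they are honoured by the client version bundled with this connector is an assumption to verify, and the values and the helper name `orderedProducerProperties` are illustrative only.

```java
import java.util.Properties;

public class OrderedProducerConfig {

    /** Builds producer properties that favour ordering over duplicate avoidance. */
    public static Properties orderedProducerProperties(String bootstrapServers) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        // Let the client retry retriable errors such as a leader change (value is illustrative).
        props.setProperty("retries", "5");
        // At most one unacknowledged request per connection, so a retry cannot overtake a later batch.
        props.setProperty("max.in.flight.requests.per.connection", "1");
        // Wait for the in-sync replicas before a send counts as acknowledged.
        props.setProperty("acks", "all");
        return props;
    }
}
```

Retries can still produce duplicates when an acknowledgement is lost, and limiting in-flight requests to one is likely to reduce throughput.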



[jira] [Commented] (FLINK-4015) FlinkKafkaProducer08 fails when partition leader changes

2016-06-14 Thread Robert Metzger (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4015?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15329250#comment-15329250
 ] 

Robert Metzger commented on FLINK-4015:
---

I don't think we can reliably guarantee that there are no duplicate or 
out-of-order messages in a Kafka topic, due to the limitations of Kafka. The 
Kafka community is working on fixing it 
(https://cwiki.apache.org/confluence/display/KAFKA/KIP-27+-+Conditional+Publish), 
but until then, we have to live with the limitations.




[jira] [Commented] (FLINK-4015) FlinkKafkaProducer08 fails when partition leader changes

2016-06-07 Thread Sebastian Klemke (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4015?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15318949#comment-15318949
 ] 

Sebastian Klemke commented on FLINK-4015:
-

Our application must never drop records. Instead, the producer should 
determine which record failed and resend that record and all following records 
produced to the same partition, in order. This would require the producer to 
keep a per-partition list of records that were sent but not yet acknowledged. 
For other applications, dropping records might be acceptable.
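
The per-partition bookkeeping described above could look roughly like the following. This is only a sketch: `PendingRecords` and its methods are hypothetical names, not part of Flink or Kafka, and it assumes that acknowledgements for a partition arrive in send order, so the failed record is always the oldest one still pending.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical helper: tracks sent-but-unacknowledged records per partition. */
final class PendingRecords<T> {
    // One FIFO queue per partition, kept in send order.
    private final Map<Integer, Deque<T>> pending = new HashMap<>();

    /** Remember a record right before it is handed to the producer. */
    void onSend(int partition, T record) {
        pending.computeIfAbsent(partition, p -> new ArrayDeque<>()).addLast(record);
    }

    /** Drop the oldest pending record once it has been acknowledged (assumes in-order acks). */
    void onAck(int partition) {
        Deque<T> queue = pending.get(partition);
        if (queue != null) {
            queue.pollFirst();
        }
    }

    /** On failure: everything still unacknowledged for the partition, oldest first. */
    List<T> toReplay(int partition) {
        Deque<T> queue = pending.get(partition);
        return queue == null ? new ArrayList<>() : new ArrayList<>(queue);
    }
}
```

If records a, b and c are sent to partition 0 and only a is acknowledged before b fails, `toReplay(0)` returns b and c in their original order, which is the set that would have to be resent.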



[jira] [Commented] (FLINK-4015) FlinkKafkaProducer08 fails when partition leader changes

2016-06-07 Thread Stephan Ewen (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4015?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15318920#comment-15318920
 ] 

Stephan Ewen commented on FLINK-4015:
-

What kind of behavior should the sink have?
If a retry fails, should it simply drop the record?



[jira] [Commented] (FLINK-4015) FlinkKafkaProducer08 fails when partition leader changes

2016-06-03 Thread Sebastian Klemke (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4015?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15314042#comment-15314042
 ] 

Sebastian Klemke commented on FLINK-4015:
-

We use the default, retries: 0. We also can't set retries to a higher value, 
because the Kafka documentation says "Allowing retries will potentially change 
the ordering of records because if two records are sent to a single partition, 
and the first fails and is retried but the second succeeds, then the second 
record may appear first." For our application, this must not happen. Repeating 
whole batches in order would be okay.
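
One way to retry without reordering might be to allow only a single in-flight request per connection. Later Kafka producer documentation attaches the reordering warning to retries used without `max.in.flight.requests.per.connection` set to 1. The sketch below only builds the properties. `OrderedRetryConfig` is a hypothetical name, and whether the client bundled with FlinkKafkaProducer08 honors this setting is an assumption that would need to be checked.

```java
import java.util.Properties;

/** Hypothetical helper: producer properties that retry with one request in flight. */
final class OrderedRetryConfig {
    static Properties build(String bootstrapServers, int retries) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        // Retry failed sends instead of failing on the first error.
        props.setProperty("retries", Integer.toString(retries));
        // Assumption: the Kafka client in use supports this key. With one
        // in-flight request, a retried batch cannot be overtaken by a later one.
        props.setProperty("max.in.flight.requests.per.connection", "1");
        return props;
    }
}
```

The trade-off is throughput: with one request in flight per connection, the producer cannot pipeline requests to a broker.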



[jira] [Commented] (FLINK-4015) FlinkKafkaProducer08 fails when partition leader changes

2016-06-03 Thread Robert Metzger (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4015?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15314026#comment-15314026
 ] 

Robert Metzger commented on FLINK-4015:
---

Hi Sebastian,
have you set the number of retries to a value higher than 0?
By default, it's set to 0, so the producer will not retry in case such an error 
happens.
I would recommend allowing more retries. The reason we don't set the number 
higher by default is that retries can cause duplicate records in 
Kafka.
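
As a rough illustration of raising the retry count, the sketch below copies an existing set of producer properties and sets the Kafka producer keys `retries` and `retry.backoff.ms`. `RetryingProducerConfig` and the chosen values are hypothetical. The resulting `Properties` would then be passed as the producer configuration when constructing the Flink Kafka producer.

```java
import java.util.Properties;

/** Hypothetical helper: adds retry settings to a copy of existing producer properties. */
final class RetryingProducerConfig {
    static Properties withRetries(Properties base, int retries, long backoffMs) {
        // Copy so the caller's properties are left untouched.
        Properties props = new Properties();
        props.putAll(base);
        // Number of times a failed send is retried (the default discussed here is 0).
        props.setProperty("retries", Integer.toString(retries));
        // Wait between attempts, e.g. to give a leader election time to finish.
        props.setProperty("retry.backoff.ms", Long.toString(backoffMs));
        return props;
    }
}
```

As noted above, retries can produce duplicate records, so this only fits jobs that tolerate duplicates downstream.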
