FlinkKafka011Producer uses Kafka 0.11.0.2. However I’m not sure if bumping KafkaProducer version solves this issue or upgrading Kafka. What Kafka version are you using?
Piotrek > On 4 May 2018, at 17:55, Alexander Smirnov <alexander.smirn...@gmail.com> > wrote: > > Thanks for quick turnaround Stefan, Piotr > > This is a rare reproducible issue and I will keep an eye on it > > searching on the Stack Overflow I found > https://stackoverflow.com/questions/43378664/kafka-leader-election-causes-kafka-streams-crash > > <https://stackoverflow.com/questions/43378664/kafka-leader-election-causes-kafka-streams-crash> > > They say that the problem is fixed in 0.10.2.1 of kafka producer so I wonder > which version is used in FlinkKafkaProducer integration. For earlier versions > it is proposed to use configuration: > > final Properties props = new Properties(); > ... > props.put(ProducerConfig.RETRIES_CONFIG, 10); > props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, > Integer.toString(Integer.MAX_VALUE)); > props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 20000); > > > > On Fri, May 4, 2018 at 4:58 PM Piotr Nowojski <pi...@data-artisans.com > <mailto:pi...@data-artisans.com>> wrote: > Hi, > > I think Stefan is right. Quick google search points to this: > https://stackoverflow.com/questions/47767169/kafka-this-server-is-not-the-leader-for-that-topic-partition > > <https://stackoverflow.com/questions/47767169/kafka-this-server-is-not-the-leader-for-that-topic-partition> > > Please let us know if changing your configuration will solve the problem! > > Piotrek > >> On 4 May 2018, at 15:53, Stefan Richter <s.rich...@data-artisans.com >> <mailto:s.rich...@data-artisans.com>> wrote: >> >> Hi, >> >> I think in general this means that your producer client does not connect to >> the correct Broker (the leader) but to a broker that is just a follower and >> the follower can not execute that request. However, I am not sure what >> causes this in the context of the FlinkKafkaProducer. Maybe Piotr (in CC) >> has an idea? >> >> Best, >> Stefan >> >>> Am 04.05.2018 um 15:45 schrieb Alexander Smirnov >>> <alexander.smirn...@gmail.com <mailto:alexander.smirn...@gmail.com>>: >>> >>> Hi, >>> >>> what could cause the following exception? >>> >>> org.apache.flink.streaming.connectors.kafka.FlinkKafka011Exception: Failed >>> to send data to Kafka: This server is not the leader for that >>> topic-partition. >>> at >>> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.checkErroneous(FlinkKafkaProducer011.java:999) >>> at >>> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.invoke(FlinkKafkaProducer011.java:614) >>> at >>> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.invoke(FlinkKafkaProducer011.java:93) >>> at >>> org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.invoke(TwoPhaseCommitSinkFunction.java:219) >>> at >>> org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56) >>> at >>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:549) >>> at >>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:524) >>> at >>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:504) >>> at >>> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:830) >>> at >>> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:808) >>> at >>> org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51) >>> at >>> com.alexander.smirnov.FilterBMFunction.flatMap(FilterBMFunction.java:162) >>> >>> >>> Thank you, >>> Alex >> >