FlinkKafkaProducer011 uses Kafka 0.11.0.2.

However, I’m not sure whether bumping the KafkaProducer version would solve 
this issue, or whether upgrading Kafka itself would. What Kafka version are you using?
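
For reference, here is a minimal sketch of how the producer-side retry setting 
quoted below could be built as plain producer properties. The class name 
ProducerRetryConfig, the broker address, and the 500 ms backoff are illustrative 
assumptions, not something taken from this thread; "retries" and 
"retry.backoff.ms" are standard Kafka producer settings. Whether this actually 
avoids the exception in your setup is untested.

```java
import java.util.Properties;

// Hypothetical helper (not part of Flink or Kafka): builds producer
// properties with retries enabled, using only standard Kafka config keys.
public class ProducerRetryConfig {

    public static Properties build(String bootstrapServers, int retries) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        // Retriable send errors (such as a stale partition leader) are
        // retried up to this many times instead of failing the send.
        props.setProperty("retries", Integer.toString(retries));
        // Assumed backoff between retries, to give leader election time to finish.
        props.setProperty("retry.backoff.ms", "500");
        return props;
    }

    public static void main(String[] args) {
        // "localhost:9092" is a placeholder broker address.
        Properties props = build("localhost:9092", 10);
        System.out.println(props);
    }
}
```

The resulting Properties object would then be handed to the FlinkKafkaProducer011 
constructor as the producer configuration. Note that with retries enabled, 
records can be reordered unless max.in.flight.requests.per.connection is 
limited to 1.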

Piotrek

> On 4 May 2018, at 17:55, Alexander Smirnov <alexander.smirn...@gmail.com> 
> wrote:
> 
> Thanks for the quick turnaround, Stefan and Piotr.
> 
> This issue is rarely reproducible, and I will keep an eye on it.
> 
> Searching on Stack Overflow, I found 
> https://stackoverflow.com/questions/43378664/kafka-leader-election-causes-kafka-streams-crash
> 
> They say that the problem is fixed in version 0.10.2.1 of the Kafka producer, 
> so I wonder which version the FlinkKafkaProducer integration uses. For earlier 
> versions, the following configuration is suggested:
> 
> final Properties props = new Properties();
> ...
> props.put(ProducerConfig.RETRIES_CONFIG, 10);
> props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, Integer.toString(Integer.MAX_VALUE));
> props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 20000);
> 
> 
> 
> On Fri, May 4, 2018 at 4:58 PM Piotr Nowojski <pi...@data-artisans.com> wrote:
> Hi,
> 
> I think Stefan is right. A quick Google search points to this: 
> https://stackoverflow.com/questions/47767169/kafka-this-server-is-not-the-leader-for-that-topic-partition
> 
> Please let us know if changing your configuration solves the problem!
> 
> Piotrek
> 
>> On 4 May 2018, at 15:53, Stefan Richter <s.rich...@data-artisans.com> wrote:
>> 
>> Hi,
>> 
>> In general, I think this means that your producer client is not connecting 
>> to the correct broker (the partition leader) but to a broker that is only a 
>> follower, and a follower cannot execute that request. However, I am not sure 
>> what causes this in the context of the FlinkKafkaProducer. Maybe Piotr (in 
>> CC) has an idea?
>> 
>> Best,
>> Stefan
>> 
>>> On 04.05.2018 at 15:45, Alexander Smirnov <alexander.smirn...@gmail.com> wrote:
>>> 
>>> Hi,
>>> 
>>> What could cause the following exception?
>>> 
>>> org.apache.flink.streaming.connectors.kafka.FlinkKafka011Exception: Failed to send data to Kafka: This server is not the leader for that topic-partition.
>>>     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.checkErroneous(FlinkKafkaProducer011.java:999)
>>>     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.invoke(FlinkKafkaProducer011.java:614)
>>>     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.invoke(FlinkKafkaProducer011.java:93)
>>>     at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.invoke(TwoPhaseCommitSinkFunction.java:219)
>>>     at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56)
>>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:549)
>>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:524)
>>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:504)
>>>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:830)
>>>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:808)
>>>     at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
>>>     at com.alexander.smirnov.FilterBMFunction.flatMap(FilterBMFunction.java:162)
>>> 
>>> 
>>> Thank you,
>>> Alex
>> 
> 
