Hi,

Regardless if that will fix the problem or not, please consider upgrading to 
Kafka 0.11.0.2 or 1.0.1. Kafka 0.11.0 release was quite messy and it might be 
that the bug you have hit was fixed in 0.11.0.2.

As a side note, as far as we know our FlinkKafkaProducer011 works fine with 
Kafka 1.0.x.

Piotrek

> On 7 May 2018, at 12:12, Alexander Smirnov <alexander.smirn...@gmail.com> 
> wrote:
> 
> Hi Piotr, using 0.11.0 Kafka version
> 
> On Sat, May 5, 2018 at 10:19 AM Piotr Nowojski <pi...@data-artisans.com 
> <mailto:pi...@data-artisans.com>> wrote:
> FlinkKafka011Producer uses Kafka 0.11.0.2. 
> 
> However I’m not sure if bumping KafkaProducer version solves this issue or 
> upgrading Kafka. What Kafka version are you using?
> 
> Piotrek
> 
> 
>> On 4 May 2018, at 17:55, Alexander Smirnov <alexander.smirn...@gmail.com 
>> <mailto:alexander.smirn...@gmail.com>> wrote:
>> 
>> Thanks for quick turnaround Stefan, Piotr
>> 
>> This is a rare reproducible issue and I will keep an eye on it
>> 
>> searching on the Stack Overflow I found 
>> https://stackoverflow.com/questions/43378664/kafka-leader-election-causes-kafka-streams-crash
>>  
>> <https://stackoverflow.com/questions/43378664/kafka-leader-election-causes-kafka-streams-crash>
>> 
>> They say that the problem is fixed in 0.10.2.1 of kafka producer so I wonder 
>> which version is used in FlinkKafkaProducer integration. For earlier 
>> versions it is proposed to use configuration:
>> 
>> final Properties props = new Properties();
>> ...
>> props.put(ProducerConfig.RETRIES_CONFIG, 10);  
>> props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 
>> Integer.toString(Integer.MAX_VALUE));
>> props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 20000);
>> 
>> 
>> 
>> On Fri, May 4, 2018 at 4:58 PM Piotr Nowojski <pi...@data-artisans.com 
>> <mailto:pi...@data-artisans.com>> wrote:
>> Hi,
>> 
>> I think Stefan is right. Quick google search points to this: 
>> https://stackoverflow.com/questions/47767169/kafka-this-server-is-not-the-leader-for-that-topic-partition
>>  
>> <https://stackoverflow.com/questions/47767169/kafka-this-server-is-not-the-leader-for-that-topic-partition>
>> 
>> Please let us know if changing your configuration will solve the problem!
>> 
>> Piotrek
>> 
>>> On 4 May 2018, at 15:53, Stefan Richter <s.rich...@data-artisans.com 
>>> <mailto:s.rich...@data-artisans.com>> wrote:
>>> 
>>> Hi,
>>> 
>>> I think in general this means that your producer client does not connect to 
>>> the correct Broker (the leader) but to a broker that is just a follower and 
>>> the follower can not execute that request. However, I am not sure what 
>>> causes this in the context of the FlinkKafkaProducer. Maybe Piotr (in CC) 
>>> has an idea?
>>> 
>>> Best,
>>> Stefan
>>> 
>>>> Am 04.05.2018 um 15:45 schrieb Alexander Smirnov 
>>>> <alexander.smirn...@gmail.com <mailto:alexander.smirn...@gmail.com>>:
>>>> 
>>>> Hi,
>>>> 
>>>> what could cause the following exception?
>>>> 
>>>> org.apache.flink.streaming.connectors.kafka.FlinkKafka011Exception: Failed 
>>>> to send data to Kafka: This server is not the leader for that 
>>>> topic-partition.
>>>>    at 
>>>> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.checkErroneous(FlinkKafkaProducer011.java:999)
>>>>    at 
>>>> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.invoke(FlinkKafkaProducer011.java:614)
>>>>    at 
>>>> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.invoke(FlinkKafkaProducer011.java:93)
>>>>    at 
>>>> org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.invoke(TwoPhaseCommitSinkFunction.java:219)
>>>>    at 
>>>> org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56)
>>>>    at 
>>>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:549)
>>>>    at 
>>>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:524)
>>>>    at 
>>>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:504)
>>>>    at 
>>>> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:830)
>>>>    at 
>>>> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:808)
>>>>    at 
>>>> org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
>>>>    at 
>>>> com.alexander.smirnov.FilterBMFunction.flatMap(FilterBMFunction.java:162)
>>>> 
>>>> 
>>>> Thank you,
>>>> Alex
>>> 
>> 
> 

Reply via email to