Thank you, Piotr.

On Mon, May 7, 2018 at 2:59 PM Piotr Nowojski <pi...@data-artisans.com>
wrote:

> Hi,
>
> Regardless of whether that fixes the problem, please consider upgrading
> to Kafka 0.11.0.2 or 1.0.1. The Kafka 0.11.0 release was quite messy, and
> the bug you have hit may have been fixed in 0.11.0.2.
>
> As a side note, as far as we know our FlinkKafkaProducer011 works fine
> with Kafka 1.0.x.
>
> Piotrek
>
> On 7 May 2018, at 12:12, Alexander Smirnov <alexander.smirn...@gmail.com>
> wrote:
>
> Hi Piotr, I am using Kafka version 0.11.0.
>
> On Sat, May 5, 2018 at 10:19 AM Piotr Nowojski <pi...@data-artisans.com>
> wrote:
>
>> FlinkKafkaProducer011 uses Kafka 0.11.0.2.
>>
>> However, I’m not sure whether bumping the KafkaProducer version or
>> upgrading Kafka would solve this issue. What Kafka version are you using?
>>
>> Piotrek
>>
>>
>> On 4 May 2018, at 17:55, Alexander Smirnov <alexander.smirn...@gmail.com>
>> wrote:
>>
>> Thanks for the quick turnaround, Stefan and Piotr.
>>
>> This issue is rarely reproducible, and I will keep an eye on it.
>>
>> Searching Stack Overflow, I found
>> https://stackoverflow.com/questions/43378664/kafka-leader-election-causes-kafka-streams-crash
>>
>> They say the problem is fixed in version 0.10.2.1 of the Kafka producer, so
>> I wonder which version the FlinkKafkaProducer integration uses. For earlier
>> versions, the following configuration is proposed:
>>
>> final Properties props = new Properties();
>> ...
>> props.put(ProducerConfig.RETRIES_CONFIG, 10);
>> props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, Integer.toString(Integer.MAX_VALUE));
>> props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 20000);
>>
>>
>>
>>
>> On Fri, May 4, 2018 at 4:58 PM Piotr Nowojski <pi...@data-artisans.com>
>> wrote:
>>
>>> Hi,
>>>
>>> I think Stefan is right. A quick Google search points to this:
>>> https://stackoverflow.com/questions/47767169/kafka-this-server-is-not-the-leader-for-that-topic-partition
>>>
>>> Please let us know if changing your configuration solves the problem!
>>>
>>> Piotrek
>>>
>>> On 4 May 2018, at 15:53, Stefan Richter <s.rich...@data-artisans.com>
>>> wrote:
>>>
>>> Hi,
>>>
>>> I think in general this means that your producer client is not connected
>>> to the correct broker (the leader) but to a broker that is only a follower,
>>> and a follower cannot execute that request. However, I am not sure what
>>> causes this in the context of the FlinkKafkaProducer. Maybe Piotr (in CC)
>>> has an idea?
>>>
>>> Best,
>>> Stefan
>>>
>>> On 04.05.2018 at 15:45, Alexander Smirnov <
>>> alexander.smirn...@gmail.com> wrote:
>>>
>>> Hi,
>>>
>>> What could cause the following exception?
>>>
>>> org.apache.flink.streaming.connectors.kafka.FlinkKafka011Exception: Failed to send data to Kafka: This server is not the leader for that topic-partition.
>>>     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.checkErroneous(FlinkKafkaProducer011.java:999)
>>>     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.invoke(FlinkKafkaProducer011.java:614)
>>>     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.invoke(FlinkKafkaProducer011.java:93)
>>>     at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.invoke(TwoPhaseCommitSinkFunction.java:219)
>>>     at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56)
>>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:549)
>>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:524)
>>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:504)
>>>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:830)
>>>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:808)
>>>     at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
>>>     at com.alexander.smirnov.FilterBMFunction.flatMap(FilterBMFunction.java:162)
>>>
>>>
>>> Thank you,
>>> Alex
>>>
>>>
>>>
>>>
>>
>

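For anyone who finds this thread later: the configuration quoted from Stack Overflow mixes producer and consumer settings, and only the retry setting applies to a plain producer such as the one behind FlinkKafkaProducer011. A minimal sketch of producer-side properties follows, using the plain string keys so it compiles without the Kafka client on the classpath ("retries" is the key behind ProducerConfig.RETRIES_CONFIG). The class name, the method name, the broker address, and the 500 ms backoff are assumptions for illustration; only the retry count of 10 comes from the thread.

```java
import java.util.Properties;

public class ProducerRetryConfig {

    // Hypothetical helper: builds producer properties that retry transient
    // send failures, such as a leader change, instead of failing immediately.
    static Properties buildProducerProperties(String bootstrapServers) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        // Retry count of 10, as proposed in the quoted Stack Overflow answer.
        props.setProperty("retries", "10");
        // Assumed value: wait 500 ms between retries so a new leader can be elected.
        props.setProperty("retry.backoff.ms", "500");
        return props;
    }

    public static void main(String[] args) {
        // "localhost:9092" is a placeholder broker address.
        Properties props = buildProducerProperties("localhost:9092");
        props.forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```

Properties built this way would presumably be passed to the FlinkKafkaProducer011 constructor as its producer configuration. Whether retries alone avoids the exception reported above is not confirmed anywhere in this thread.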