thank you Piotr On Mon, May 7, 2018 at 2:59 PM Piotr Nowojski <pi...@data-artisans.com> wrote:
> Hi, > > Regardless if that will fix the problem or not, please consider upgrading > to Kafka 0.11.0.2 or 1.0.1. Kafka 0.11.0 release was quite messy and it > might be that the bug you have hit was fixed in 0.11.0.2. > > As a side note, as far as we know our FlinkKafkaProducer011 works fine > with Kafka 1.0.x. > > Piotrek > > On 7 May 2018, at 12:12, Alexander Smirnov <alexander.smirn...@gmail.com> > wrote: > > Hi Piotr, using 0.11.0 Kafka version > > On Sat, May 5, 2018 at 10:19 AM Piotr Nowojski <pi...@data-artisans.com> > wrote: > >> FlinkKafka011Producer uses Kafka 0.11.0.2. >> >> However I’m not sure if bumping KafkaProducer version solves this issue >> or upgrading Kafka. What Kafka version are you using? >> >> Piotrek >> >> >> On 4 May 2018, at 17:55, Alexander Smirnov <alexander.smirn...@gmail.com> >> wrote: >> >> Thanks for quick turnaround Stefan, Piotr >> >> This is a rare reproducible issue and I will keep an eye on it >> >> searching on the Stack Overflow I found >> https://stackoverflow.com/questions/43378664/kafka-leader-election-causes-kafka-streams-crash >> >> They say that the problem is fixed in 0.10.2.1 of kafka producer so I >> wonder which version is used in FlinkKafkaProducer integration. For earlier >> versions it is proposed to use configuration: >> >> final Properties props = new Properties();... >> props.put(ProducerConfig.RETRIES_CONFIG, 10); >> props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, >> Integer.toString(Integer.MAX_VALUE));props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, >> 20000); >> >> >> >> >> On Fri, May 4, 2018 at 4:58 PM Piotr Nowojski <pi...@data-artisans.com> >> wrote: >> >>> Hi, >>> >>> I think Stefan is right. Quick google search points to this: >>> https://stackoverflow.com/questions/47767169/kafka-this-server-is-not-the-leader-for-that-topic-partition >>> >>> Please let us know if changing your configuration will solve the problem! >>> >>> Piotrek >>> >>> On 4 May 2018, at 15:53, Stefan Richter <s.rich...@data-artisans.com> >>> wrote: >>> >>> Hi, >>> >>> I think in general this means that your producer client does not connect >>> to the correct Broker (the leader) but to a broker that is just a follower >>> and the follower can not execute that request. However, I am not sure what >>> causes this in the context of the FlinkKafkaProducer. Maybe Piotr (in CC) >>> has an idea? >>> >>> Best, >>> Stefan >>> >>> Am 04.05.2018 um 15:45 schrieb Alexander Smirnov < >>> alexander.smirn...@gmail.com>: >>> >>> Hi, >>> >>> what could cause the following exception? >>> >>> org.apache.flink.streaming.connectors.kafka.FlinkKafka011Exception: >>> Failed to send data to Kafka: This server is not the leader for that >>> topic-partition. >>> at >>> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.checkErroneous(FlinkKafkaProducer011.java:999) >>> at >>> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.invoke(FlinkKafkaProducer011.java:614) >>> at >>> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.invoke(FlinkKafkaProducer011.java:93) >>> at >>> org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.invoke(TwoPhaseCommitSinkFunction.java:219) >>> at >>> org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56) >>> at >>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:549) >>> at >>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:524) >>> at >>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:504) >>> at >>> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:830) >>> at >>> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:808) >>> at >>> org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51) >>> at >>> com.alexander.smirnov.FilterBMFunction.flatMap(FilterBMFunction.java:162) >>> >>> >>> Thank you, >>> Alex >>> >>> >>> >>> >> >