Hey Eno, I just pulled the latest jar from the link you shared and tried to run my code. I am getting the following exception on new KafkaStreams(). The same code is working fine with 0.10.2.0 jar.
Exception in thread "main" org.apache.kafka.common.KafkaException: Failed to construct kafka consumer at org.apache.kafka.clients.consumer.KafkaConsumer.<init>( KafkaConsumer.java:717) at org.apache.kafka.clients.consumer.KafkaConsumer.<init>( KafkaConsumer.java:566) at org.apache.kafka.streams.processor.internals. DefaultKafkaClientSupplier.getConsumer(DefaultKafkaClientSupplier.java:38) at org.apache.kafka.streams.processor.internals.StreamThread.<init>( StreamThread.java:316) at org.apache.kafka.streams.KafkaStreams.<init>( KafkaStreams.java:358) at org.apache.kafka.streams.KafkaStreams.<init>( KafkaStreams.java:279) Caused by: java.lang.NoSuchMethodError: org.apache.kafka.clients. Metadata.update(Lorg/apache/kafka/common/Cluster;Ljava/util/Set;J)V at org.apache.kafka.streams.processor.internals. StreamsKafkaClient.<init>(StreamsKafkaClient.java:98) at org.apache.kafka.streams.processor.internals. StreamsKafkaClient.<init>(StreamsKafkaClient.java:82) at org.apache.kafka.streams.processor.internals. StreamPartitionAssignor.configure(StreamPartitionAssignor.java:219) at org.apache.kafka.common.config.AbstractConfig. getConfiguredInstances(AbstractConfig.java:254) at org.apache.kafka.common.config.AbstractConfig. getConfiguredInstances(AbstractConfig.java:220) at org.apache.kafka.clients.consumer.KafkaConsumer.<init>( KafkaConsumer.java:673) ... 6 more On Tue, Apr 18, 2017 at 5:47 AM, Mahendra Kariya <mahendra.kar...@go-jek.com > wrote: > Thanks! > > On Tue, Apr 18, 2017, 12:26 AM Eno Thereska <eno.there...@gmail.com> > wrote: > >> The RC candidate build is here: http://home.apache.org/~ >> gwenshap/kafka-0.10.2.1-rc1/ <http://home.apache.org/~ >> gwenshap/kafka-0.10.2.1-rc1/> >> >> Eno >> > On 17 Apr 2017, at 17:20, Mahendra Kariya <mahendra.kar...@go-jek.com> >> wrote: >> > >> > Thanks! >> > >> > In the meantime, is the jar published somewhere on github or as a part >> of >> > build pipeline? >> > >> > On Mon, Apr 17, 2017 at 9:18 PM, Eno Thereska <eno.there...@gmail.com> >> > wrote: >> > >> >> Not yet, but as soon as 0.10.2 is voted it should be. Hopefully this >> week. >> >> >> >> Eno >> >>> On 17 Apr 2017, at 13:25, Mahendra Kariya <mahendra.kar...@go-jek.com >> > >> >> wrote: >> >>> >> >>> Are the bug fix releases published to Maven central repo? >> >>> >> >>> On Sat, Apr 1, 2017 at 12:26 PM, Eno Thereska <eno.there...@gmail.com >> > >> >>> wrote: >> >>> >> >>>> Hi Sachin, >> >>>> >> >>>> In the bug fix release for 0.10.2 (and in trunk) we have now set >> >>>> max.poll.interval to infinite since from our experience with streams >> >> this >> >>>> should not be something that users set: https://github.com/apache/ >> >>>> kafka/pull/2770/files <https://github.com/apache/ >> kafka/pull/2770/files >> >>> . >> >>>> >> >>>> We're in the process of documenting that change. For now you can >> >> increase >> >>>> the request timeout without worrying about max.poll.interval >> anymore. In >> >>>> fact I'd suggest you also increase max.poll.interval as we've done it >> >> above. >> >>>> >> >>>> Thanks >> >>>> Eno >> >>>> >> >>>>> On 1 Apr 2017, at 03:28, Sachin Mittal <sjmit...@gmail.com> wrote: >> >>>>> >> >>>>> Should this timeout be less than max poll interval value? if yes >> than >> >>>>> generally speaking what should be the ratio between two or range for >> >> this >> >>>>> timeout value . >> >>>>> >> >>>>> Thanks >> >>>>> Sachin >> >>>>> >> >>>>> On 1 Apr 2017 04:57, "Matthias J. Sax" <matth...@confluent.io> >> wrote: >> >>>>> >> >>>>> Yes, you can increase ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG >> >>>>> >> >>>>> >> >>>>> -Matthias >> >>>>> >> >>>>> >> >>>>> On 3/31/17 11:32 AM, Sachin Mittal wrote: >> >>>>>> Hi, >> >>>>>> So I have added the config ProducerConfig.RETRIES_CONFIG, >> >>>>> Integer.MAX_VALUE >> >>>>>> and the NotLeaderForPartitionException is gone. >> >>>>>> >> >>>>>> However we see a new exception especially under heavy load: >> >>>>>> org.apache.kafka.streams.errors.StreamsException: task [0_1] >> >> exception >> >>>>>> caught when producing >> >>>>>> at >> >>>>>> org.apache.kafka.streams.processor.internals.RecordCollectorImpl. >> >>>>> checkForException(RecordCollectorImpl.java:119) >> >>>>>> ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na] >> >>>>>> at >> >>>>>> org.apache.kafka.streams.processor.internals. >> >> RecordCollectorImpl.flush( >> >>>>> RecordCollectorImpl.java:127) >> >>>>>> ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na] at >> >>>>>> org.apache.kafka.streams.processor.internals. >> >>>> StreamTask$1.run(StreamTask. >> >>>>> java:76) >> >>>>>> ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na] >> >>>>>> at >> >>>>>> org.apache.kafka.streams.processor.internals.StreamsMetricsImpl. >> >>>>> measureLatencyNs(StreamsMetricsImpl.java:188) >> >>>>>> ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na] >> >>>>>> at >> >>>>>> org.apache.kafka.streams.processor.internals. >> >>>> StreamTask.commit(StreamTask. >> >>>>> java:280) >> >>>>>> ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na] at >> >>>>>> org.apache.kafka.streams.processor.internals. >> StreamThread.commitOne( >> >>>>> StreamThread.java:787) >> >>>>>> ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na] >> >>>>>> at >> >>>>>> org.apache.kafka.streams.processor.internals. >> StreamThread.commitAll( >> >>>>> StreamThread.java:774) >> >>>>>> ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na] at >> >>>>>> org.apache.kafka.streams.processor.internals. >> >> StreamThread.maybeCommit( >> >>>>> StreamThread.java:749) >> >>>>>> ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na] >> >>>>>> at >> >>>>>> org.apache.kafka.streams.processor.internals.StreamThread.runLoop( >> >>>>> StreamThread.java:671) >> >>>>>> ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na] at >> >>>>>> org.apache.kafka.streams.processor.internals. >> >>>>> StreamThread.run(StreamThread.java:378) >> >>>>>> ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na] >> >>>>>> org.apache.kafka.common.errors.TimeoutException: Expiring 1 >> record(s) >> >>>> for >> >>>>>> new-part-advice-key-table-changelog-1: 30001 ms has passed since >> last >> >>>>> append >> >>>>>> >> >>>>>> So any idea as why TimeoutException is happening. >> >>>>>> Is this controlled by >> >>>>>> ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG >> >>>>>> >> >>>>>> If yes >> >>>>>> What should the value be set in this given that out consumer >> >>>>>> max.poll.interval.ms is defaul 5 minutes. >> >>>>>> >> >>>>>> Is there any other setting that we should try to avoid such errors >> >> which >> >>>>>> causes stream thread to die. >> >>>>>> >> >>>>>> Thanks >> >>>>>> Sachin >> >>>>>> >> >>>>>> >> >>>>>> On Sun, Mar 26, 2017 at 1:39 AM, Eno Thereska < >> eno.there...@gmail.com >> >>> >> >>>>>> wrote: >> >>>>>> >> >>>>>>> Hi Sachin, >> >>>>>>> >> >>>>>>> Not in this case. >> >>>>>>> >> >>>>>>> Thanks >> >>>>>>> Eno >> >>>>>>> >> >>>>>>>> On Mar 25, 2017, at 6:19 PM, Sachin Mittal <sjmit...@gmail.com> >> >>>> wrote: >> >>>>>>>> >> >>>>>>>> OK. >> >>>>>>>> I will try this out. >> >>>>>>>> >> >>>>>>>> Do I need to change anything for >> >>>>>>>> max.in.flight.requests.per.connection >> >>>>>>>> >> >>>>>>>> Thanks >> >>>>>>>> Sachin >> >>>>>>>> >> >>>>>>>> >> >>>>>>>> On Sat, Mar 25, 2017 at 10:59 PM, Eno Thereska < >> >>>> eno.there...@gmail.com> >> >>>>>>>> wrote: >> >>>>>>>> >> >>>>>>>>> Hi Sachin, >> >>>>>>>>> >> >>>>>>>>> For this particular error, “org.apache.kafka.common.errors. >> >>>>>>>>> NotLeaderForPartitionException: This server is not the leader >> for >> >>>> that >> >>>>>>>>> topic-partition.”, could you try setting the number of retries >> to >> >>>>>>> something >> >>>>>>>>> large like this: >> >>>>>>>>> >> >>>>>>>>> Properties props = new Properties(); >> >>>>>>>>> props.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId); >> >>>>>>>>> ... >> >>>>>>>>> props.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE); >> >>>>>>>>> >> >>>>>>>>> This will retry the produce requests and should hopefully solve >> >> your >> >>>>>>>>> immediate problem. >> >>>>>>>>> >> >>>>>>>>> Thanks >> >>>>>>>>> Eno >> >>>>>>>>> >> >>>>>>>>> >> >>>>>>>>> On 25/03/2017, 08:35, "Sachin Mittal" <sjmit...@gmail.com> >> wrote: >> >>>>>>>>> >> >>>>>>>>> Hi, >> >>>>>>>>> We have encountered another case of series of errors which I >> would >> >>>>>>> need >> >>>>>>>>> more help in understanding. >> >>>>>>>>> >> >>>>>>>>> In logs we see message like this: >> >>>>>>>>> ERROR 2017-03-25 03:41:40,001 [kafka-producer-network-thread | >> >>>>>>>>> 85-StreamThread-3-producer]: >> >>>>>>>>> org.apache.kafka.streams.processor.internals. >> RecordCollectorImpl >> >> - >> >>>>>>>>> task >> >>>>>>>>> [0_1] Error sending record to topic new-part-advice-key-table- >> >>>>>>> changelog. >> >>>>>>>>> No >> >>>>>>>>> more offsets will be recorded for this task and the exception >> will >> >>>>>>>>> eventually be thrown >> >>>>>>>>> >> >>>>>>>>> then some millisecond later >> >>>>>>>>> ERROR 2017-03-25 03:41:40,149 [StreamThread-3]: >> >>>>>>>>> org.apache.kafka.streams.processor.internals.StreamThread - >> >>>>>>>>> stream-thread >> >>>>>>>>> [StreamThread-3] Failed while executing StreamTask 0_1 due to >> >> flush >> >>>>>>>>> state: >> >>>>>>>>> org.apache.kafka.streams.errors.StreamsException: task [0_1] >> >>>>>>> exception >> >>>>>>>>> caught when producing >> >>>>>>>>> at >> >>>>>>>>> org.apache.kafka.streams.processor.internals. >> RecordCollectorImpl. >> >>>>>>>>> checkForException(RecordCollectorImpl.java:119) >> >>>>>>>>> ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na] >> >>>>>>>>> at >> >>>>>>>>> org.apache.kafka.streams.processor.internals. >> >>>>>>>>> RecordCollectorImpl.flush(RecordCollectorImpl.java:127) >> >>>>>>>>> ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na] >> >>>>>>>>> at >> >>>>>>>>> org.apache.kafka.streams.processor.internals. >> >>>> StreamTask.flushState( >> >>>>>>>>> StreamTask.java:422) >> >>>>>>>>> ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na] >> >>>>>>>>> at >> >>>>>>>>> org.apache.kafka.streams.processor.internals. >> >> StreamThread$4.apply( >> >>>>>>>>> StreamThread.java:555) >> >>>>>>>>> ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na] >> >>>>>>>>> at >> >>>>>>>>> org.apache.kafka.streams.processor.internals.StreamThread. >> >>>>>>>>> performOnAllTasks(StreamThread.java:513) >> >>>>>>>>> [kafka-streams-0.10.2.1-SNAPSHOT.jar:na] >> >>>>>>>>> at >> >>>>>>>>> org.apache.kafka.streams.processor.internals. >> >>>>>>>>> StreamThread.flushAllState(StreamThread.java:551) >> >>>>>>>>> [kafka-streams-0.10.2.1-SNAPSHOT.jar:na] >> >>>>>>>>> at >> >>>>>>>>> org.apache.kafka.streams.processor.internals.StreamThread. >> >>>>>>>>> shutdownTasksAndState(StreamThread.java:463) >> >>>>>>>>> [kafka-streams-0.10.2.1-SNAPSHOT.jar:na] >> >>>>>>>>> at >> >>>>>>>>> org.apache.kafka.streams.processor.internals. >> >>>> StreamThread.shutdown( >> >>>>>>>>> StreamThread.java:408) >> >>>>>>>>> [kafka-streams-0.10.2.1-SNAPSHOT.jar:na] >> >>>>>>>>> at >> >>>>>>>>> org.apache.kafka.streams.processor.internals. >> >>>>>>>>> StreamThread.run(StreamThread.java:389) >> >>>>>>>>> [kafka-streams-0.10.2.1-SNAPSHOT.jar:na] >> >>>>>>>>> org.apache.kafka.common.errors.NotLeaderForPartitionException: >> >>>> This >> >>>>>>>>> server >> >>>>>>>>> is not the leader for that topic-partition. >> >>>>>>>>> >> >>>>>>>>> finally we get this >> >>>>>>>>> ERROR 2017-03-25 03:41:45,724 [StreamThread-3]: >> >>>>>>>>> com.advice.TestKafkaAdvice >> >>>>>>>>> - Uncaught exception: >> >>>>>>>>> org.apache.kafka.streams.errors.StreamsException: Exception >> >> caught >> >>>>>>> in >> >>>>>>>>> process. taskId=0_1, processor=KSTREAM-SOURCE-0000000000, >> >>>>>>>>> topic=advice-stream, partition=1, offset=48062286 >> >>>>>>>>> at >> >>>>>>>>> org.apache.kafka.streams.processor.internals. >> >>>>>>>>> StreamTask.process(StreamTask.java:216) >> >>>>>>>>> ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na] >> >>>>>>>>> at >> >>>>>>>>> org.apache.kafka.streams.processor.internals. >> >> StreamThread.runLoop( >> >>>>>>>>> StreamThread.java:651) >> >>>>>>>>> ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na] >> >>>>>>>>> at >> >>>>>>>>> org.apache.kafka.streams.processor.internals. >> >>>>>>>>> StreamThread.run(StreamThread.java:378) >> >>>>>>>>> ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na] >> >>>>>>>>> Caused by: org.apache.kafka.streams.errors.StreamsException: >> task >> >>>>>>>>> [0_1] >> >>>>>>>>> exception caught when producing >> >>>>>>>>> at >> >>>>>>>>> org.apache.kafka.streams.processor.internals. >> RecordCollectorImpl. >> >>>>>>>>> checkForException(RecordCollectorImpl.java:119) >> >>>>>>>>> ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na] >> >>>>>>>>> at >> >>>>>>>>> org.apache.kafka.streams.processor.internals. >> >>>>>>> RecordCollectorImpl.send( >> >>>>>>>>> RecordCollectorImpl.java:76) >> >>>>>>>>> ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na] >> >>>>>>>>> at >> >>>>>>>>> org.apache.kafka.streams.processor.internals. >> >>>>>>> RecordCollectorImpl.send( >> >>>>>>>>> RecordCollectorImpl.java:64) >> >>>>>>>>> ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na] >> >>>>>>>>> >> >>>>>>>>> >> >>>>>>>>> Again it is not clear why in this case we need to shut down the >> >>>>>>> steams >> >>>>>>>>> thread and eventually the application. Shouldn't we capture this >> >>>>>>> error >> >>>>>>>>> too? >> >>>>>>>>> >> >>>>>>>>> Thanks >> >>>>>>>>> Sachin >> >>>>>>>>> >> >>>>>>>>> >> >>>>>>>>> >> >>>>>>>>> >> >>>>>>> >> >>>>>>> >> >>>>>> >> >>>> >> >>>> >> >> >> >> >> >>