Hi,

I have added the config ProducerConfig.RETRIES_CONFIG = Integer.MAX_VALUE, and the NotLeaderForPartitionException is gone.
However, we now see a new exception, especially under heavy load:

org.apache.kafka.streams.errors.StreamsException: task [0_1] exception caught when producing
    at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.checkForException(RecordCollectorImpl.java:119) ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
    at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.flush(RecordCollectorImpl.java:127) ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
    at org.apache.kafka.streams.processor.internals.StreamTask$1.run(StreamTask.java:76) ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
    at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:188) ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
    at org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:280) ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
    at org.apache.kafka.streams.processor.internals.StreamThread.commitOne(StreamThread.java:787) ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
    at org.apache.kafka.streams.processor.internals.StreamThread.commitAll(StreamThread.java:774) ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
    at org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:749) ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:671) ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:378) ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) for new-part-advice-key-table-changelog-1: 30001 ms has passed since last append

Any idea why the TimeoutException is happening? Is this controlled by ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG? If yes, what should the value be set to, given that our consumer max.poll.interval.ms is the default 5 minutes?
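In case request.timeout.ms does turn out to be the knob, here is a minimal sketch of how we would wire it up. It uses plain java.util.Properties with the string keys behind the ProducerConfig constants so the snippet stands alone; the 60000 ms value is only an illustrative guess on our side, not a recommendation:

```java
import java.util.Properties;

// Sketch of the producer-side settings we are experimenting with.
// Key names are the strings behind ProducerConfig.*_CONFIG; values illustrative.
public class StreamsProducerTuning {
    public static Properties producerTuning() {
        Properties props = new Properties();
        // Retry transient broker errors (e.g. NotLeaderForPartitionException)
        // instead of failing the stream thread (ProducerConfig.RETRIES_CONFIG).
        props.put("retries", String.valueOf(Integer.MAX_VALUE));
        // With retries set high, keep a single in-flight request per connection
        // to preserve record ordering across retries.
        props.put("max.in.flight.requests.per.connection", "1");
        // Raise the produce request timeout above the default 30s so records
        // are not expired while brokers are slow under load; kept well below
        // our consumer max.poll.interval.ms (default 5 minutes).
        props.put("request.timeout.ms", "60000");
        return props;
    }

    public static void main(String[] args) {
        producerTuning().forEach((k, v) -> System.out.println(k + "=" + v));
    }
}
```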
Is there any other setting we should try to avoid such errors, which cause the stream thread to die?

Thanks
Sachin

On Sun, Mar 26, 2017 at 1:39 AM, Eno Thereska <eno.there...@gmail.com> wrote:

> Hi Sachin,
>
> Not in this case.
>
> Thanks
> Eno
>
> > On Mar 25, 2017, at 6:19 PM, Sachin Mittal <sjmit...@gmail.com> wrote:
> >
> > OK.
> > I will try this out.
> >
> > Do I need to change anything for
> > max.in.flight.requests.per.connection
> >
> > Thanks
> > Sachin
> >
> > On Sat, Mar 25, 2017 at 10:59 PM, Eno Thereska <eno.there...@gmail.com> wrote:
> >
> >> Hi Sachin,
> >>
> >> For this particular error, "org.apache.kafka.common.errors.NotLeaderForPartitionException: This server is not the leader for that topic-partition.", could you try setting the number of retries to something large, like this:
> >>
> >> Properties props = new Properties();
> >> props.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);
> >> ...
> >> props.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE);
> >>
> >> This will retry the produce requests and should hopefully solve your immediate problem.
> >>
> >> Thanks
> >> Eno
> >>
> >> On 25/03/2017, 08:35, "Sachin Mittal" <sjmit...@gmail.com> wrote:
> >>
> >> Hi,
> >> We have encountered another series of errors which I would need more help in understanding.
> >>
> >> In the logs we see a message like this:
> >> ERROR 2017-03-25 03:41:40,001 [kafka-producer-network-thread | 85-StreamThread-3-producer]: org.apache.kafka.streams.processor.internals.RecordCollectorImpl - task [0_1] Error sending record to topic new-part-advice-key-table-changelog. No more offsets will be recorded for this task and the exception will eventually be thrown
> >>
> >> then some milliseconds later:
> >> ERROR 2017-03-25 03:41:40,149 [StreamThread-3]: org.apache.kafka.streams.processor.internals.StreamThread - stream-thread [StreamThread-3] Failed while executing StreamTask 0_1 due to flush state:
> >> org.apache.kafka.streams.errors.StreamsException: task [0_1] exception caught when producing
> >>     at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.checkForException(RecordCollectorImpl.java:119) ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
> >>     at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.flush(RecordCollectorImpl.java:127) ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
> >>     at org.apache.kafka.streams.processor.internals.StreamTask.flushState(StreamTask.java:422) ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
> >>     at org.apache.kafka.streams.processor.internals.StreamThread$4.apply(StreamThread.java:555) ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
> >>     at org.apache.kafka.streams.processor.internals.StreamThread.performOnAllTasks(StreamThread.java:513) [kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
> >>     at org.apache.kafka.streams.processor.internals.StreamThread.flushAllState(StreamThread.java:551) [kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
> >>     at org.apache.kafka.streams.processor.internals.StreamThread.shutdownTasksAndState(StreamThread.java:463) [kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
> >>     at org.apache.kafka.streams.processor.internals.StreamThread.shutdown(StreamThread.java:408) [kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
> >>     at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:389) [kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
> >> org.apache.kafka.common.errors.NotLeaderForPartitionException: This server is not the leader for that topic-partition.
> >>
> >> finally we get this:
> >> ERROR 2017-03-25 03:41:45,724 [StreamThread-3]: com.advice.TestKafkaAdvice - Uncaught exception:
> >> org.apache.kafka.streams.errors.StreamsException: Exception caught in process. taskId=0_1, processor=KSTREAM-SOURCE-0000000000, topic=advice-stream, partition=1, offset=48062286
> >>     at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:216) ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
> >>     at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:651) ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
> >>     at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:378) ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
> >> Caused by: org.apache.kafka.streams.errors.StreamsException: task [0_1] exception caught when producing
> >>     at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.checkForException(RecordCollectorImpl.java:119) ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
> >>     at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:76) ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
> >>     at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:64) ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
> >>
> >> Again it is not clear why in this case we need to shut down the streams thread and eventually the application. Shouldn't we capture this error too?
> >>
> >> Thanks
> >> Sachin
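PS: On the thread-death question, one thing we are considering in the meantime is registering an uncaught exception handler so a dying stream thread is at least logged and can trigger an alert or restart. KafkaStreams#setUncaughtExceptionHandler accepts a standard java.lang.Thread.UncaughtExceptionHandler; the sketch below demonstrates the same handler interface on a plain thread so it runs without the Kafka jars:

```java
import java.util.concurrent.atomic.AtomicReference;

public class HandlerDemo {
    // Runs a throwaway thread that fails, and returns the message seen by the
    // uncaught exception handler. The handler interface is the same one that
    // KafkaStreams#setUncaughtExceptionHandler accepts.
    public static String runAndCapture() throws InterruptedException {
        AtomicReference<String> captured = new AtomicReference<>();
        Thread t = new Thread(() -> {
            throw new RuntimeException("simulated StreamsException");
        });
        t.setUncaughtExceptionHandler((thread, e) ->
            captured.set("Stream thread " + thread.getName() + " died: " + e.getMessage()));
        t.start();
        t.join();
        return captured.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(runAndCapture());
        // With Kafka Streams the registration would be, before streams.start():
        //   streams.setUncaughtExceptionHandler((thread, e) -> /* log / alert */);
    }
}
```

This does not prevent the thread from dying, but it makes the failure visible instead of silently shrinking the number of running stream threads.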