Hi,
I have added the config ProducerConfig.RETRIES_CONFIG = Integer.MAX_VALUE
and the NotLeaderForPartitionException is gone.

However, we now see a new exception, especially under heavy load:
org.apache.kafka.streams.errors.StreamsException: task [0_1] exception caught when producing
  at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.checkForException(RecordCollectorImpl.java:119) ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
  at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.flush(RecordCollectorImpl.java:127) ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
  at org.apache.kafka.streams.processor.internals.StreamTask$1.run(StreamTask.java:76) ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
  at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:188) ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
  at org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:280) ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
  at org.apache.kafka.streams.processor.internals.StreamThread.commitOne(StreamThread.java:787) ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
  at org.apache.kafka.streams.processor.internals.StreamThread.commitAll(StreamThread.java:774) ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
  at org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:749) ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
  at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:671) ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
  at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:378) ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) for new-part-advice-key-table-changelog-1: 30001 ms has passed since last append

Any idea why this TimeoutException is happening?
Is this controlled by
ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG?

If yes, what value should it be set to, given that our consumer's
max.poll.interval.ms is the default 5 minutes?
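
For reference, this is roughly how we would bump it, building on the earlier
retries suggestion (the 60000 ms value is only a guess on our side, not
something we have verified):

Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);
// Retry produce requests on transient broker errors (as suggested earlier).
props.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE);
// In 0.10.x this timeout also appears to bound how long a batch may sit in
// the accumulator before expiring, which looks like the source of the
// "Expiring 1 record(s) ... ms has passed since last append" error.
props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 60000);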

Is there any other setting we should try in order to avoid such errors,
which cause the stream thread to die?
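
As an aside, to at least make the thread death visible we have been thinking
of registering an uncaught exception handler on the KafkaStreams instance (a
minimal sketch, assuming builder is our KStreamBuilder; the logging is only
illustrative):

KafkaStreams streams = new KafkaStreams(builder, props);
// Invoked when any stream thread dies with an unhandled exception, so the
// failure is logged instead of the thread disappearing silently.
// Must be registered before start().
streams.setUncaughtExceptionHandler((thread, throwable) ->
    System.err.println("Stream thread " + thread.getName()
        + " died: " + throwable));
streams.start();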

Thanks
Sachin


On Sun, Mar 26, 2017 at 1:39 AM, Eno Thereska <eno.there...@gmail.com>
wrote:

> Hi Sachin,
>
> Not in this case.
>
> Thanks
> Eno
>
> > On Mar 25, 2017, at 6:19 PM, Sachin Mittal <sjmit...@gmail.com> wrote:
> >
> > OK.
> > I will try this out.
> >
> > Do I need to change anything for
> > max.in.flight.requests.per.connection
> >
> > Thanks
> > Sachin
> >
> >
> > On Sat, Mar 25, 2017 at 10:59 PM, Eno Thereska <eno.there...@gmail.com>
> > wrote:
> >
> >> Hi Sachin,
> >>
> >> For this particular error,
> >> “org.apache.kafka.common.errors.NotLeaderForPartitionException: This
> >> server is not the leader for that topic-partition.”, could you try
> >> setting the number of retries to something large like this:
> >>
> >> Properties props = new Properties();
> >> props.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);
> >> ...
> >> props.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE);
> >>
> >> This will retry the produce requests and should hopefully solve your
> >> immediate problem.
> >>
> >> Thanks
> >> Eno
> >>
> >>
> >> On 25/03/2017, 08:35, "Sachin Mittal" <sjmit...@gmail.com> wrote:
> >>
> >>    Hi,
> >>    We have encountered another case of a series of errors which I would
> >>    need more help in understanding.
> >>
> >>    In the logs we see a message like this:
> >>    ERROR 2017-03-25 03:41:40,001 [kafka-producer-network-thread |
> >>    85-StreamThread-3-producer]:
> >>    org.apache.kafka.streams.processor.internals.RecordCollectorImpl -
> >>    task [0_1] Error sending record to topic
> >>    new-part-advice-key-table-changelog. No more offsets will be recorded
> >>    for this task and the exception will eventually be thrown
> >>
> >>    then, some milliseconds later:
> >>    ERROR 2017-03-25 03:41:40,149 [StreamThread-3]:
> >>    org.apache.kafka.streams.processor.internals.StreamThread -
> >>    stream-thread [StreamThread-3] Failed while executing StreamTask 0_1
> >>    due to flush state:
> >>    org.apache.kafka.streams.errors.StreamsException: task [0_1] exception caught when producing
> >>        at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.checkForException(RecordCollectorImpl.java:119) ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
> >>        at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.flush(RecordCollectorImpl.java:127) ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
> >>        at org.apache.kafka.streams.processor.internals.StreamTask.flushState(StreamTask.java:422) ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
> >>        at org.apache.kafka.streams.processor.internals.StreamThread$4.apply(StreamThread.java:555) ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
> >>        at org.apache.kafka.streams.processor.internals.StreamThread.performOnAllTasks(StreamThread.java:513) [kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
> >>        at org.apache.kafka.streams.processor.internals.StreamThread.flushAllState(StreamThread.java:551) [kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
> >>        at org.apache.kafka.streams.processor.internals.StreamThread.shutdownTasksAndState(StreamThread.java:463) [kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
> >>        at org.apache.kafka.streams.processor.internals.StreamThread.shutdown(StreamThread.java:408) [kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
> >>        at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:389) [kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
> >>    org.apache.kafka.common.errors.NotLeaderForPartitionException: This server is not the leader for that topic-partition.
> >>
> >>    finally we get this:
> >>    ERROR 2017-03-25 03:41:45,724 [StreamThread-3]:
> >>    com.advice.TestKafkaAdvice - Uncaught exception:
> >>    org.apache.kafka.streams.errors.StreamsException: Exception caught in process. taskId=0_1, processor=KSTREAM-SOURCE-0000000000, topic=advice-stream, partition=1, offset=48062286
> >>        at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:216) ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
> >>        at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:651) ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
> >>        at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:378) ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
> >>    Caused by: org.apache.kafka.streams.errors.StreamsException: task [0_1] exception caught when producing
> >>        at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.checkForException(RecordCollectorImpl.java:119) ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
> >>        at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:76) ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
> >>        at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:64) ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
> >>
> >>
> >>    Again, it is not clear why in this case we need to shut down the
> >>    streams thread and eventually the application. Shouldn't we capture
> >>    this error too?
> >>
> >>    Thanks
> >>    Sachin
> >>
> >>
> >>
> >>
>
>
