Hi Eno,

I used 150, 50, 20 threads and the probabilities of crashing decreased with
this number. When using 1 thread, no crash!

My max.poll.interval is 5 minutes and all the processing won't last that
long, so that parameter does not help.


Thanks
Tianji

On Thu, Mar 16, 2017 at 6:09 PM, Eno Thereska <eno.there...@gmail.com>
wrote:

> Hi Tianji,
>
> How many threads does your app use?
>
> One reason is explained here: https://groups.google.com/
> forum/#!topic/confluent-platform/wgCSuwIJo5g <https://groups.google.com/
> forum/#!topic/confluent-platform/wgCSuwIJo5g>, you might want to increase
> max.poll.interval config value.
> If that doesn't work, could you revert to using one thread for now. Also
> let us know either way since we might need to open a bug report.
>
> Thanks
> Eno
>
> > On 16 Mar 2017, at 20:47, Tianji Li <skyah...@gmail.com> wrote:
> >
> > Hi there,
> >
> > I always got this crashes and wonder if anyone knows why. Please let me
> > know what information I should provide to help with trouble shooting.
> >
> > I am using 0.10.2.0. My application is reading one topic and then
> > groupBy().aggregate() 50 times on different keys.
> >
> > I use memory store, without backing to kafka.
> >
> > Thanks
> > Tianji
> >
> >
> > 2017-03-16 16:37:14.060  WARN 26139 --- [StreamThread-14]
> > o.a.k.s.p.internals.StreamThread         : Could not create task 0_4.
> Will
> > retry.
> >
> > org.apache.kafka.streams.errors.LockException: task [0_4] Failed to lock
> > the state directory: /tmp/kafka-streams/xxx-test28/0_4
> >        at
> > org.apache.kafka.streams.processor.internals.
> ProcessorStateManager.<init>(ProcessorStateManager.java:102)
> >        at
> > org.apache.kafka.streams.processor.internals.AbstractTask.<init>(
> AbstractTask.java:73)
> >        at
> > org.apache.kafka.streams.processor.internals.
> StreamTask.<init>(StreamTask.java:108)
> >        at
> > org.apache.kafka.streams.processor.internals.
> StreamThread.createStreamTask(StreamThread.java:834)
> >        at
> > org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.
> createTask(StreamThread.java:1207)
> >        at
> > org.apache.kafka.streams.processor.internals.StreamThread$
> AbstractTaskCreator.retryWithBackoff(StreamThread.java:1180)
> >        at
> > org.apache.kafka.streams.processor.internals.
> StreamThread.addStreamTasks(StreamThread.java:937)
> >        at
> > org.apache.kafka.streams.processor.internals.StreamThread.access$500(
> StreamThread.java:69)
> >        at
> > org.apache.kafka.streams.processor.internals.StreamThread$1.
> onPartitionsAssigned(StreamThread.java:236)
> >        at
> > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.
> onJoinComplete(ConsumerCoordinator.java:255)
> >        at
> > org.apache.kafka.clients.consumer.internals.AbstractCoordinator.
> joinGroupIfNeeded(AbstractCoordinator.java:339)
> >        at
> > org.apache.kafka.clients.consumer.internals.AbstractCoordinator.
> ensureActiveGroup(AbstractCoordinator.java:303)
> >        at
> > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(
> ConsumerCoordinator.java:286)
> >        at
> > org.apache.kafka.clients.consumer.KafkaConsumer.
> pollOnce(KafkaConsumer.java:1030)
> >        at
> > org.apache.kafka.clients.consumer.KafkaConsumer.poll(
> KafkaConsumer.java:995)
> >        at
> > org.apache.kafka.streams.processor.internals.StreamThread.runLoop(
> StreamThread.java:582)
> >        at
> > org.apache.kafka.streams.processor.internals.
> StreamThread.run(StreamThread.java:368)
>
>

Reply via email to