Hi Eno, I used 150, 50, 20 threads and the probabilities of crashing decreased with this number. When using 1 thread, no crash!
My max.poll.interval is 5 minutes and all the processing won't last that long, so that parameter does not help. Thanks Tianji On Thu, Mar 16, 2017 at 6:09 PM, Eno Thereska <eno.there...@gmail.com> wrote: > Hi Tianji, > > How many threads does your app use? > > One reason is explained here: https://groups.google.com/ > forum/#!topic/confluent-platform/wgCSuwIJo5g <https://groups.google.com/ > forum/#!topic/confluent-platform/wgCSuwIJo5g>, you might want to increase > max.poll.interval config value. > If that doesn't work, could you revert to using one thread for now. Also > let us know either way since we might need to open a bug report. > > Thanks > Eno > > > On 16 Mar 2017, at 20:47, Tianji Li <skyah...@gmail.com> wrote: > > > > Hi there, > > > > I always got this crashes and wonder if anyone knows why. Please let me > > know what information I should provide to help with trouble shooting. > > > > I am using 0.10.2.0. My application is reading one topic and then > > groupBy().aggregate() 50 times on different keys. > > > > I use memory store, without backing to kafka. > > > > Thanks > > Tianji > > > > > > 2017-03-16 16:37:14.060 WARN 26139 --- [StreamThread-14] > > o.a.k.s.p.internals.StreamThread : Could not create task 0_4. > Will > > retry. > > > > org.apache.kafka.streams.errors.LockException: task [0_4] Failed to lock > > the state directory: /tmp/kafka-streams/xxx-test28/0_4 > > at > > org.apache.kafka.streams.processor.internals. > ProcessorStateManager.<init>(ProcessorStateManager.java:102) > > at > > org.apache.kafka.streams.processor.internals.AbstractTask.<init>( > AbstractTask.java:73) > > at > > org.apache.kafka.streams.processor.internals. > StreamTask.<init>(StreamTask.java:108) > > at > > org.apache.kafka.streams.processor.internals. > StreamThread.createStreamTask(StreamThread.java:834) > > at > > org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator. > createTask(StreamThread.java:1207) > > at > > org.apache.kafka.streams.processor.internals.StreamThread$ > AbstractTaskCreator.retryWithBackoff(StreamThread.java:1180) > > at > > org.apache.kafka.streams.processor.internals. > StreamThread.addStreamTasks(StreamThread.java:937) > > at > > org.apache.kafka.streams.processor.internals.StreamThread.access$500( > StreamThread.java:69) > > at > > org.apache.kafka.streams.processor.internals.StreamThread$1. > onPartitionsAssigned(StreamThread.java:236) > > at > > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator. > onJoinComplete(ConsumerCoordinator.java:255) > > at > > org.apache.kafka.clients.consumer.internals.AbstractCoordinator. > joinGroupIfNeeded(AbstractCoordinator.java:339) > > at > > org.apache.kafka.clients.consumer.internals.AbstractCoordinator. > ensureActiveGroup(AbstractCoordinator.java:303) > > at > > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll( > ConsumerCoordinator.java:286) > > at > > org.apache.kafka.clients.consumer.KafkaConsumer. > pollOnce(KafkaConsumer.java:1030) > > at > > org.apache.kafka.clients.consumer.KafkaConsumer.poll( > KafkaConsumer.java:995) > > at > > org.apache.kafka.streams.processor.internals.StreamThread.runLoop( > StreamThread.java:582) > > at > > org.apache.kafka.streams.processor.internals. > StreamThread.run(StreamThread.java:368) > >