Sachin, you also have a PR for this that could help, right?: https://github.com/apache/kafka/pull/2642#issuecomment-287372367
Thanks
Eno

> On 17 Mar 2017, at 15:19, Sachin Mittal <sjmit...@gmail.com> wrote:
>
> We also face the same issues. What we have found is that RocksDB is the problem: with many instances of RocksDB per machine, it slows down over time due to I/O operations, resulting in threads getting evicted because max.poll.interval exceeds the set limit.
>
> Try running RocksDB in memory: https://github.com/facebook/rocksdb/wiki/How-to-persist-in-memory-RocksDB-database%3F
>
> Thanks
> Sachin
>
> On Fri, Mar 17, 2017 at 8:34 PM, Tianji Li <skyah...@gmail.com> wrote:
>>
>> Hi Eno,
>>
>> I used 150, 50, and 20 threads, and the probability of crashing decreased with the thread count. When using 1 thread, no crash!
>>
>> My max.poll.interval is 5 minutes and none of the processing lasts that long, so that parameter does not help.
>>
>> Thanks
>> Tianji
>>
>> On Thu, Mar 16, 2017 at 6:09 PM, Eno Thereska <eno.there...@gmail.com> wrote:
>>>
>>> Hi Tianji,
>>>
>>> How many threads does your app use?
>>>
>>> One reason is explained here: https://groups.google.com/forum/#!topic/confluent-platform/wgCSuwIJo5g. You might want to increase the max.poll.interval config value.
>>> If that doesn't work, could you revert to using one thread for now? Also let us know either way, since we may need to open a bug report.
>>>
>>> Thanks
>>> Eno
>>>
>>>> On 16 Mar 2017, at 20:47, Tianji Li <skyah...@gmail.com> wrote:
>>>>
>>>> Hi there,
>>>>
>>>> I keep getting these crashes and wonder if anyone knows why. Please let me know what information I should provide to help with troubleshooting.
>>>>
>>>> I am using 0.10.2.0. My application reads one topic and then does groupBy().aggregate() 50 times on different keys.
>>>>
>>>> I use an in-memory store, without backing to Kafka.
>>>>
>>>> Thanks
>>>> Tianji
>>>>
>>>> 2017-03-16 16:37:14.060 WARN 26139 --- [StreamThread-14] o.a.k.s.p.internals.StreamThread : Could not create task 0_4. Will retry.
>>>>
>>>> org.apache.kafka.streams.errors.LockException: task [0_4] Failed to lock the state directory: /tmp/kafka-streams/xxx-test28/0_4
>>>>     at org.apache.kafka.streams.processor.internals.ProcessorStateManager.<init>(ProcessorStateManager.java:102)
>>>>     at org.apache.kafka.streams.processor.internals.AbstractTask.<init>(AbstractTask.java:73)
>>>>     at org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:108)
>>>>     at org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:834)
>>>>     at org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:1207)
>>>>     at org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.retryWithBackoff(StreamThread.java:1180)
>>>>     at org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:937)
>>>>     at org.apache.kafka.streams.processor.internals.StreamThread.access$500(StreamThread.java:69)
>>>>     at org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:236)
>>>>     at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:255)
>>>>     at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:339)
>>>>     at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:303)
>>>>     at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:286)
>>>>     at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1030)
>>>>     at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995)
>>>>     at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:582)
>>>>     at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:368)
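[Editor's note] The fixes discussed in this thread (raising max.poll.interval, keeping each instance's state directory private) are plain Kafka/Streams configuration properties. A minimal sketch of how they might be set, using only `java.util.Properties` so the values are explicit; `max.poll.interval.ms`, `max.poll.records`, `application.id`, `bootstrap.servers`, and `state.dir` are real Kafka config keys, while the application id, broker address, and chosen values are placeholder assumptions, not recommendations from the thread:

```java
import java.util.Properties;

public class StreamsTimeoutConfig {

    // Builds a Properties object with the knobs discussed in the thread.
    // "my-app" and "localhost:9092" are placeholder values.
    public static Properties build() {
        Properties props = new Properties();
        props.put("application.id", "my-app");
        props.put("bootstrap.servers", "localhost:9092");
        // More headroom for slow processing cycles before the consumer is
        // evicted from the group (Tianji's default was 5 minutes = 300000).
        props.put("max.poll.interval.ms", "600000");
        // Fewer records per poll() shortens each processing-loop iteration,
        // another way to stay under the poll-interval limit.
        props.put("max.poll.records", "100");
        // Per-instance state directory; contention on a shared
        // /tmp/kafka-streams is what the LockException in the trace fights over.
        props.put("state.dir", "/tmp/kafka-streams");
        return props;
    }

    public static void main(String[] args) {
        System.out.println(build().getProperty("max.poll.interval.ms"));
    }
}
```

These properties would be passed to the `KafkaStreams` constructor; Sachin's in-memory RocksDB suggestion is not a property but a choice of state-store implementation made when building the topology.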