Hi, I have debugged the problem further and it looks like something is not right.
Here are the logs from one of the threads:

stream-thread [StreamThread-3] Committing all tasks because the commit interval 30000ms has elapsed
stream-thread [StreamThread-3] Committing task 0_0
stream-thread [StreamThread-3] Committing task 0_20
stream-thread [StreamThread-3] Committing task 0_6
stream-thread [StreamThread-3] Committing task 0_9
stream-thread [StreamThread-3] Committing task 0_26
Discovered coordinator 192.168.73.198:9092 (id: 2147483643 rack: null) for group new-part.
Revoking previously assigned partitions [stream-9, stream-26, stream-0, stream-20, stream-6] for group new-part
stream-thread [StreamThread-3] partitions [[stream-9, stream-26, stream-0, stream-20, stream-6]] revoked at the beginning of consumer rebalance.
stream-thread [StreamThread-3] Committing consumer offsets of task 0_0
stream-thread [StreamThread-3] Failed while executing StreamTask 0_0 duet to commit consumer offsets:
org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.
stream-thread [StreamThread-3] Removing all active tasks [[0_0, 0_20, 0_6, 0_9, 0_26]]
stream-thread [StreamThread-3] Removing all standby tasks [[]]
Setting newly assigned partitions [stream-9, stream-26, stream-0, stream-20, stream-6] for group new-part
stream-thread [StreamThread-3] New partitions [[stream-9, stream-26, stream-0, stream-20, stream-6]] assigned at the end of consumer rebalance.
(Re-)joining group new-part
stream-thread [StreamThread-3] Failed to create an active task 0_0: org.apache.kafka.streams.errors.ProcessorStateException: Error opening store key-table-201702060200 at location /data/kafka-streams/new-part/0_0/key-table/key-table-201702060200
Caused by: org.rocksdb.RocksDBException: IO error: lock /data/advice/kafka-streams/new-part-advice/0_0/key-table/key-table-201702060200/LOCK: No locks available

As you can see, the thread has five tasks. After committing them it somehow revokes its assigned partitions and then tries to commit the consumer offsets of task 0_0, which is where we get the CommitFailedException. It then removes all the active tasks and is assigned exactly the same partitions again, but fails to recreate the tasks because of "No locks available".

So something does not add up: why did the thread revoke its partitions in the first place, and why did it then get the very same partitions back? My feeling is that when the partitions are revoked the lock on the RocksDB store is not released, so when another thread tries to acquire that lock it fails.

Please let me know what could be going wrong.
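As a side note, if it does eventually come down to relaxing the settings the exception message mentions, my understanding is that with the DSL it would look roughly like the sketch below when building the streams config. The 600000 and 100 values are just placeholders I picked for illustration (not what we actually run), "builder" stands for our KStreamBuilder, and I have reused the application id and broker address from the logs above:

    Properties props = new Properties();
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "new-part");
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.73.198:9092");
    // allow more time between poll() calls before the group kicks the member out
    props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 600000);   // placeholder value
    // return smaller batches from poll() so each loop iteration finishes sooner
    props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 100);          // placeholder value
    KafkaStreams streams = new KafkaStreams(builder, new StreamsConfig(props));

(StreamsConfig is org.apache.kafka.streams.StreamsConfig and ConsumerConfig is org.apache.kafka.clients.consumer.ConsumerConfig; as far as I understand, Streams forwards consumer-level keys like these to the consumers it creates, so they can be set directly on the streams properties.) That said, the question above about why the partitions get revoked and the RocksDB lock is not released still stands.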
Thanks
Sachin

On Mon, Feb 6, 2017 at 11:51 AM, Sachin Mittal <sjmit...@gmail.com> wrote:
> Hi All,
> We sometimes get errors like this:
>
> org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.
>     at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.sendOffsetCommitRequest(ConsumerCoordinator.java:600) ~[kafka-clients-0.10.1.1.jar:na]
>     at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:498) ~[kafka-clients-0.10.1.1.jar:na]
>     at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1104) ~[kafka-clients-0.10.1.1.jar:na]
>     at org.apache.kafka.streams.processor.internals.StreamTask.commitOffsets(StreamTask.java:289) ~[kafka-streams-0.10.1.1.jar:na]
>
> and our streams application crashes.
>
> Now this error is straightforward: we need to find out where our processing is spending that much time, and then either bring the processing time within the defaults or change the defaults.
> Since we are using the high-level Streams DSL, is there any way to know where we are spending that time?
>
> Our topology is like this:
>
> input
>     .groupByKey()
>     .aggregate(new Initializer<SortedSet<T>>() {
>         public SortedSet<T> apply() {
>             return new TreeSet<T>();
>         }
>     }, new Aggregator<String, T, SortedSet<T>>() {
>         public SortedSet<T> apply(String key, T value, SortedSet<T> aggregate) {
>             aggregate.add(value);
>             return aggregate;
>         }
>     }, TimeWindows.of(...).advanceBy(...).until(...), valueSerde, "key-table")
>     .mapValues(new ValueMapper<SortedSet<T>, SortedSet<U>>() {
>         public SortedSet<U> apply(SortedSet<T> t) {
>             SortedSet<U> u = new TreeSet<U>();
>             ...
>             return u;
>         }
>     })
>     .foreach(new ForeachAction<Windowed<String>, SortedSet<U>>() {
>         public void apply(Windowed<String> key, SortedSet<U> nodes) {
>             ...
>         }
>     });
>
> So where and how can I add some logging to know what the time spent between poll() requests is, and how that time is divided among the various steps in this topology?
>
> Thanks
> Sachin
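Coming back to the logging question in the mail quoted above: the only approach I have come up with so far is crude timing inside the DSL callbacks themselves, roughly like the sketch below. The 50 ms threshold and the log wording are arbitrary choices of mine, and the same pattern would go into the ValueMapper and ForeachAction as well:

    // drop-in replacement for the Aggregator in the topology quoted above
    new Aggregator<String, T, SortedSet<T>>() {
        public SortedSet<T> apply(String key, T value, SortedSet<T> aggregate) {
            long start = System.currentTimeMillis();
            aggregate.add(value);
            long tookMs = System.currentTimeMillis() - start;
            if (tookMs > 50) {
                // report only the slow calls so the output stays readable
                System.out.println("slow aggregate for key " + key + ": " + tookMs + " ms, set size " + aggregate.size());
            }
            return aggregate;
        }
    }

Summing such per-callback timings over a commit interval should at least show which step of the topology eats most of the time between polls.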