Hi,
I have debugged the problem further and it looks like something is not right.

Here are the logs from one of the threads:

stream-thread [StreamThread-3] Committing all tasks because the commit
interval 30000ms has elapsed
stream-thread [StreamThread-3] Committing task 0_0
stream-thread [StreamThread-3] Committing task 0_20
stream-thread [StreamThread-3] Committing task 0_6
stream-thread [StreamThread-3] Committing task 0_9
stream-thread [StreamThread-3] Committing task 0_26
Discovered coordinator 192.168.73.198:9092 (id: 2147483643 rack: null) for
group new-part.
Revoking previously assigned partitions [stream-9, stream-26, stream-0,
stream-20, stream-6] for group new-part
stream-thread [StreamThread-3] partitions [[stream-9, stream-26, stream-0,
stream-20, stream-6]] revoked at the beginning of consumer rebalance.
stream-thread [StreamThread-3] Committing consumer offsets of task 0_0
stream-thread [StreamThread-3] Failed while executing StreamTask 0_0 due
to commit consumer offsets:
org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be
completed since the group has already rebalanced and assigned the
partitions to another member. This means that the time between subsequent
calls to poll() was longer than the configured max.poll.interval.ms, which
typically implies that the poll loop is spending too much time message
processing. You can address this either by increasing the session timeout
or by reducing the maximum size of batches returned in poll() with
max.poll.records.
stream-thread [StreamThread-3] Removing all active tasks [[0_0, 0_20, 0_6,
0_9, 0_26]]
stream-thread [StreamThread-3] Removing all standby tasks [[]]
Setting newly assigned partitions [stream-9, stream-26, stream-0,
stream-20, stream-6] for group new-part
stream-thread [StreamThread-3] New partitions [[stream-9, stream-26,
stream-0, stream-20, stream-6]] assigned at the end of consumer rebalance.
(Re-)joining group new-part
stream-thread [StreamThread-3] Failed to create an active task 0_0:
org.apache.kafka.streams.errors.ProcessorStateException: Error opening
store key-table-201702060200 at location
/data/kafka-streams/new-part/0_0/key-table/key-table-201702060200
Caused by: org.rocksdb.RocksDBException: IO error: lock
/data/advice/kafka-streams/new-part-advice/0_0/key-table/key-table-201702060200/LOCK:
No locks available


So you can see this thread has 5 tasks. After committing the tasks, it somehow
revokes the assigned partitions and then tries to commit the consumer offsets.
That is where we get the CommitFailedException.
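
The settings that the exception message points at can be overridden in the
streams properties (as far as I understand, in 0.10.1.x anything that is not a
streams config is passed through to the internal consumer). A rough sketch of
what I am planning to try, with placeholder values:

import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.streams.StreamsConfig;

Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "new-part");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.73.198:9092");
// give each poll loop more headroom before the group kicks the member out
props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 600000);
// fewer records per poll() so each iteration finishes sooner
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 100);

But I would still like to understand why the rebalance happens in the first
place.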

Then it removes all the tasks and gets newly assigned tasks, which are the
same as before.

Then it fails to create those tasks, which is caused by "No locks available".

So something does not add up. Why did it try to revoke the partitions, and why
did it then get those same partitions assigned again? My feeling is that when
revoking the partitions it is not releasing the lock on the RocksDB store, so
when some other thread tries to acquire that lock it fails.
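
In the meantime, so that a dying StreamThread does not go unnoticed, I am
thinking of adding an uncaught exception handler on the KafkaStreams instance.
A minimal sketch (builder, props and log here are from my own setup, not from
the logs above):

KafkaStreams streams = new KafkaStreams(builder, props);
streams.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
    public void uncaughtException(Thread t, Throwable e) {
        // log which stream thread died and why, instead of losing the stack trace
        log.error("Stream thread " + t.getName() + " died", e);
    }
});
streams.start();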

Please let me know what could be going wrong.

Thanks
Sachin


On Mon, Feb 6, 2017 at 11:51 AM, Sachin Mittal <sjmit...@gmail.com> wrote:

> Hi All,
> We sometimes get errors like this:
> org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be
> completed since the group has already rebalanced and assigned the
> partitions to another member. This means that the time between subsequent
> calls to poll() was longer than the configured max.poll.interval.ms,
> which typically implies that the poll loop is spending too much time
> message processing. You can address this either by increasing the session
> timeout or by reducing the maximum size of batches returned in poll() with
> max.poll.records.
>     at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.
> sendOffsetCommitRequest(ConsumerCoordinator.java:600)
> ~[kafka-clients-0.10.1.1.jar:na]
>     at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.
> commitOffsetsSync(ConsumerCoordinator.java:498)
> ~[kafka-clients-0.10.1.1.jar:na]
>     at org.apache.kafka.clients.consumer.KafkaConsumer.
> commitSync(KafkaConsumer.java:1104) ~[kafka-clients-0.10.1.1.jar:na]
>     at org.apache.kafka.streams.processor.internals.
> StreamTask.commitOffsets(StreamTask.java:289)
> ~[kafka-streams-0.10.1.1.jar:na]
>
> And our streams application crashes.
>
> Now this error is straightforward. We somehow need to look at where our
> process is spending that much time. Then we can either fix the processing
> time to stay within the defaults or change the defaults.
> Since we are running the high-level Streams DSL, is there any way to know
> where we would be spending that much time?
>
> Our topology is like this:
> input
> .groupByKey()
> .aggregate(new Initializer<SortedSet<T>>() {
>     public SortedSet<T> apply() {
>         return new TreeSet<T>();
>     }
> }, new Aggregator<String, T, SortedSet<T>>() {
>     public SortedSet<T> apply(String key, T value, SortedSet<T> aggregate)
> {
>         aggregate.add(value);
>         return aggregate;
>     }
> }, TimeWindows.of(...).advanceBy(...).until(...), valueSerde, "key-table")
> .mapValues(new ValueMapper<SortedSet<T>, SortedSet<U>>() {
>     public SortedSet<U> apply(SortedSet<T> t) {
>         SortedSet<U> u = new TreeSet<U>();
>         ...
>         return u;
>     }
> })
> .foreach(new ForeachAction<Windowed<String>, SortedSet<U>>() {
>     public void apply(Windowed<String> key, SortedSet<U> nodes) {
>         ...
>     }
> });
>
> So where and how can I add some logging to know how much time is spent
> between each poll request and how that time is divided among the various
> steps in this topology?
>
> Thanks
> Sachin
>
>

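Also, regarding my question in the quoted mail above about measuring where the
time between polls goes: what I have in mind is simply wrapping the aggregator
(and similarly the mapValues and foreach bodies) with timing logs, roughly like
this (log is an slf4j logger and the 10 ms threshold is an arbitrary value I
picked):

new Aggregator<String, T, SortedSet<T>>() {
    public SortedSet<T> apply(String key, T value, SortedSet<T> aggregate) {
        long start = System.nanoTime();
        aggregate.add(value);
        long tookMs = (System.nanoTime() - start) / 1000000;
        if (tookMs > 10) {
            // flag aggregations that take unusually long for a single record
            log.warn("slow aggregate for key {}: {} ms", key, tookMs);
        }
        return aggregate;
    }
}

If there is a better way to get per-step timings out of the DSL, I would be
happy to hear it.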