Note on few things.
Set changelog topic delete retention time to as less as possible if the
previous values for same key are not needed and can be safely cleaned up.
Set segment size and segment retention time also low so older segments can
be compacted and cleaned up.
Set delete ratio to be aggressive 0.01 so segments don't grow to big.

This way state stores would be created much faster.

Also when using Windows smaller window size helps.

Try not running many stream threads on single machine unless you have a
great hardware.

Make sure a thread is not reading from many partitions. Make sure ratio of
partions to total threads is low.

Hope this helps.

Sachin

On 6 May 2017 13:28, "Shimi Kiviti" <shim...@gmail.com> wrote:

> This is very similar to issues that we see.
>
> Did you check the status of the consumer group? In my case it will be in
> rebalancing most of the time. Once in a while it will show consumers and
> offsets but after a short time will go back to rebalancing.
>
> How much storage does your Kafka-streams use?
> Also, what is your k8s configuration?
> Deployment? Deployment with emptyDir, hostPath or EBS? Statefulset?
>
> Thanks,
> Shimi
> On Sat, 6 May 2017 at 2:34 João Peixoto <joao.harti...@gmail.com> wrote:
>
> > After a while the instance started running.
> >
> > 2017-05-05 22:40:26.806  INFO 85 --- [ StreamThread-4]
> > o.a.k.s.p.internals.StreamThread         : stream-thread
> [StreamThread-4]
> > Committing task StreamTask 1_62
> > (this is literally the next message)
> > 2017-05-05 23:13:27.820  INFO 85 --- [ StreamThread-4]
> > o.a.k.s.p.internals.StreamThread         : stream-thread
> [StreamThread-4]
> > Committing all tasks because the commit interval 10000ms has elapsed
> >
> > On Fri, May 5, 2017 at 3:48 PM João Peixoto <joao.harti...@gmail.com>
> > wrote:
> >
> > > Warning, long message
> > >
> > > *Problem*: Initializing a Kafka Stream is taking a loooong time.
> > > Currently at the 40 minute mark
> > >
> > > *Setup*:
> > > 2 co-partition topics with 100 partitions.
> > > First topic contains a lot of messages in the order of hundreds of
> > millions
> > > Second topic is a KTable and contains ~30k records
> > >
> > > Kafka cluster with 6 brokers running 0.10.1
> > >
> > > Kafka streams running on 0.10.2.1. 5 instances with 5 threads each.
> > > The instances are running on Kubernetes
> > >
> > > *Stream Configuration*:
> > > Properties props = new Properties();
> > > props.put(StreamsConfig.APPLICATION_ID_CONFIG, streamName);
> > > props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, ...);
> > > props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG,
> > > Serdes.String().getClass().getName());
> > > props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG,
> > > Serdes.ByteArray().getClass().getName());
> > > props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 10000);
> > > props.put(StreamsConfig.METRICS_RECORDING_LEVEL_CONFIG, "DEBUG");
> > > props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 5);
> > > props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
> > > props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 500);
> > >
> > > *The events*:
> > > I started 5 instances of my stream configuration at the same time. This
> > is
> > > the first
> > > time this configuration is running.
> > >
> > > 2017-05-05 21:23:03.283  INFO 71 --- [           main]
> > > o.a.k.s.p.internals.StreamThread         : stream-thread
> [StreamThread-1]
> > > Creating producer client
> > > 2017-05-05 21:23:03.415  INFO 71 --- [           main]
> > > o.a.k.s.p.internals.StreamThread         : stream-thread
> [StreamThread-1]
> > > Creating consumer client
> > > 2017-05-05 21:23:03.520  INFO 71 --- [           main]
> > > o.a.k.s.p.internals.StreamThread         : stream-thread
> [StreamThread-1]
> > > Creating restore consumer client
> > > 2017-05-05 21:23:03.528  INFO 71 --- [           main]
> > > o.a.k.s.p.internals.StreamThread         : stream-thread
> [StreamThread-1]
> > > State transition from NOT_RUNNING to RUNNING.
> > > 2017-05-05 21:23:03.531  INFO 71 --- [           main]
> > > o.a.k.s.p.internals.StreamThread         : stream-thread
> [StreamThread-2]
> > > Creating producer client
> > > 2017-05-05 21:23:03.564  INFO 71 --- [           main]
> > > o.a.k.s.p.internals.StreamThread         : stream-thread
> [StreamThread-2]
> > > Creating consumer client
> > > 2017-05-05 21:23:03.569  INFO 71 --- [           main]
> > > o.a.k.s.p.internals.StreamThread         : stream-thread
> [StreamThread-2]
> > > Creating restore consumer client
> > > 2017-05-05 21:23:03.615  INFO 71 --- [           main]
> > > o.a.k.s.p.internals.StreamThread         : stream-thread
> [StreamThread-2]
> > > State transition from NOT_RUNNING to RUNNING.
> > > 2017-05-05 21:23:03.617  INFO 71 --- [           main]
> > > o.a.k.s.p.internals.StreamThread         : stream-thread
> [StreamThread-3]
> > > Creating producer client
> > > 2017-05-05 21:23:03.621  INFO 71 --- [           main]
> > > o.a.k.s.p.internals.StreamThread         : stream-thread
> [StreamThread-3]
> > > Creating consumer client
> > > 2017-05-05 21:23:03.625  INFO 71 --- [           main]
> > > o.a.k.s.p.internals.StreamThread         : stream-thread
> [StreamThread-3]
> > > Creating restore consumer client
> > > 2017-05-05 21:23:03.628  INFO 71 --- [           main]
> > > o.a.k.s.p.internals.StreamThread         : stream-thread
> [StreamThread-3]
> > > State transition from NOT_RUNNING to RUNNING.
> > > 2017-05-05 21:23:03.629  INFO 71 --- [           main]
> > > o.a.k.s.p.internals.StreamThread         : stream-thread
> [StreamThread-4]
> > > Creating producer client
> > > 2017-05-05 21:23:03.632  INFO 71 --- [           main]
> > > o.a.k.s.p.internals.StreamThread         : stream-thread
> [StreamThread-4]
> > > Creating consumer client
> > > 2017-05-05 21:23:03.635  INFO 71 --- [           main]
> > > o.a.k.s.p.internals.StreamThread         : stream-thread
> [StreamThread-4]
> > > Creating restore consumer client
> > > 2017-05-05 21:23:03.638  INFO 71 --- [           main]
> > > o.a.k.s.p.internals.StreamThread         : stream-thread
> [StreamThread-4]
> > > State transition from NOT_RUNNING to RUNNING.
> > > 2017-05-05 21:23:03.639  INFO 71 --- [           main]
> > > o.a.k.s.p.internals.StreamThread         : stream-thread
> [StreamThread-5]
> > > Creating producer client
> > > 2017-05-05 21:23:03.641  INFO 71 --- [           main]
> > > o.a.k.s.p.internals.StreamThread         : stream-thread
> [StreamThread-5]
> > > Creating consumer client
> > > 2017-05-05 21:23:03.644  INFO 71 --- [           main]
> > > o.a.k.s.p.internals.StreamThread         : stream-thread
> [StreamThread-5]
> > > Creating restore consumer client
> > > 2017-05-05 21:23:03.647  INFO 71 --- [           main]
> > > o.a.k.s.p.internals.StreamThread         : stream-thread
> [StreamThread-5]
> > > State transition from NOT_RUNNING to RUNNING.
> > > 2017-05-05 21:23:03.790  INFO 71 --- [ StreamThread-1]
> > > o.a.k.s.p.internals.StreamThread         : stream-thread
> [StreamThread-1]
> > > Starting
> > > 2017-05-05 21:23:03.791  INFO 71 --- [ StreamThread-4]
> > > o.a.k.s.p.internals.StreamThread         : stream-thread
> [StreamThread-4]
> > > Starting
> > > 2017-05-05 21:23:03.790  INFO 71 --- [ StreamThread-2]
> > > o.a.k.s.p.internals.StreamThread         : stream-thread
> [StreamThread-2]
> > > Starting
> > > 2017-05-05 21:23:03.791  INFO 71 --- [ StreamThread-3]
> > > o.a.k.s.p.internals.StreamThread         : stream-thread
> [StreamThread-3]
> > > Starting
> > > 2017-05-05 21:23:03.792  INFO 71 --- [ StreamThread-5]
> > > o.a.k.s.p.internals.StreamThread         : stream-thread
> [StreamThread-5]
> > > Starting
> > > 2017-05-05 21:23:03.966  INFO 71 --- [ StreamThread-1]
> > > o.a.k.s.p.internals.StreamThread         : stream-thread
> [StreamThread-1]
> > > at state RUNNING: partitions [] revoked at the beginning of consumer
> > > rebalance.
> > > 2017-05-05 21:23:03.966  INFO 71 --- [ StreamThread-2]
> > > o.a.k.s.p.internals.StreamThread         : stream-thread
> [StreamThread-2]
> > > at state RUNNING: partitions [] revoked at the beginning of consumer
> > > rebalance.
> > > 2017-05-05 21:23:03.966  INFO 71 --- [ StreamThread-4]
> > > o.a.k.s.p.internals.StreamThread         : stream-thread
> [StreamThread-4]
> > > at state RUNNING: partitions [] revoked at the beginning of consumer
> > > rebalance.
> > > 2017-05-05 21:23:03.967  INFO 71 --- [ StreamThread-1]
> > > o.a.k.s.p.internals.StreamThread         : stream-thread
> [StreamThread-1]
> > > State transition from RUNNING to PARTITIONS_REVOKED.
> > > 2017-05-05 21:23:03.966  INFO 71 --- [ StreamThread-3]
> > > o.a.k.s.p.internals.StreamThread         : stream-thread
> [StreamThread-3]
> > > at state RUNNING: partitions [] revoked at the beginning of consumer
> > > rebalance.
> > > 2017-05-05 21:23:03.967  INFO 71 --- [ StreamThread-2]
> > > o.a.k.s.p.internals.StreamThread         : stream-thread
> [StreamThread-2]
> > > State transition from RUNNING to PARTITIONS_REVOKED.
> > > 2017-05-05 21:23:03.967  INFO 71 --- [ StreamThread-5]
> > > o.a.k.s.p.internals.StreamThread         : stream-thread
> [StreamThread-5]
> > > at state RUNNING: partitions [] revoked at the beginning of consumer
> > > rebalance.
> > > 2017-05-05 21:23:03.967  INFO 71 --- [ StreamThread-4]
> > > o.a.k.s.p.internals.StreamThread         : stream-thread
> [StreamThread-4]
> > > State transition from RUNNING to PARTITIONS_REVOKED.
> > > 2017-05-05 21:23:03.968  INFO 71 --- [ StreamThread-3]
> > > o.a.k.s.p.internals.StreamThread         : stream-thread
> [StreamThread-3]
> > > State transition from RUNNING to PARTITIONS_REVOKED.
> > > 2017-05-05 21:23:03.968  INFO 71 --- [ StreamThread-5]
> > > o.a.k.s.p.internals.StreamThread         : stream-thread
> [StreamThread-5]
> > > State transition from RUNNING to PARTITIONS_REVOKED.
> > > 2017-05-05 21:23:03.970  INFO 71 --- [ StreamThread-2]
> > > o.a.k.s.p.internals.StreamThread         : stream-thread
> [StreamThread-2]
> > > Updating suspended tasks to contain active tasks []
> > > 2017-05-05 21:23:03.970  INFO 71 --- [ StreamThread-4]
> > > o.a.k.s.p.internals.StreamThread         : stream-thread
> [StreamThread-4]
> > > Updating suspended tasks to contain active tasks []
> > > 2017-05-05 21:23:03.970  INFO 71 --- [ StreamThread-5]
> > > o.a.k.s.p.internals.StreamThread         : stream-thread
> [StreamThread-5]
> > > Updating suspended tasks to contain active tasks []
> > > 2017-05-05 21:23:03.970  INFO 71 --- [ StreamThread-1]
> > > o.a.k.s.p.internals.StreamThread         : stream-thread
> [StreamThread-1]
> > > Updating suspended tasks to contain active tasks []
> > > 2017-05-05 21:23:03.970  INFO 71 --- [ StreamThread-3]
> > > o.a.k.s.p.internals.StreamThread         : stream-thread
> [StreamThread-3]
> > > Updating suspended tasks to contain active tasks []
> > > 2017-05-05 21:23:03.970  INFO 71 --- [ StreamThread-2]
> > > o.a.k.s.p.internals.StreamThread         : stream-thread
> [StreamThread-2]
> > > Removing all active tasks []
> > > 2017-05-05 21:23:03.970  INFO 71 --- [ StreamThread-4]
> > > o.a.k.s.p.internals.StreamThread         : stream-thread
> [StreamThread-4]
> > > Removing all active tasks []
> > > 2017-05-05 21:23:03.970  INFO 71 --- [ StreamThread-5]
> > > o.a.k.s.p.internals.StreamThread         : stream-thread
> [StreamThread-5]
> > > Removing all active tasks []
> > > 2017-05-05 21:23:03.971  INFO 71 --- [ StreamThread-1]
> > > o.a.k.s.p.internals.StreamThread         : stream-thread
> [StreamThread-1]
> > > Removing all active tasks []
> > > 2017-05-05 21:23:03.971  INFO 71 --- [ StreamThread-3]
> > > o.a.k.s.p.internals.StreamThread         : stream-thread
> [StreamThread-3]
> > > Removing all active tasks []
> > > 2017-05-05 21:23:03.971  INFO 71 --- [ StreamThread-2]
> > > o.a.k.s.p.internals.StreamThread         : stream-thread
> [StreamThread-2]
> > > Removing all standby tasks []
> > > 2017-05-05 21:23:03.971  INFO 71 --- [ StreamThread-4]
> > > o.a.k.s.p.internals.StreamThread         : stream-thread
> [StreamThread-4]
> > > Removing all standby tasks []
> > > 2017-05-05 21:23:03.971  INFO 71 --- [ StreamThread-5]
> > > o.a.k.s.p.internals.StreamThread         : stream-thread
> [StreamThread-5]
> > > Removing all standby tasks []
> > > 2017-05-05 21:23:03.971  INFO 71 --- [ StreamThread-1]
> > > o.a.k.s.p.internals.StreamThread         : stream-thread
> [StreamThread-1]
> > > Removing all standby tasks []
> > > 2017-05-05 21:23:03.971  INFO 71 --- [ StreamThread-3]
> > > o.a.k.s.p.internals.StreamThread         : stream-thread
> [StreamThread-3]
> > > Removing all standby tasks []
> > > 2017-05-05 21:23:04.020  INFO 71 --- [ StreamThread-4]
> > > o.a.k.s.p.i.StreamPartitionAssignor      : stream-thread
> [StreamThread-4]
> > > Constructed client metadata
> > > {18d6eae1-6fd5-4ccc-b535-49e392110253=ClientMetadata{hostInfo=null,
> > > consumers=[<consumerlist>], state=[activeTasks: ([]) assignedTasks:
> ([])
> > > prevActiveTasks: ([]) prevAssignedTasks: ([]) capacity: 1.0 cost:
> 0.0]}}
> > > from the member subscriptions.
> > > 2017-05-05 21:23:04.218  INFO 71 --- [ StreamThread-4]
> > > o.a.k.s.p.i.StreamPartitionAssignor      : stream-thread
> [StreamThread-4]
> > > Completed validating internal topics in partition assignor
> > > 2017-05-05 21:23:04.591  INFO 71 --- [ StreamThread-4]
> > > o.a.k.s.p.i.StreamPartitionAssignor      : stream-thread
> [StreamThread-4]
> > > Completed validating internal topics in partition assignor
> > > 2017-05-05 21:23:04.726  INFO 71 --- [ StreamThread-4]
> > > o.a.k.s.p.i.StreamPartitionAssignor      : stream-thread
> [StreamThread-4]
> > > Assigned tasks to clients as
> > > {18d6eae1-6fd5-4ccc-b535-49e392110253=[activeTasks: ([<list>])
> > > assignedTasks: ([<list>]) prevActiveTasks: ([]) prevAssignedTasks: ([])
> > > capacity: 1.0 cost: 100.0]}.
> > > 2017-05-05 21:23:04.742  INFO 71 --- [ StreamThread-4]
> > > o.a.k.s.p.i.StreamPartitionAssignor      : stream-thread
> [StreamThread-4]
> > > Constructed client metadata
> > > {18d6eae1-6fd5-4ccc-b535-49e392110253=ClientMetadata{hostInfo=null,
> > > consumers=[<consumerlist>], state=[activeTasks: ([]) assignedTasks:
> ([])
> > > prevActiveTasks: ([]) prevAssignedTasks: ([]) capacity: 5.0 cost:
> 0.0]}}
> > > from the member subscriptions.
> > > 2017-05-05 21:23:05.120  INFO 71 --- [ StreamThread-4]
> > > o.a.k.s.p.i.StreamPartitionAssignor      : stream-thread
> [StreamThread-4]
> > > Completed validating internal topics in partition assignor
> > > 2017-05-05 21:23:05.482  INFO 71 --- [ StreamThread-4]
> > > o.a.k.s.p.i.StreamPartitionAssignor      : stream-thread
> [StreamThread-4]
> > > Completed validating internal topics in partition assignor
> > > 2017-05-05 21:23:05.520  INFO 71 --- [ StreamThread-4]
> > > o.a.k.s.p.i.StreamPartitionAssignor      : stream-thread
> [StreamThread-4]
> > > Assigned tasks to clients as
> > > {18d6eae1-6fd5-4ccc-b535-49e392110253=[activeTasks: ([<list>])
> > > prevActiveTasks: ([]) prevAssignedTasks: ([]) capacity: 5.0 cost:
> 50.0],
> > > da663a61-dada-478b-b060-78d77536530a=[activeTasks: ([<list>])
> > > prevActiveTasks: ([]) prevAssignedTasks: ([]) capacity: 5.0 cost:
> 50.0]}.
> > > 2017-05-05 21:23:05.553  INFO 71 --- [ StreamThread-4]
> > > o.a.k.s.p.internals.StreamThread         : stream-thread
> [StreamThread-4]
> > > at state PARTITIONS_REVOKED: new partitions [<partitionlist>] assigned
> at
> > > the end of consumer rebalance.
> > > *// The above line is repeated for each thread*
> > > 2017-05-05 21:23:05.554  INFO 71 --- [ StreamThread-4]
> > > o.a.k.s.p.internals.StreamThread         : stream-thread
> [StreamThread-4]
> > > State transition from PARTITIONS_REVOKED to ASSIGNING_PARTITIONS.
> > > 2017-05-05 21:23:05.554  INFO 71 --- [ StreamThread-5]
> > > o.a.k.s.p.internals.StreamThread         : stream-thread
> [StreamThread-5]
> > > State transition from PARTITIONS_REVOKED to ASSIGNING_PARTITIONS.
> > > 2017-05-05 21:23:05.554  INFO 71 --- [ StreamThread-2]
> > > o.a.k.s.p.internals.StreamThread         : stream-thread
> [StreamThread-2]
> > > State transition from PARTITIONS_REVOKED to ASSIGNING_PARTITIONS.
> > > 2017-05-05 21:23:05.554  INFO 71 --- [ StreamThread-1]
> > > o.a.k.s.p.internals.StreamThread         : stream-thread
> [StreamThread-1]
> > > State transition from PARTITIONS_REVOKED to ASSIGNING_PARTITIONS.
> > > 2017-05-05 21:23:05.554  INFO 71 --- [ StreamThread-3]
> > > o.a.k.s.p.internals.StreamThread         : stream-thread
> [StreamThread-3]
> > > State transition from PARTITIONS_REVOKED to ASSIGNING_PARTITIONS.
> > > *// omitted*
> > > 2017-05-05 21:23:15.596  INFO 71 --- [ StreamThread-2]
> > > o.a.k.s.p.internals.StreamThread         : stream-thread
> [StreamThread-2]
> > > State transition from ASSIGNING_PARTITIONS to RUNNING.
> > > *// above message repeated for each thread*
> > >
> > > *Important*: At this point only StreamThread-4 is performing commits
> > > every 10 seconds. The other threads output no logs. Now the fun begins
> > >
> > > 2017-05-05 21:29:21.310  INFO 71 --- [ StreamThread-4]
> > > o.a.k.s.p.internals.StreamThread         : stream-thread
> [StreamThread-4]
> > > State transition from RUNNING to PARTITIONS_REVOKED.
> > > 2017-05-05 21:29:21.310  INFO 71 --- [ StreamThread-4]
> > > o.a.k.s.p.internals.StreamThread         : stream-thread
> [StreamThread-4]
> > > Closing task's topology ... // repeated multiple times
> > > 2017-05-05 21:29:21.387  INFO 71 --- [ StreamThread-4]
> > > o.a.k.s.p.internals.StreamThread         : stream-thread
> [StreamThread-4]
> > > Flushing state stores of task ... // repeated multiple times
> > > 2017-05-05 21:29:21.388  INFO 71 --- [ StreamThread-4]
> > > o.a.k.s.p.internals.StreamThread         : stream-thread
> [StreamThread-4]
> > > Committing consumer offsets of task ... // repeated multiple times
> > > 2017-05-05 21:29:21.388  INFO 71 --- [ StreamThread-4]
> > > o.a.k.s.p.internals.StreamThread         : stream-thread
> [StreamThread-4]
> > > Updating suspended tasks to contain active tasks [<list>]
> > > 2017-05-05 21:29:21.388  INFO 71 --- [ StreamThread-4]
> > > o.a.k.s.p.internals.StreamThread         : stream-thread
> [StreamThread-4]
> > > Removing all active tasks [<list>]
> > > 2017-05-05 21:29:21.388  INFO 71 --- [ StreamThread-4]
> > > o.a.k.s.p.internals.StreamThread         : stream-thread
> [StreamThread-4]
> > > Removing all standby tasks []
> > >
> > >
> > > At this point there are no more log messages for 16 minutes!! During
> this
> > > time I perform several threaddumps, almost every minute.
> > > Thread dump below. Do notice that thread 4 is the only different one.
> > >
> > > "StreamThread-5" - Thread t@57
> > >    java.lang.Thread.State: BLOCKED
> > >         at org.apache.kafka.common.metrics.Sensor.record(Sensor.
> java:169)
> > >         - waiting to lock <653f9d6> (a
> > > org.apache.kafka.common.metrics.Sensor) owned by "StreamThread-1" t@49
> > >         at org.apache.kafka.common.metrics.Sensor.record(Sensor.
> java:176)
> > >         at
> > >
> > org.apache.kafka.streams.processor.internals.StreamThread$
> StreamsMetricsThreadImpl.recordLatency(StreamThread.java:1184)
> > >         at
> > >
> > org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.
> measureLatencyNs(StreamsMetricsImpl.java:190)
> > >         at
> > >
> > org.apache.kafka.streams.processor.internals.ProcessorNode.punctuate(
> ProcessorNode.java:139)
> > >         at
> > >
> > org.apache.kafka.streams.processor.internals.StreamTask.punctuate(
> StreamTask.java:268)
> > >         at
> > >
> > org.apache.kafka.streams.processor.internals.
> PunctuationQueue.mayPunctuate(PunctuationQueue.java:45)
> > >         - locked <3fbbc5a8> (a java.util.PriorityQueue)
> > >         at
> > >
> > org.apache.kafka.streams.processor.internals.StreamTask.maybePunctuate(
> StreamTask.java:251)
> > >         at
> > >
> > org.apache.kafka.streams.processor.internals.
> StreamThread.maybePunctuate(StreamThread.java:751)
> > >         at
> > >
> > org.apache.kafka.streams.processor.internals.StreamThread.runLoop(
> StreamThread.java:633)
> > >         at
> > >
> > org.apache.kafka.streams.processor.internals.
> StreamThread.run(StreamThread.java:361)
> > >
> > >    Locked ownable synchronizers:
> > >         - None
> > >
> > > "StreamThread-4" - Thread t@55
> > >    java.lang.Thread.State: RUNNABLE
> > >         at sun.nio.ch.EPollArrayWrapper.epollWait(Native Method)
> > >         at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:
> 269)
> > >         at
> > sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:93)
> > >         at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.
> java:86)
> > >         - locked <5709c085> (a sun.nio.ch.Util$2)
> > >         - locked <1cacaaf> (a java.util.Collections$UnmodifiableSet)
> > >         - locked <26a408e> (a sun.nio.ch.EPollSelectorImpl)
> > >         at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:97)
> > >         at
> > > org.apache.kafka.common.network.Selector.select(Selector.java:489)
> > >         at
> > org.apache.kafka.common.network.Selector.poll(Selector.java:298)
> > >         at
> > > org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:349)
> > >         at
> > >
> > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(
> ConsumerNetworkClient.java:226)
> > >         - locked <639d53ee> (a
> > > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient)
> > >         at
> > >
> > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(
> ConsumerNetworkClient.java:172)
> > >         at
> > >
> > org.apache.kafka.clients.consumer.internals.AbstractCoordinator.
> joinGroupIfNeeded(AbstractCoordinator.java:347)
> > >         at
> > >
> > org.apache.kafka.clients.consumer.internals.AbstractCoordinator.
> ensureActiveGroup(AbstractCoordinator.java:303)
> > >         at
> > >
> > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(
> ConsumerCoordinator.java:290)
> > >         at
> > >
> > org.apache.kafka.clients.consumer.KafkaConsumer.
> pollOnce(KafkaConsumer.java:1029)
> > >         at
> > >
> > org.apache.kafka.clients.consumer.KafkaConsumer.poll(
> KafkaConsumer.java:995)
> > >         at
> > >
> > org.apache.kafka.streams.processor.internals.StreamThread.runLoop(
> StreamThread.java:592)
> > >         at
> > >
> > org.apache.kafka.streams.processor.internals.
> StreamThread.run(StreamThread.java:361)
> > >
> > >    Locked ownable synchronizers:
> > >         - None
> > >
> > > "StreamThread-3" - Thread t@53
> > >    java.lang.Thread.State: RUNNABLE
> > >         at org.apache.kafka.common.metrics.Sensor.record(Sensor.
> java:169)
> > >         - locked <653f9d6> (a org.apache.kafka.common.metrics.Sensor)
> > >         at org.apache.kafka.common.metrics.Sensor.record(Sensor.
> java:176)
> > >         at
> > >
> > org.apache.kafka.streams.processor.internals.StreamThread$
> StreamsMetricsThreadImpl.recordLatency(StreamThread.java:1184)
> > >         at
> > >
> > org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.
> measureLatencyNs(StreamsMetricsImpl.java:190)
> > >         at
> > >
> > org.apache.kafka.streams.processor.internals.ProcessorNode.punctuate(
> ProcessorNode.java:139)
> > >         at
> > >
> > org.apache.kafka.streams.processor.internals.StreamTask.punctuate(
> StreamTask.java:268)
> > >         at
> > >
> > org.apache.kafka.streams.processor.internals.
> PunctuationQueue.mayPunctuate(PunctuationQueue.java:45)
> > >         - locked <7a09baf0> (a java.util.PriorityQueue)
> > >         at
> > >
> > org.apache.kafka.streams.processor.internals.StreamTask.maybePunctuate(
> StreamTask.java:251)
> > >         at
> > >
> > org.apache.kafka.streams.processor.internals.
> StreamThread.maybePunctuate(StreamThread.java:751)
> > >         at
> > >
> > org.apache.kafka.streams.processor.internals.StreamThread.runLoop(
> StreamThread.java:633)
> > >         at
> > >
> > org.apache.kafka.streams.processor.internals.
> StreamThread.run(StreamThread.java:361)
> > >
> > >    Locked ownable synchronizers:
> > >         - None
> > >
> > > "StreamThread-2" - Thread t@51
> > >    java.lang.Thread.State: BLOCKED
> > >         at org.apache.kafka.common.metrics.Sensor.record(Sensor.
> java:169)
> > >         - waiting to lock <653f9d6> (a
> > > org.apache.kafka.common.metrics.Sensor) owned by "StreamThread-1" t@49
> > >         at org.apache.kafka.common.metrics.Sensor.record(Sensor.
> java:176)
> > >         at
> > >
> > org.apache.kafka.streams.processor.internals.StreamThread$
> StreamsMetricsThreadImpl.recordLatency(StreamThread.java:1184)
> > >         at
> > >
> > org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.
> measureLatencyNs(StreamsMetricsImpl.java:190)
> > >         at
> > >
> > org.apache.kafka.streams.processor.internals.ProcessorNode.punctuate(
> ProcessorNode.java:139)
> > >         at
> > >
> > org.apache.kafka.streams.processor.internals.StreamTask.punctuate(
> StreamTask.java:268)
> > >         at
> > >
> > org.apache.kafka.streams.processor.internals.
> PunctuationQueue.mayPunctuate(PunctuationQueue.java:45)
> > >         - locked <2dc188ac> (a java.util.PriorityQueue)
> > >         at
> > >
> > org.apache.kafka.streams.processor.internals.StreamTask.maybePunctuate(
> StreamTask.java:251)
> > >         at
> > >
> > org.apache.kafka.streams.processor.internals.
> StreamThread.maybePunctuate(StreamThread.java:751)
> > >         at
> > >
> > org.apache.kafka.streams.processor.internals.StreamThread.runLoop(
> StreamThread.java:633)
> > >         at
> > >
> > org.apache.kafka.streams.processor.internals.
> StreamThread.run(StreamThread.java:361)
> > >
> > >    Locked ownable synchronizers:
> > >         - None
> > >
> > > "StreamThread-1" - Thread t@49
> > >    java.lang.Thread.State: RUNNABLE
> > >         at org.apache.kafka.common.metrics.Sensor.record(Sensor.
> java:172)
> > >         - locked <653f9d6> (a org.apache.kafka.common.metrics.Sensor)
> > >         at org.apache.kafka.common.metrics.Sensor.record(Sensor.
> java:176)
> > >         at
> > >
> > org.apache.kafka.streams.processor.internals.StreamThread$
> StreamsMetricsThreadImpl.recordLatency(StreamThread.java:1184)
> > >         at
> > >
> > org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.
> measureLatencyNs(StreamsMetricsImpl.java:190)
> > >         at
> > >
> > org.apache.kafka.streams.processor.internals.ProcessorNode.punctuate(
> ProcessorNode.java:139)
> > >         at
> > >
> > org.apache.kafka.streams.processor.internals.StreamTask.punctuate(
> StreamTask.java:268)
> > >         at
> > >
> > org.apache.kafka.streams.processor.internals.
> PunctuationQueue.mayPunctuate(PunctuationQueue.java:45)
> > >         - locked <7dffc3aa> (a java.util.PriorityQueue)
> > >         at
> > >
> > org.apache.kafka.streams.processor.internals.StreamTask.maybePunctuate(
> StreamTask.java:251)
> > >         at
> > >
> > org.apache.kafka.streams.processor.internals.
> StreamThread.maybePunctuate(StreamThread.java:751)
> > >         at
> > >
> > org.apache.kafka.streams.processor.internals.StreamThread.runLoop(
> StreamThread.java:633)
> > >         at
> > >
> > org.apache.kafka.streams.processor.internals.
> StreamThread.run(StreamThread.java:361)
> > >
> > >    Locked ownable synchronizers:
> > >         - None
> > >
> > >
> > > (no logs omitted, ~16 minutes later)
> > > 2017-05-05 21:45:05.270  INFO 71 --- [ StreamThread-1]
> > > o.a.k.s.p.internals.StreamThread         : stream-thread
> [StreamThread-1]
> > > Committing all tasks because the commit interval 10000ms has elapsed
> > > 2017-05-05 21:45:05.270  INFO 71 --- [ StreamThread-1]
> > > o.a.k.s.p.internals.StreamThread         : stream-thread
> [StreamThread-1]
> > > Committing task StreamTask .. // repeated multiple times
> > > 2017-05-05 21:45:05.379  INFO 71 --- [ StreamThread-1]
> > > o.a.k.s.p.internals.StreamThread         : stream-thread
> [StreamThread-1]
> > > State transition from RUNNING to PARTITIONS_REVOKED.
> > > *// The above is repeated for threads 2, 3 and 5*
> > > (no logs omitted!! This is really the next entry, ~10 minutes later)
> > > 2017-05-05 21:55:16.835  INFO 71 --- [ StreamThread-4]
> > > o.a.k.s.p.i.StreamPartitionAssignor      : stream-thread
> [StreamThread-4]
> > > Constructed client metadata ...
> > >
> > > During the above 10 minutes all threads show the following
> > > "StreamThread-5" - Thread t@57
> > >    java.lang.Thread.State: RUNNABLE
> > >         at sun.nio.ch.EPollArrayWrapper.epollWait(Native Method)
> > >         at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:
> 269)
> > >         at
> > sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:93)
> > >         at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.
> java:86)
> > >         - locked <4a60e0dd> (a sun.nio.ch.Util$2)
> > >         - locked <5e54059f> (a java.util.Collections$UnmodifiableSet)
> > >         - locked <60693986> (a sun.nio.ch.EPollSelectorImpl)
> > >         at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:97)
> > >         at
> > > org.apache.kafka.common.network.Selector.select(Selector.java:489)
> > >         at
> > org.apache.kafka.common.network.Selector.poll(Selector.java:298)
> > >         at
> > > org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:349)
> > >         at
> > >
> > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(
> ConsumerNetworkClient.java:226)
> > >         - locked <5f1ae6c1> (a
> > > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient)
> > >         at
> > >
> > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(
> ConsumerNetworkClient.java:172)
> > >         at
> > >
> > org.apache.kafka.clients.consumer.internals.AbstractCoordinator.
> joinGroupIfNeeded(AbstractCoordinator.java:347)
> > >         at
> > >
> > org.apache.kafka.clients.consumer.internals.AbstractCoordinator.
> ensureActiveGroup(AbstractCoordinator.java:303)
> > >         at
> > >
> > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(
> ConsumerCoordinator.java:290)
> > >         at
> > >
> > org.apache.kafka.clients.consumer.KafkaConsumer.
> pollOnce(KafkaConsumer.java:1029)
> > >         at
> > >
> > org.apache.kafka.clients.consumer.KafkaConsumer.poll(
> KafkaConsumer.java:995)
> > >         at
> > >
> > org.apache.kafka.streams.processor.internals.StreamThread.runLoop(
> StreamThread.java:592)
> > >         at
> > >
> > org.apache.kafka.streams.processor.internals.
> StreamThread.run(StreamThread.java:361)
> > >
> > >    Locked ownable synchronizers:
> > >         - None
> > >
> > > "StreamThread-4" - Thread t@55
> > >    java.lang.Thread.State: RUNNABLE
> > >         at sun.nio.ch.EPollArrayWrapper.epollWait(Native Method)
> > >         at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:
> 269)
> > >         at
> > sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:93)
> > >         at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.
> java:86)
> > >         - locked <5709c085> (a sun.nio.ch.Util$2)
> > >         - locked <1cacaaf> (a java.util.Collections$UnmodifiableSet)
> > >         - locked <26a408e> (a sun.nio.ch.EPollSelectorImpl)
> > >         at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:97)
> > >         at
> > > org.apache.kafka.common.network.Selector.select(Selector.java:489)
> > >         at
> > org.apache.kafka.common.network.Selector.poll(Selector.java:298)
> > >         at
> > > org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:349)
> > >         at
> > >
> > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(
> ConsumerNetworkClient.java:226)
> > >         - locked <639d53ee> (a
> > > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient)
> > >         at
> > >
> > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(
> ConsumerNetworkClient.java:172)
> > >         at
> > >
> > org.apache.kafka.clients.consumer.internals.AbstractCoordinator.
> joinGroupIfNeeded(AbstractCoordinator.java:347)
> > >         at
> > >
> > org.apache.kafka.clients.consumer.internals.AbstractCoordinator.
> ensureActiveGroup(AbstractCoordinator.java:303)
> > >         at
> > >
> > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(
> ConsumerCoordinator.java:290)
> > >         at
> > >
> > org.apache.kafka.clients.consumer.KafkaConsumer.
> pollOnce(KafkaConsumer.java:1029)
> > >         at
> > >
> > org.apache.kafka.clients.consumer.KafkaConsumer.poll(
> KafkaConsumer.java:995)
> > >         at
> > >
> > org.apache.kafka.streams.processor.internals.StreamThread.runLoop(
> StreamThread.java:592)
> > >         at
> > >
> > org.apache.kafka.streams.processor.internals.
> StreamThread.run(StreamThread.java:361)
> > >
> > >    Locked ownable synchronizers:
> > >         - None
> > >
> > > "StreamThread-3" - Thread t@53
> > >    java.lang.Thread.State: RUNNABLE
> > >         at sun.nio.ch.EPollArrayWrapper.epollWait(Native Method)
> > >         at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:
> 269)
> > >         at
> > sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:93)
> > >         at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.
> java:86)
> > >         - locked <123193f7> (a sun.nio.ch.Util$2)
> > >         - locked <6c3704d3> (a java.util.Collections$UnmodifiableSet)
> > >         - locked <45bbb5da> (a sun.nio.ch.EPollSelectorImpl)
> > >         at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:97)
> > >         at
> > > org.apache.kafka.common.network.Selector.select(Selector.java:489)
> > >         at
> > org.apache.kafka.common.network.Selector.poll(Selector.java:298)
> > >         at
> > > org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:349)
> > >         at
> > >
> > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(
> ConsumerNetworkClient.java:226)
> > >         - locked <4d9f6f42> (a
> > > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient)
> > >         at
> > >
> > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(
> ConsumerNetworkClient.java:172)
> > >         at
> > >
> > org.apache.kafka.clients.consumer.internals.AbstractCoordinator.
> joinGroupIfNeeded(AbstractCoordinator.java:347)
> > >         at
> > >
> > org.apache.kafka.clients.consumer.internals.AbstractCoordinator.
> ensureActiveGroup(AbstractCoordinator.java:303)
> > >         at
> > >
> > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(
> ConsumerCoordinator.java:290)
> > >         at
> > >
> > org.apache.kafka.clients.consumer.KafkaConsumer.
> pollOnce(KafkaConsumer.java:1029)
> > >         at
> > >
> > org.apache.kafka.clients.consumer.KafkaConsumer.poll(
> KafkaConsumer.java:995)
> > >         at
> > >
> > org.apache.kafka.streams.processor.internals.StreamThread.runLoop(
> StreamThread.java:592)
> > >         at
> > >
> > org.apache.kafka.streams.processor.internals.
> StreamThread.run(StreamThread.java:361)
> > >
> > >    Locked ownable synchronizers:
> > >         - None
> > >
> > > "StreamThread-2" - Thread t@51
> > >    java.lang.Thread.State: RUNNABLE
> > >         at sun.nio.ch.EPollArrayWrapper.epollWait(Native Method)
> > >         at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:
> 269)
> > >         at
> > sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:93)
> > >         at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.
> java:86)
> > >         - locked <532ff32d> (a sun.nio.ch.Util$2)
> > >         - locked <76a6407> (a java.util.Collections$UnmodifiableSet)
> > >         - locked <1f670455> (a sun.nio.ch.EPollSelectorImpl)
> > >         at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:97)
> > >         at
> > > org.apache.kafka.common.network.Selector.select(Selector.java:489)
> > >         at
> > org.apache.kafka.common.network.Selector.poll(Selector.java:298)
> > >         at
> > > org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:349)
> > >         at
> > >
> > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(
> ConsumerNetworkClient.java:226)
> > >         - locked <29b48d84> (a
> > > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient)
> > >         at
> > >
> > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(
> ConsumerNetworkClient.java:172)
> > >         at
> > >
> > org.apache.kafka.clients.consumer.internals.AbstractCoordinator.
> joinGroupIfNeeded(AbstractCoordinator.java:347)
> > >         at
> > >
> > org.apache.kafka.clients.consumer.internals.AbstractCoordinator.
> ensureActiveGroup(AbstractCoordinator.java:303)
> > >         at
> > >
> > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(
> ConsumerCoordinator.java:290)
> > >         at
> > >
> > org.apache.kafka.clients.consumer.KafkaConsumer.
> pollOnce(KafkaConsumer.java:1029)
> > >         at
> > >
> > org.apache.kafka.clients.consumer.KafkaConsumer.poll(
> KafkaConsumer.java:995)
> > >         at
> > >
> > org.apache.kafka.streams.processor.internals.StreamThread.runLoop(
> StreamThread.java:592)
> > >         at
> > >
> > org.apache.kafka.streams.processor.internals.
> StreamThread.run(StreamThread.java:361)
> > >
> > >    Locked ownable synchronizers:
> > >         - None
> > >
> > > "StreamThread-1" - Thread t@49
> > >    java.lang.Thread.State: RUNNABLE
> > >         at sun.nio.ch.EPollArrayWrapper.epollWait(Native Method)
> > >         at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:
> 269)
> > >         at
> > sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:93)
> > >         at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.
> java:86)
> > >         - locked <5aeb504> (a sun.nio.ch.Util$2)
> > >         - locked <5130a3ea> (a java.util.Collections$UnmodifiableSet)
> > >         - locked <76d25035> (a sun.nio.ch.EPollSelectorImpl)
> > >         at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:97)
> > >         at
> > > org.apache.kafka.common.network.Selector.select(Selector.java:489)
> > >         at
> > org.apache.kafka.common.network.Selector.poll(Selector.java:298)
> > >         at
> > > org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:349)
> > >         at
> > >
> > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(
> ConsumerNetworkClient.java:226)
> > >         - locked <7b072bc6> (a
> > > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient)
> > >         at
> > >
> > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(
> ConsumerNetworkClient.java:172)
> > >         at
> > >
> > org.apache.kafka.clients.consumer.internals.AbstractCoordinator.
> joinGroupIfNeeded(AbstractCoordinator.java:347)
> > >         at
> > >
> > org.apache.kafka.clients.consumer.internals.AbstractCoordinator.
> ensureActiveGroup(AbstractCoordinator.java:303)
> > >         at
> > >
> > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(
> ConsumerCoordinator.java:290)
> > >         at
> > >
> > org.apache.kafka.clients.consumer.KafkaConsumer.
> pollOnce(KafkaConsumer.java:1029)
> > >         at
> > >
> > org.apache.kafka.clients.consumer.KafkaConsumer.poll(
> KafkaConsumer.java:995)
> > >         at
> > >
> > org.apache.kafka.streams.processor.internals.StreamThread.runLoop(
> StreamThread.java:592)
> > >         at
> > >
> > org.apache.kafka.streams.processor.internals.
> StreamThread.run(StreamThread.java:361)
> > >
> > >    Locked ownable synchronizers:
> > >         - None
> > >
> > > Eventually they get assigned partitions again, then they are revoked,
> > > another long time passes, threads 1, 2, 3 and 5 stuck on Sensor and we
> > get
> > > into the same situation.
> > >
> > > Finally, I tried starting up only 1 instance (with 5 threads). Current
> > > status:
> > >
> > > "StreamThread-5" - Thread t@56
> > >    java.lang.Thread.State: RUNNABLE
> > >         at org.apache.kafka.common.metrics.Sensor.record(Sensor.
> java:169)
> > >         - locked <6a55b7c6> (a org.apache.kafka.common.metrics.Sensor)
> > >         at org.apache.kafka.common.metrics.Sensor.record(Sensor.
> java:176)
> > >         at
> > >
> > org.apache.kafka.streams.processor.internals.StreamThread$
> StreamsMetricsThreadImpl.recordLatency(StreamThread.java:1184)
> > >         at
> > >
> > org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.
> measureLatencyNs(StreamsMetricsImpl.java:190)
> > >         at
> > >
> > org.apache.kafka.streams.processor.internals.ProcessorNode.punctuate(
> ProcessorNode.java:139)
> > >         at
> > >
> > org.apache.kafka.streams.processor.internals.StreamTask.punctuate(
> StreamTask.java:268)
> > >         at
> > >
> > org.apache.kafka.streams.processor.internals.
> PunctuationQueue.mayPunctuate(PunctuationQueue.java:45)
> > >         - locked <5bea9b1b> (a java.util.PriorityQueue)
> > >         at
> > >
> > org.apache.kafka.streams.processor.internals.StreamTask.maybePunctuate(
> StreamTask.java:251)
> > >         at
> > >
> > org.apache.kafka.streams.processor.internals.
> StreamThread.maybePunctuate(StreamThread.java:751)
> > >         at
> > >
> > org.apache.kafka.streams.processor.internals.StreamThread.runLoop(
> StreamThread.java:633)
> > >         at
> > >
> > org.apache.kafka.streams.processor.internals.
> StreamThread.run(StreamThread.java:361)
> > >
> > >    Locked ownable synchronizers:
> > >         - None
> > >
> > > "StreamThread-4" - Thread t@54
> > >    java.lang.Thread.State: RUNNABLE
> > >         at org.apache.kafka.common.metrics.Sensor.record(Sensor.
> java:169)
> > >         - locked <6a55b7c6> (a org.apache.kafka.common.metrics.Sensor)
> > >         at org.apache.kafka.common.metrics.Sensor.record(Sensor.
> java:176)
> > >         at
> > >
> > org.apache.kafka.streams.processor.internals.StreamThread$
> StreamsMetricsThreadImpl.recordLatency(StreamThread.java:1184)
> > >         at
> > >
> > org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.
> measureLatencyNs(StreamsMetricsImpl.java:190)
> > >         at
> > >
> > org.apache.kafka.streams.processor.internals.ProcessorNode.punctuate(
> ProcessorNode.java:139)
> > >         at
> > >
> > org.apache.kafka.streams.processor.internals.StreamTask.punctuate(
> StreamTask.java:268)
> > >         at
> > >
> > org.apache.kafka.streams.processor.internals.
> PunctuationQueue.mayPunctuate(PunctuationQueue.java:45)
> > >         - locked <5a06855> (a java.util.PriorityQueue)
> > >         at
> > >
> > org.apache.kafka.streams.processor.internals.StreamTask.maybePunctuate(
> StreamTask.java:251)
> > >         at
> > >
> > org.apache.kafka.streams.processor.internals.
> StreamThread.maybePunctuate(StreamThread.java:751)
> > >         at
> > >
> > org.apache.kafka.streams.processor.internals.StreamThread.runLoop(
> StreamThread.java:633)
> > >         at
> > >
> > org.apache.kafka.streams.processor.internals.
> StreamThread.run(StreamThread.java:361)
> > >
> > >    Locked ownable synchronizers:
> > >         - None
> > >
> > > "StreamThread-3" - Thread t@52
> > >    java.lang.Thread.State: BLOCKED
> > >         at org.apache.kafka.common.metrics.Sensor.record(Sensor.
> java:169)
> > >         - waiting to lock <6a55b7c6> (a
> > > org.apache.kafka.common.metrics.Sensor) owned by "StreamThread-5" t@56
> > >         at org.apache.kafka.common.metrics.Sensor.record(Sensor.
> java:176)
> > >         at
> > >
> > org.apache.kafka.streams.processor.internals.StreamThread$
> StreamsMetricsThreadImpl.recordLatency(StreamThread.java:1184)
> > >         at
> > >
> > org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.
> measureLatencyNs(StreamsMetricsImpl.java:190)
> > >         at
> > >
> > org.apache.kafka.streams.processor.internals.ProcessorNode.punctuate(
> ProcessorNode.java:139)
> > >         at
> > >
> > org.apache.kafka.streams.processor.internals.StreamTask.punctuate(
> StreamTask.java:268)
> > >         at
> > >
> > org.apache.kafka.streams.processor.internals.
> PunctuationQueue.mayPunctuate(PunctuationQueue.java:45)
> > >         - locked <d3a57bc> (a java.util.PriorityQueue)
> > >         at
> > >
> > org.apache.kafka.streams.processor.internals.StreamTask.maybePunctuate(
> StreamTask.java:251)
> > >         at
> > >
> > org.apache.kafka.streams.processor.internals.
> StreamThread.maybePunctuate(StreamThread.java:751)
> > >         at
> > >
> > org.apache.kafka.streams.processor.internals.StreamThread.runLoop(
> StreamThread.java:633)
> > >         at
> > >
> > org.apache.kafka.streams.processor.internals.
> StreamThread.run(StreamThread.java:361)
> > >
> > >    Locked ownable synchronizers:
> > >         - None
> > >
> > > "StreamThread-2" - Thread t@50
> > >    java.lang.Thread.State: RUNNABLE
> > >         at org.apache.kafka.common.metrics.Sensor.record(Sensor.
> java:175)
> > >         at
> > >
> > org.apache.kafka.streams.processor.internals.StreamThread$
> StreamsMetricsThreadImpl.recordLatency(StreamThread.java:1184)
> > >         at
> > >
> > org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.
> measureLatencyNs(StreamsMetricsImpl.java:190)
> > >         at
> > >
> > org.apache.kafka.streams.processor.internals.ProcessorNode.punctuate(
> ProcessorNode.java:139)
> > >         at
> > >
> > org.apache.kafka.streams.processor.internals.StreamTask.punctuate(
> StreamTask.java:268)
> > >         at
> > >
> > org.apache.kafka.streams.processor.internals.
> PunctuationQueue.mayPunctuate(PunctuationQueue.java:45)
> > >         - locked <480f4efc> (a java.util.PriorityQueue)
> > >         at
> > >
> > org.apache.kafka.streams.processor.internals.StreamTask.maybePunctuate(
> StreamTask.java:251)
> > >         at
> > >
> > org.apache.kafka.streams.processor.internals.
> StreamThread.maybePunctuate(StreamThread.java:751)
> > >         at
> > >
> > org.apache.kafka.streams.processor.internals.StreamThread.runLoop(
> StreamThread.java:633)
> > >         at
> > >
> > org.apache.kafka.streams.processor.internals.
> StreamThread.run(StreamThread.java:361)
> > >
> > >    Locked ownable synchronizers:
> > >         - None
> > >
> > > "StreamThread-1" - Thread t@48
> > >    java.lang.Thread.State: BLOCKED
> > >         at org.apache.kafka.common.metrics.Sensor.record(Sensor.
> java:169)
> > >         - waiting to lock <6a55b7c6> (a
> > > org.apache.kafka.common.metrics.Sensor) owned by "StreamThread-5" t@56
> > >         at org.apache.kafka.common.metrics.Sensor.record(Sensor.
> java:176)
> > >         at
> > >
> > org.apache.kafka.streams.processor.internals.StreamThread$
> StreamsMetricsThreadImpl.recordLatency(StreamThread.java:1184)
> > >         at
> > >
> > org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.
> measureLatencyNs(StreamsMetricsImpl.java:190)
> > >         at
> > >
> > org.apache.kafka.streams.processor.internals.ProcessorNode.punctuate(
> ProcessorNode.java:139)
> > >         at
> > >
> > org.apache.kafka.streams.processor.internals.StreamTask.punctuate(
> StreamTask.java:268)
> > >         at
> > >
> > org.apache.kafka.streams.processor.internals.
> PunctuationQueue.mayPunctuate(PunctuationQueue.java:45)
> > >         - locked <47b226a5> (a java.util.PriorityQueue)
> > >         at
> > >
> > org.apache.kafka.streams.processor.internals.StreamTask.maybePunctuate(
> StreamTask.java:251)
> > >         at
> > >
> > org.apache.kafka.streams.processor.internals.
> StreamThread.maybePunctuate(StreamThread.java:751)
> > >         at
> > >
> > org.apache.kafka.streams.processor.internals.StreamThread.runLoop(
> StreamThread.java:633)
> > >         at
> > >
> > org.apache.kafka.streams.processor.internals.
> StreamThread.run(StreamThread.java:361)
> > >
> > >    Locked ownable synchronizers:
> > >         - None
> > >
> > > This has been going on for over 40 minutes now and the cluster does not
> > > stabilize. Not sure what to do here, any help welcome.
> > >
> > >
> >
>

Reply via email to