The issue is that the consumers all feed into all of the data channels,
so every consumer will eventually block if any single data channel
becomes full. The mirror maker on trunk is significantly different, so
this is not an issue on trunk.
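
To make the blocking behaviour concrete, here is a toy simulation (not
MirrorMaker code). It assumes that each consumer spreads its messages
round-robin over all data channels, and that a consumer facing a full
channel keeps retrying that same channel instead of skipping it. The
function name `simulate`, the channel count, and the capacity are made up
for illustration. With one producer stalled, its channel fills up and every
consumer eventually gets stuck on it.

```python
from collections import deque


def simulate(num_consumers=4, num_channels=8, capacity=3,
             stalled_channel=6, steps=200):
    """Toy model: return the set of consumers that end up blocked.

    Assumption (not taken from the MirrorMaker source): consumers pick
    their target channel round-robin and retry a full channel forever.
    """
    channels = [deque() for _ in range(num_channels)]
    # Channel that each consumer must write its next message to.
    next_target = [c % num_channels for c in range(num_consumers)]
    blocked = set()

    for _ in range(steps):
        # Each consumer tries to enqueue one message per step.
        for c in range(num_consumers):
            target = next_target[c]
            if len(channels[target]) < capacity:
                channels[target].append(c)
                next_target[c] = (target + 1) % num_channels
                blocked.discard(c)
            else:
                # Channel is full: the consumer waits on this channel.
                blocked.add(c)

        # Each healthy producer drains one message from its own channel.
        for ch in range(num_channels):
            if ch != stalled_channel and channels[ch]:
                channels[ch].popleft()

    return blocked
```

In this model, `simulate(stalled_channel=6)` returns all four consumers,
while `simulate(stalled_channel=None)` returns an empty set.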

On Fri, May 22, 2015 at 12:37:01PM -0400, Rajasekar Elango wrote:
> Thanks for the pointers, Joel. I will look into SSLSocketChannel. Yes, this
> was working fine before the upgrade.
> 
> If it's just one producer thread stuck on write, it should affect only one
> consumer thread/partition. But we found that consuming stopped for all
> topics/partitions. Or is there only a single data channel shared between
> all producer and consumer threads?
> 
> Thanks,
> Raja.
> 
> 
> On Fri, May 22, 2015 at 12:12 PM, Joel Koshy <jjkosh...@gmail.com> wrote:
> 
> > The threaddump suggests that one of the producers
> > (mirrormaker-producer-6) is blocked on write for some reason. So the
> > data-channel for that producer (which sits between the consumers and
> > the producer) is full which blocks the consumers from progressing.
> >
> > This appears to be in your (custom) SSLSocketChannel code. If you take
> > consecutive threaddumps I'm guessing you would see the same trace. If
> > this is reproducible, can you do that? You can also hook up jvisualvm
> > or YourKit to see which threads are active; it may well be that the
> > producer is in a tight loop in writeCompletely. Just to confirm: you
> > did not see this issue before upgrading?
> >
> > Joel
> >
> > On Fri, May 22, 2015 at 11:35:19AM -0400, Rajasekar Elango wrote:
> > > We recently upgraded to Kafka 0.8.2.1 and found that mirrormaker randomly
> > > stops consuming. We had to restart the mirrormaker process to resolve the
> > > problem. This has occurred several times in the past two weeks.
> > >
> > > Here is what I found in analysis:
> > >
> > > When this problem happens:
> > >
> > > The mirrormaker log stops rolling (i.e. nothing new in the logs). The last
> > > couple of messages in the mirrormaker log are from ProducerSendThread
> > > producing to the destination. No errors or exceptions.
> > >
> > > The mirrormaker consumer offset doesn't increase. ConsumerOffsetChecker
> > > shows that the mirrormaker consumer offset stops incrementing.
> > >
> > > The mirrormaker consumer MinFetch rate JMX metric drops to zero.
> > > ConsumerTopicMetric.BytesPerSec drops to zero.
> > >
> > > So it looks like the mirrormaker consumer has stopped accepting new data.
> > >
> > > Can someone provide input on how to troubleshoot this problem further and
> > > identify the root cause?
> > >
> > > I got a thread dump before restarting. It looks OK to me, with no blocked
> > > threads. Here is the thread dump output:
> > >
> > > 2015-05-21 18:59:09
> > > Full thread dump Java HotSpot(TM) 64-Bit Server VM (24.76-b04 mixed mode):
> > >
> > > "Attach Listener" daemon prio=10 tid=0x00007f7248002000 nid=0x2d53 waiting on condition [0x0000000000000000]
> > >    java.lang.Thread.State: RUNNABLE
> > >
> > >    Locked ownable synchronizers:
> > >     - None
> > >
> > >
> > "ConsumerFetcherThread-mirrormakerProd_ops-mmrs1-2-tyo.ops.sfdc.net-1431458688650-fb15f395-0-2"
> > > prio=10 tid=0x00007f71e407e000 nid=0x3425 waiting on condition
> > > [0x00007f72833f2000]
> > >    java.lang.Thread.State: WAITING (parking)
> > >     at sun.misc.Unsafe.park(Native Method)
> > >     - parking to wait for  <0x000000042cd15cc8> (a
> > > java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
> > >     at java.util.concurrent.locks.LockSupport.park(LockSupport.java:186)
> > >     at
> > >
> > java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2043)
> > >     at
> > >
> > java.util.concurrent.LinkedBlockingQueue.put(LinkedBlockingQueue.java:349)
> > >     at
> > > kafka.consumer.PartitionTopicInfo.enqueue(PartitionTopicInfo.scala:60)
> > >     at
> > >
> > kafka.consumer.ConsumerFetcherThread.processPartitionData(ConsumerFetcherThread.scala:49)
> > >     at
> > >
> > kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1$$anonfun$apply$mcV$sp$2.apply(AbstractFetcherThread.scala:129)
> > >     at
> > >
> > kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1$$anonfun$apply$mcV$sp$2.apply(AbstractFetcherThread.scala:110)
> > >     at
> > > scala.collection.immutable.HashMap$HashMap1.foreach(HashMap.scala:224)
> > >     at
> > > scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:403)
> > >     at
> > > scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:403)
> > >     at
> > >
> > kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1.apply$mcV$sp(AbstractFetcherThread.scala:110)
> > >     at
> > >
> > kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1.apply(AbstractFetcherThread.scala:110)
> > >     at
> > >
> > kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1.apply(AbstractFetcherThread.scala:110)
> > >     at kafka.utils.Utils$.inLock(Utils.scala:535)
> > >     at
> > >
> > kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:109)
> > >     at
> > > kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:87)
> > >     at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)
> > >
> > >    Locked ownable synchronizers:
> > >     - <0x000000042ea62eb0> (a
> > > java.util.concurrent.locks.ReentrantLock$NonfairSync)
> > >
> > >
> > "ConsumerFetcherThread-mirrormakerProd_ops-mmrs1-2-tyo.ops.sfdc.net-1431458688650-fb15f395-0-3"
> > > prio=10 tid=0x00007f71e407b000 nid=0x3424 waiting on condition
> > > [0x00007f7281f99000]
> > >    java.lang.Thread.State: WAITING (parking)
> > >     at sun.misc.Unsafe.park(Native Method)
> > >     - parking to wait for  <0x000000042ccece80> (a
> > > java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
> > >     at java.util.concurrent.locks.LockSupport.park(LockSupport.java:186)
> > >     at
> > >
> > java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2043)
> > >     at
> > >
> > java.util.concurrent.LinkedBlockingQueue.put(LinkedBlockingQueue.java:349)
> > >     at
> > > kafka.consumer.PartitionTopicInfo.enqueue(PartitionTopicInfo.scala:60)
> > >     at
> > >
> > kafka.consumer.ConsumerFetcherThread.processPartitionData(ConsumerFetcherThread.scala:49)
> > >     at
> > >
> > kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1$$anonfun$apply$mcV$sp$2.apply(AbstractFetcherThread.scala:129)
> > >     at
> > >
> > kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1$$anonfun$apply$mcV$sp$2.apply(AbstractFetcherThread.scala:110)
> > >     at
> > > scala.collection.immutable.HashMap$HashMap1.foreach(HashMap.scala:224)
> > >     at
> > > scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:403)
> > >     at
> > > scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:403)
> > >     at
> > >
> > kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1.apply$mcV$sp(AbstractFetcherThread.scala:110)
> > >     at
> > >
> > kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1.apply(AbstractFetcherThread.scala:110)
> > >     at
> > >
> > kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1.apply(AbstractFetcherThread.scala:110)
> > >     at kafka.utils.Utils$.inLock(Utils.scala:535)
> > >     at
> > >
> > kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:109)
> > >     at
> > > kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:87)
> > >     at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)
> > >
> > >    Locked ownable synchronizers:
> > >     - <0x000000042e430f30> (a
> > > java.util.concurrent.locks.ReentrantLock$NonfairSync)
> > >
> > >
> > "ConsumerFetcherThread-mirrormakerProd_ops-mmrs1-2-tyo.ops.sfdc.net-1431458688650-fb15f395-0-5"
> > > prio=10 tid=0x00007f71e405f800 nid=0x3423 waiting on condition
> > > [0x00007f72832f1000]
> > >    java.lang.Thread.State: WAITING (parking)
> > >     at sun.misc.Unsafe.park(Native Method)
> > >     - parking to wait for  <0x000000042ceb4648> (a
> > > java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
> > >     at java.util.concurrent.locks.LockSupport.park(LockSupport.java:186)
> > >     at
> > >
> > java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2043)
> > >     at
> > >
> > java.util.concurrent.LinkedBlockingQueue.put(LinkedBlockingQueue.java:349)
> > >     at
> > > kafka.consumer.PartitionTopicInfo.enqueue(PartitionTopicInfo.scala:60)
> > >     at
> > >
> > kafka.consumer.ConsumerFetcherThread.processPartitionData(ConsumerFetcherThread.scala:49)
> > >     at
> > >
> > kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1$$anonfun$apply$mcV$sp$2.apply(AbstractFetcherThread.scala:129)
> > >     at
> > >
> > kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1$$anonfun$apply$mcV$sp$2.apply(AbstractFetcherThread.scala:110)
> > >     at
> > > scala.collection.immutable.HashMap$HashMap1.foreach(HashMap.scala:224)
> > >     at
> > > scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:403)
> > >     at
> > > scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:403)
> > >     at
> > >
> > kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1.apply$mcV$sp(AbstractFetcherThread.scala:110)
> > >     at
> > >
> > kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1.apply(AbstractFetcherThread.scala:110)
> > >     at
> > >
> > kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1.apply(AbstractFetcherThread.scala:110)
> > >     at kafka.utils.Utils$.inLock(Utils.scala:535)
> > >     at
> > >
> > kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:109)
> > >     at
> > > kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:87)
> > >     at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)
> > >
> > >    Locked ownable synchronizers:
> > >     - <0x000000042fc4c5c8> (a
> > > java.util.concurrent.locks.ReentrantLock$NonfairSync)
> > >
> > >
> > "ConsumerFetcherThread-mirrormakerProd_ops-mmrs1-2-tyo.ops.sfdc.net-1431458688650-fb15f395-0-1"
> > > prio=10 tid=0x00007f71e4033000 nid=0x3422 waiting on condition
> > > [0x00007f728209a000]
> > >    java.lang.Thread.State: WAITING (parking)
> > >     at sun.misc.Unsafe.park(Native Method)
> > >     - parking to wait for  <0x000000042c9c7ca0> (a
> > > java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
> > >     at java.util.concurrent.locks.LockSupport.park(LockSupport.java:186)
> > >     at
> > >
> > java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2043)
> > >     at
> > >
> > java.util.concurrent.LinkedBlockingQueue.put(LinkedBlockingQueue.java:349)
> > >     at
> > > kafka.consumer.PartitionTopicInfo.enqueue(PartitionTopicInfo.scala:60)
> > >     at
> > >
> > kafka.consumer.ConsumerFetcherThread.processPartitionData(ConsumerFetcherThread.scala:49)
> > >     at
> > >
> > kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1$$anonfun$apply$mcV$sp$2.apply(AbstractFetcherThread.scala:129)
> > >     at
> > >
> > kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1$$anonfun$apply$mcV$sp$2.apply(AbstractFetcherThread.scala:110)
> > >     at
> > > scala.collection.immutable.HashMap$HashMap1.foreach(HashMap.scala:224)
> > >     at
> > > scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:403)
> > >     at
> > > scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:403)
> > >     at
> > >
> > kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1.apply$mcV$sp(AbstractFetcherThread.scala:110)
> > >     at
> > >
> > kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1.apply(AbstractFetcherThread.scala:110)
> > >     at
> > >
> > kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1.apply(AbstractFetcherThread.scala:110)
> > >     at kafka.utils.Utils$.inLock(Utils.scala:535)
> > >     at
> > >
> > kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:109)
> > >     at
> > > kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:87)
> > >     at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)
> > >
> > >    Locked ownable synchronizers:
> > >     - <0x000000043ac68888> (a
> > > java.util.concurrent.locks.ReentrantLock$NonfairSync)
> > >
> > >
> > "ConsumerFetcherThread-mirrormakerProd_ops-mmrs1-2-tyo.ops.sfdc.net-1431458688650-fb15f395-0-4"
> > > prio=10 tid=0x00007f71e4032000 nid=0x3421 waiting on condition
> > > [0x00007f72823e2000]
> > >    java.lang.Thread.State: WAITING (parking)
> > >     at sun.misc.Unsafe.park(Native Method)
> > >     - parking to wait for  <0x000000042ccece80> (a
> > > java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
> > >     at java.util.concurrent.locks.LockSupport.park(LockSupport.java:186)
> > >     at
> > >
> > java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2043)
> > >     at
> > >
> > java.util.concurrent.LinkedBlockingQueue.put(LinkedBlockingQueue.java:349)
> > >     at
> > > kafka.consumer.PartitionTopicInfo.enqueue(PartitionTopicInfo.scala:60)
> > >     at
> > >
> > kafka.consumer.ConsumerFetcherThread.processPartitionData(ConsumerFetcherThread.scala:49)
> > >     at
> > >
> > kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1$$anonfun$apply$mcV$sp$2.apply(AbstractFetcherThread.scala:129)
> > >     at
> > >
> > kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1$$anonfun$apply$mcV$sp$2.apply(AbstractFetcherThread.scala:110)
> > >     at
> > > scala.collection.immutable.HashMap$HashMap1.foreach(HashMap.scala:224)
> > >     at
> > > scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:403)
> > >     at
> > > scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:403)
> > >     at
> > >
> > kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1.apply$mcV$sp(AbstractFetcherThread.scala:110)
> > >     at
> > >
> > kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1.apply(AbstractFetcherThread.scala:110)
> > >     at
> > >
> > kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1.apply(AbstractFetcherThread.scala:110)
> > >     at kafka.utils.Utils$.inLock(Utils.scala:535)
> > >     at
> > >
> > kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:109)
> > >     at
> > > kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:87)
> > >     at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)
> > >
> > >    Locked ownable synchronizers:
> > >     - <0x00000004587412c8> (a
> > > java.util.concurrent.locks.ReentrantLock$NonfairSync)
> > >
> > >
> > "mirrormakerProd_ops-mmrs1-2-tyo.ops.sfdc.net-1431458688650-fb15f395-leader-finder-thread"
> > > prio=10 tid=0x00007f721800e000 nid=0x3420 waiting on condition
> > > [0x00007f72831f0000]
> > >    java.lang.Thread.State: WAITING (parking)
> > >     at sun.misc.Unsafe.park(Native Method)
> > >     - parking to wait for  <0x000000042cd15c38> (a
> > > java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
> > >     at java.util.concurrent.locks.LockSupport.park(LockSupport.java:186)
> > >     at
> > >
> > java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2043)
> > >     at
> > >
> > kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:61)
> > >     at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)
> > >
> > >    Locked ownable synchronizers:
> > >     - None
> > >
> > > "mirrormaker-producer-7" prio=10 tid=0x00007f730486d800 nid=0x5ce3
> > runnable
> > > [0x00007f72825e4000]
> > >    java.lang.Thread.State: TIMED_WAITING (parking)
> > >     at sun.misc.Unsafe.park(Native Method)
> > >     - parking to wait for  <0x000000042ce52700> (a
> > > java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
> > >     at
> > > java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:226)
> > >     at
> > >
> > java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2082)
> > >     at
> > > java.util.concurrent.ArrayBlockingQueue.poll(ArrayBlockingQueue.java:389)
> > >     at kafka.tools.MirrorMaker$DataChannel.take(MirrorMaker.scala:236)
> > >     at kafka.tools.MirrorMaker$ProducerThread.run(MirrorMaker.scala:303)
> > >
> > >    Locked ownable synchronizers:
> > >     - None
> > >
> > > "mirrormaker-producer-6" prio=10 tid=0x00007f730486c000 nid=0x5ce2 waiting on condition [0x00007f72826e5000]
> > >    java.lang.Thread.State: WAITING (parking)
> > >     at sun.misc.Unsafe.park(Native Method)
> > >     - parking to wait for  <0x000000042c9c2e48> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
> > >     at java.util.concurrent.locks.LockSupport.park(LockSupport.java:186)
> > >     at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2043)
> > >     at java.util.concurrent.LinkedBlockingQueue.put(LinkedBlockingQueue.java:349)
> > >     at kafka.producer.Producer$$anonfun$asyncSend$1.apply(Producer.scala:99)
> > >     at kafka.producer.Producer$$anonfun$asyncSend$1.apply(Producer.scala:91)
> > >     at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
> > >     at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:34)
> > >     at kafka.producer.Producer.asyncSend(Producer.scala:91)
> > >     at kafka.producer.Producer.send(Producer.scala:78)
> > >     - locked <0x000000042c9c2e90> (a java.lang.Object)
> > >     at kafka.producer.OldProducer.send(BaseProducer.scala:62)
> > >     at kafka.tools.MirrorMaker$ProducerThread.run(MirrorMaker.scala:309)
> > >
> > >    Locked ownable synchronizers:
> > >     - None
> > >
> > > "mirrormaker-producer-5" prio=10 tid=0x00007f730486a800 nid=0x5ce1
> > runnable
> > > [0x00007f72827e6000]
> > >    java.lang.Thread.State: TIMED_WAITING (parking)
> > >     at sun.misc.Unsafe.park(Native Method)
> > >     - parking to wait for  <0x000000042ce06458> (a
> > > java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
> > >     at
> > > java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:226)
> > >     at
> > >
> > java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2082)
> > >     at
> > > java.util.concurrent.ArrayBlockingQueue.poll(ArrayBlockingQueue.java:389)
> > >     at kafka.tools.MirrorMaker$DataChannel.take(MirrorMaker.scala:236)
> > >     at kafka.tools.MirrorMaker$ProducerThread.run(MirrorMaker.scala:303)
> > >
> > >    Locked ownable synchronizers:
> > >     - None
> > >
> > > "mirrormaker-producer-4" prio=10 tid=0x00007f7304855800 nid=0x5ce0
> > waiting
> > > on condition [0x00007f72828e7000]
> > >    java.lang.Thread.State: TIMED_WAITING (parking)
> > >     at sun.misc.Unsafe.park(Native Method)
> > >     - parking to wait for  <0x000000042cceceb0> (a
> > > java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
> > >     at
> > > java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:226)
> > >     at
> > >
> > java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2082)
> > >     at
> > > java.util.concurrent.ArrayBlockingQueue.poll(ArrayBlockingQueue.java:389)
> > >     at kafka.tools.MirrorMaker$DataChannel.take(MirrorMaker.scala:236)
> > >     at kafka.tools.MirrorMaker$ProducerThread.run(MirrorMaker.scala:303)
> > >
> > >    Locked ownable synchronizers:
> > >     - None
> > >
> > > "mirrormaker-producer-3" prio=10 tid=0x00007f7304853800 nid=0x5cdf
> > waiting
> > > on condition [0x00007f72829e8000]
> > >    java.lang.Thread.State: TIMED_WAITING (parking)
> > >     at sun.misc.Unsafe.park(Native Method)
> > >     - parking to wait for  <0x000000042ce01030> (a
> > > java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
> > >     at
> > > java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:226)
> > >     at
> > >
> > java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2082)
> > >     at
> > > java.util.concurrent.ArrayBlockingQueue.poll(ArrayBlockingQueue.java:389)
> > >     at kafka.tools.MirrorMaker$DataChannel.take(MirrorMaker.scala:236)
> > >     at kafka.tools.MirrorMaker$ProducerThread.run(MirrorMaker.scala:303)
> > >
> > >    Locked ownable synchronizers:
> > >     - None
> > >
> > > "mirrormaker-producer-2" prio=10 tid=0x00007f7304852000 nid=0x5cde
> > runnable
> > > [0x00007f7282ae9000]
> > >    java.lang.Thread.State: TIMED_WAITING (parking)
> > >     at sun.misc.Unsafe.park(Native Method)
> > >     - parking to wait for  <0x000000042ce8f3f0> (a
> > > java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
> > >     at
> > > java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:226)
> > >     at
> > >
> > java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2082)
> > >     at
> > > java.util.concurrent.ArrayBlockingQueue.poll(ArrayBlockingQueue.java:389)
> > >     at kafka.tools.MirrorMaker$DataChannel.take(MirrorMaker.scala:236)
> > >     at kafka.tools.MirrorMaker$ProducerThread.run(MirrorMaker.scala:303)
> > >
> > >    Locked ownable synchronizers:
> > >     - None
> > >
> > > "mirrormaker-producer-1" prio=10 tid=0x00007f7304850800 nid=0x5cdd
> > waiting
> > > on condition [0x00007f7282bea000]
> > >    java.lang.Thread.State: TIMED_WAITING (parking)
> > >     at sun.misc.Unsafe.park(Native Method)
> > >     - parking to wait for  <0x000000042ce010f0> (a
> > > java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
> > >     at
> > > java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:226)
> > >     at
> > >
> > java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2082)
> > >     at
> > > java.util.concurrent.ArrayBlockingQueue.poll(ArrayBlockingQueue.java:389)
> > >     at kafka.tools.MirrorMaker$DataChannel.take(MirrorMaker.scala:236)
> > >     at kafka.tools.MirrorMaker$ProducerThread.run(MirrorMaker.scala:303)
> > >
> > >    Locked ownable synchronizers:
> > >     - None
> > >
> > > "mirrormaker-producer-0" prio=10 tid=0x00007f730484e800 nid=0x5cdc
> > waiting
> > > on condition [0x00007f7282ceb000]
> > >    java.lang.Thread.State: TIMED_WAITING (parking)
> > >     at sun.misc.Unsafe.park(Native Method)
> > >     - parking to wait for  <0x000000042ce76540> (a
> > > java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
> > >     at
> > > java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:226)
> > >     at
> > >
> > java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2082)
> > >     at
> > > java.util.concurrent.ArrayBlockingQueue.poll(ArrayBlockingQueue.java:389)
> > >     at kafka.tools.MirrorMaker$DataChannel.take(MirrorMaker.scala:236)
> > >     at kafka.tools.MirrorMaker$ProducerThread.run(MirrorMaker.scala:303)
> > >
> > >    Locked ownable synchronizers:
> > >     - None
> > >
> > > "mirrormaker-consumer-3" prio=10 tid=0x00007f73047f2800 nid=0x5cdb
> > waiting
> > > on condition [0x00007f7282dec000]
> > >    java.lang.Thread.State: TIMED_WAITING (parking)
> > >     at sun.misc.Unsafe.park(Native Method)
> > >     - parking to wait for  <0x000000042ceb1620> (a
> > > java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
> > >     at
> > > java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:226)
> > >     at
> > >
> > java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2082)
> > >     at
> > >
> > java.util.concurrent.ArrayBlockingQueue.offer(ArrayBlockingQueue.java:350)
> > >     at kafka.tools.MirrorMaker$DataChannel.put(MirrorMaker.scala:225)
> > >     at kafka.tools.MirrorMaker$DataChannel.put(MirrorMaker.scala:216)
> > >     at
> > >
> > kafka.tools.MirrorMaker$ConsumerThread$$anonfun$run$2.apply(MirrorMaker.scala:261)
> > >     at
> > >
> > kafka.tools.MirrorMaker$ConsumerThread$$anonfun$run$2.apply(MirrorMaker.scala:259)
> > >     at scala.collection.Iterator$class.foreach(Iterator.scala:727)
> > >     at kafka.utils.IteratorTemplate.foreach(IteratorTemplate.scala:32)
> > >     at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
> > >     at kafka.consumer.KafkaStream.foreach(KafkaStream.scala:25)
> > >     at kafka.tools.MirrorMaker$ConsumerThread.run(MirrorMaker.scala:259)
> > >
> > >    Locked ownable synchronizers:
> > >     - None
> > >
> > > "mirrormaker-consumer-2" prio=10 tid=0x00007f73047f0800 nid=0x5cda
> > runnable
> > > [0x00007f7282eed000]
> > >    java.lang.Thread.State: TIMED_WAITING (parking)
> > >     at sun.misc.Unsafe.park(Native Method)
> > >     - parking to wait for  <0x000000042ceb1620> (a
> > > java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
> > >     at
> > > java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:226)
> > >     at
> > >
> > java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2082)
> > >     at
> > >
> > java.util.concurrent.ArrayBlockingQueue.offer(ArrayBlockingQueue.java:350)
> > >     at kafka.tools.MirrorMaker$DataChannel.put(MirrorMaker.scala:225)
> > >     at kafka.tools.MirrorMaker$DataChannel.put(MirrorMaker.scala:216)
> > >     at
> > >
> > kafka.tools.MirrorMaker$ConsumerThread$$anonfun$run$2.apply(MirrorMaker.scala:261)
> > >     at
> > >
> > kafka.tools.MirrorMaker$ConsumerThread$$anonfun$run$2.apply(MirrorMaker.scala:259)
> > >     at scala.collection.Iterator$class.foreach(Iterator.scala:727)
> > >     at kafka.utils.IteratorTemplate.foreach(IteratorTemplate.scala:32)
> > >     at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
> > >     at kafka.consumer.KafkaStream.foreach(KafkaStream.scala:25)
> > >     at kafka.tools.MirrorMaker$ConsumerThread.run(MirrorMaker.scala:259)
> > >
> > >    Locked ownable synchronizers:
> > >     - None
> > >
> > > "mirrormaker-consumer-1" prio=10 tid=0x00007f73047ef000 nid=0x5cd9
> > runnable
> > > [0x00007f7282fee000]
> > >    java.lang.Thread.State: TIMED_WAITING (parking)
> > >     at sun.misc.Unsafe.park(Native Method)
> > >     - parking to wait for  <0x000000042ceb1620> (a
> > > java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
> > >     at
> > > java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:226)
> > >     at
> > >
> > java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2082)
> > >     at
> > >
> > java.util.concurrent.ArrayBlockingQueue.offer(ArrayBlockingQueue.java:350)
> > >     at kafka.tools.MirrorMaker$DataChannel.put(MirrorMaker.scala:225)
> > >     at kafka.tools.MirrorMaker$DataChannel.put(MirrorMaker.scala:216)
> > >     at
> > >
> > kafka.tools.MirrorMaker$ConsumerThread$$anonfun$run$2.apply(MirrorMaker.scala:261)
> > >     at
> > >
> > kafka.tools.MirrorMaker$ConsumerThread$$anonfun$run$2.apply(MirrorMaker.scala:259)
> > >     at scala.collection.Iterator$class.foreach(Iterator.scala:727)
> > >     at kafka.utils.IteratorTemplate.foreach(IteratorTemplate.scala:32)
> > >     at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
> > >     at kafka.consumer.KafkaStream.foreach(KafkaStream.scala:25)
> > >     at kafka.tools.MirrorMaker$ConsumerThread.run(MirrorMaker.scala:259)
> > >
> > >    Locked ownable synchronizers:
> > >     - None
> > >
> > > "mirrormaker-consumer-0" prio=10 tid=0x00007f73047ee800 nid=0x5cd8
> > waiting
> > > on condition [0x00007f72830ef000]
> > >    java.lang.Thread.State: TIMED_WAITING (parking)
> > >     at sun.misc.Unsafe.park(Native Method)
> > >     - parking to wait for  <0x000000042ceb1620> (a
> > > java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
> > >     at
> > > java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:226)
> > >     at
> > >
> > java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2082)
> > >     at
> > >
> > java.util.concurrent.ArrayBlockingQueue.offer(ArrayBlockingQueue.java:350)
> > >     at kafka.tools.MirrorMaker$DataChannel.put(MirrorMaker.scala:225)
> > >     at kafka.tools.MirrorMaker$DataChannel.put(MirrorMaker.scala:216)
> > >     at
> > >
> > kafka.tools.MirrorMaker$ConsumerThread$$anonfun$run$2.apply(MirrorMaker.scala:261)
> > >     at
> > >
> > kafka.tools.MirrorMaker$ConsumerThread$$anonfun$run$2.apply(MirrorMaker.scala:259)
> > >     at scala.collection.Iterator$class.foreach(Iterator.scala:727)
> > >     at kafka.utils.IteratorTemplate.foreach(IteratorTemplate.scala:32)
> > >     at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
> > >     at kafka.consumer.KafkaStream.foreach(KafkaStream.scala:25)
> > >     at kafka.tools.MirrorMaker$ConsumerThread.run(MirrorMaker.scala:259)
> > >
> > >    Locked ownable synchronizers:
> > >     - None
> > >
> > >
> > "mirrormakerProd_ops-mmrs1-2-tyo.ops.sfdc.net-1431458688650-fb15f395_watcher_executor"
> > > prio=10 tid=0x00007f7304b63800 nid=0x5be8 waiting on condition
> > > [0x00007f72834f3000]
> > >    java.lang.Thread.State: TIMED_WAITING (parking)
> > >     at sun.misc.Unsafe.park(Native Method)
> > >     - parking to wait for  <0x000000042c9d2e08> (a
> > > java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
> > >     at
> > > java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:226)
> > >     at
> > >
> > java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2176)
> > >     at
> > >
> > kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anon$1.run(ZookeeperConsumerConnector.scala:544)
> > >
> > >    Locked ownable synchronizers:
> > >     - None
> > >
> > > "ProducerSendThread--7" prio=10 tid=0x00007f7304b22800 nid=0x5be7 waiting
> > > on condition [0x00007f72835f4000]
> > >    java.lang.Thread.State: TIMED_WAITING (parking)
> > >     at sun.misc.Unsafe.park(Native Method)
> > >     - parking to wait for  <0x000000042ce59750> (a
> > > java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
> > >     at
> > > java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:226)
> > >     at
> > >
> > java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2082)
> > >     at
> > >
> > java.util.concurrent.LinkedBlockingQueue.poll(LinkedBlockingQueue.java:467)
> > >     at
> > >
> > kafka.producer.async.ProducerSendThread$$anonfun$processEvents$1.apply(ProducerSendThread.scala:66)
> > >     at
> > >
> > kafka.producer.async.ProducerSendThread$$anonfun$processEvents$1.apply(ProducerSendThread.scala:66)
> > >     at scala.collection.immutable.Stream$.continually(Stream.scala:1129)
> > >     at
> > >
> > scala.collection.immutable.Stream$$anonfun$continually$1.apply(Stream.scala:1129)
> > >     at
> > >
> > scala.collection.immutable.Stream$$anonfun$continually$1.apply(Stream.scala:1129)
> > >     at scala.collection.immutable.Stream$Cons.tail(Stream.scala:1085)
> > >     - locked <0x0000000407ae10a0> (a
> > scala.collection.immutable.Stream$Cons)
> > >     at scala.collection.immutable.Stream$Cons.tail(Stream.scala:1077)
> > >     at
> > >
> > scala.collection.immutable.Stream$$anonfun$takeWhile$1.apply(Stream.scala:803)
> > >     at
> > >
> > scala.collection.immutable.Stream$$anonfun$takeWhile$1.apply(Stream.scala:803)
> > >     at scala.collection.immutable.Stream$Cons.tail(Stream.scala:1085)
> > >     - locked <0x0000000407ae10d0> (a
> > scala.collection.immutable.Stream$Cons)
> > >     at scala.collection.immutable.Stream$Cons.tail(Stream.scala:1077)
> > >     at scala.collection.immutable.Stream.foreach(Stream.scala:548)
> > >     at
> > >
> > kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:67)
> > >     at
> > > kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:45)
> > >
> > >    Locked ownable synchronizers:
> > >     - None
> > >
> > > "ProducerSendThread--6" prio=10 tid=0x00007f7304b21000 nid=0x5be6 runnable [0x00007f72836f4000]
> > >    java.lang.Thread.State: RUNNABLE
> > >     at kafka.network.security.SSLSocketChannel.wrap(SSLSocketChannel.scala:542)
> > >     at kafka.network.security.SSLSocketChannel.write(SSLSocketChannel.scala:267)
> > >     - locked <0x000000042ca2bb90> (a kafka.network.security.SSLSocketChannel)
> > >     at kafka.network.security.SSLSocketChannel.localWriteLoop$1(SSLSocketChannel.scala:281)
> > >     at kafka.network.security.SSLSocketChannel.write(SSLSocketChannel.scala:294)
> > >     at java.nio.channels.SocketChannel.write(SocketChannel.java:493)
> > >     at kafka.network.BoundedByteBufferSend.writeTo(BoundedByteBufferSend.scala:56)
> > >     at kafka.network.Send$class.writeCompletely(Transmission.scala:75)
> > >     at kafka.network.BoundedByteBufferSend.writeCompletely(BoundedByteBufferSend.scala:26)
> > >     at kafka.network.BlockingChannel.send(BlockingChannel.scala:103)
> > >     at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:76)
> > >     at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:75)
> > >     - locked <0x000000044d97d368> (a java.lang.Object)
> > >     at kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SyncProducer.scala:106)
> > >     at kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply(SyncProducer.scala:106)
> > >     at kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply(SyncProducer.scala:106)
> > >     at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
> > >     at kafka.producer.SyncProducer$$anonfun$send$1.apply$mcV$sp(SyncProducer.scala:105)
> > >     at kafka.producer.SyncProducer$$anonfun$send$1.apply(SyncProducer.scala:105)
> > >     at kafka.producer.SyncProducer$$anonfun$send$1.apply(SyncProducer.scala:105)
> > >     at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
> > >     at kafka.producer.SyncProducer.send(SyncProducer.scala:104)
> > >     at kafka.producer.async.DefaultEventHandler.kafka$producer$async$DefaultEventHandler$$send(DefaultEventHandler.scala:255)
> > >     at kafka.producer.async.DefaultEventHandler$$anonfun$dispatchSerializedData$2.apply(DefaultEventHandler.scala:106)
> > >     at kafka.producer.async.DefaultEventHandler$$anonfun$dispatchSerializedData$2.apply(DefaultEventHandler.scala:100)
> > >     at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
> > >     at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
> > >     at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
> > >     at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:226)
> > >     at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
> > >     at scala.collection.mutable.HashMap.foreach(HashMap.scala:98)
> > >     at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
> > >     at kafka.producer.async.DefaultEventHandler.dispatchSerializedData(DefaultEventHandler.scala:100)
> > >     at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:72)
> > >     at kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:105)
> > >     at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:88)
> > >     at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:68)
> > >     at scala.collection.immutable.Stream.foreach(Stream.scala:547)
> > >     at kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:67)
> > >     at kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:45)
> > >
> > >    Locked ownable synchronizers:
> > >     - None
> > >
> > > "ProducerSendThread--5" prio=10 tid=0x00007f7304b1e800 nid=0x5be5 waiting
> > > on condition [0x00007f72837f6000]
> > >    java.lang.Thread.State: TIMED_WAITING (parking)
> > >     at sun.misc.Unsafe.park(Native Method)
> > >     - parking to wait for  <0x000000042cd753a8> (a
> > > java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
> > >     at
> > > java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:226)
> > >     at
> > >
> > java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2082)
> > >     at
> > >
> > java.util.concurrent.LinkedBlockingQueue.poll(LinkedBlockingQueue.java:467)
> > >     at
> > >
> > kafka.producer.async.ProducerSendThread$$anonfun$processEvents$1.apply(ProducerSendThread.scala:66)
> > >     at
> > >
> > kafka.producer.async.ProducerSendThread$$anonfun$processEvents$1.apply(ProducerSendThread.scala:66)
> > >     at scala.collection.immutable.Stream$.continually(Stream.scala:1129)
> > >     at
> > >
> > scala.collection.immutable.Stream$$anonfun$continually$1.apply(Stream.scala:1129)
> > >     at
> > >
> > scala.collection.immutable.Stream$$anonfun$continually$1.apply(Stream.scala:1129)
> > >     at scala.collection.immutable.Stream$Cons.tail(Stream.scala:1085)
> > >     - locked <0x0000000407ae00c0> (a
> > scala.collection.immutable.Stream$Cons)
> > >     at scala.collection.immutable.Stream$Cons.tail(Stream.scala:1077)
> > >     at
> > >
> > scala.collection.immutable.Stream$$anonfun$takeWhile$1.apply(Stream.scala:803)
> > >     at
> > >
> > scala.collection.immutable.Stream$$anonfun$takeWhile$1.apply(Stream.scala:803)
> > >     at scala.collection.immutable.Stream$Cons.tail(Stream.scala:1085)
> > >     - locked <0x0000000407ae00f0> (a
> > scala.collection.immutable.Stream$Cons)
> > >     at scala.collection.immutable.Stream$Cons.tail(Stream.scala:1077)
> > >     at scala.collection.immutable.Stream.foreach(Stream.scala:548)
> > >     at
> > >
> > kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:67)
> > >     at
> > > kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:45)
> > >
> > >    Locked ownable synchronizers:
> > >     - None
> > >
> > > "ProducerSendThread--4" prio=10 tid=0x00007f7304b1d000 nid=0x5be4 waiting
> > > on condition [0x00007f72838f7000]
> > >    java.lang.Thread.State: TIMED_WAITING (parking)
> > >     at sun.misc.Unsafe.park(Native Method)
> > >     - parking to wait for  <0x000000042cdaad98> (a
> > > java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
> > >     at
> > > java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:226)
> > >     at
> > >
> > java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2082)
> > >     at
> > >
> > java.util.concurrent.LinkedBlockingQueue.poll(LinkedBlockingQueue.java:467)
> > >     at
> > >
> > kafka.producer.async.ProducerSendThread$$anonfun$processEvents$1.apply(ProducerSendThread.scala:66)
> > >     at
> > >
> > kafka.producer.async.ProducerSendThread$$anonfun$processEvents$1.apply(ProducerSendThread.scala:66)
> > >     at scala.collection.immutable.Stream$.continually(Stream.scala:1129)
> > >     at
> > >
> > scala.collection.immutable.Stream$$anonfun$continually$1.apply(Stream.scala:1129)
> > >     at
> > >
> > scala.collection.immutable.Stream$$anonfun$continually$1.apply(Stream.scala:1129)
> > >     at scala.collection.immutable.Stream$Cons.tail(Stream.scala:1085)
> > >     - locked <0x0000000459a9bb30> (a
> > scala.collection.immutable.Stream$Cons)
> > >     at scala.collection.immutable.Stream$Cons.tail(Stream.scala:1077)
> > >     at
> > >
> > scala.collection.immutable.Stream$$anonfun$takeWhile$1.apply(Stream.scala:803)
> > >     at
> > >
> > scala.collection.immutable.Stream$$anonfun$takeWhile$1.apply(Stream.scala:803)
> > >     at scala.collection.immutable.Stream$Cons.tail(Stream.scala:1085)
> > >     - locked <0x0000000459a9bb60> (a
> > scala.collection.immutable.Stream$Cons)
> > >     at scala.collection.immutable.Stream$Cons.tail(Stream.scala:1077)
> > >     at scala.collection.immutable.Stream.foreach(Stream.scala:548)
> > >     at
> > >
> > kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:67)
> > >     at
> > > kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:45)
> > >
> > >    Locked ownable synchronizers:
> > >     - None
> > >
> > > "ProducerSendThread--3" prio=10 tid=0x00007f7304b1b800 nid=0x5be3 waiting
> > > on condition [0x00007f72839f8000]
> > >    java.lang.Thread.State: TIMED_WAITING (parking)
> > >     at sun.misc.Unsafe.park(Native Method)
> > >     - parking to wait for  <0x000000042ce8db28> (a
> > > java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
> > >     at
> > > java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:226)
> > >     at
> > >
> > java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2082)
> > >     at
> > >
> > java.util.concurrent.LinkedBlockingQueue.poll(LinkedBlockingQueue.java:467)
> > >     at
> > >
> > kafka.producer.async.ProducerSendThread$$anonfun$processEvents$1.apply(ProducerSendThread.scala:66)
> > >     at
> > >
> > kafka.producer.async.ProducerSendThread$$anonfun$processEvents$1.apply(ProducerSendThread.scala:66)
> > >     at scala.collection.immutable.Stream$.continually(Stream.scala:1129)
> > >     at
> > >
> > scala.collection.immutable.Stream$$anonfun$continually$1.apply(Stream.scala:1129)
> > >     at
> > >
> > scala.collection.immutable.Stream$$anonfun$continually$1.apply(Stream.scala:1129)
> > >     at scala.collection.immutable.Stream$Cons.tail(Stream.scala:1085)
> > >     - locked <0x0000000407ae1108> (a
> > scala.collection.immutable.Stream$Cons)
> > >     at scala.collection.immutable.Stream$Cons.tail(Stream.scala:1077)
> > >     at
> > >
> > scala.collection.immutable.Stream$$anonfun$takeWhile$1.apply(Stream.scala:803)
> > >     at
> > >
> > scala.collection.immutable.Stream$$anonfun$takeWhile$1.apply(Stream.scala:803)
> > >     at scala.collection.immutable.Stream$Cons.tail(Stream.scala:1085)
> > >     - locked <0x0000000407ae1138> (a
> > scala.collection.immutable.Stream$Cons)
> > >     at scala.collection.immutable.Stream$Cons.tail(Stream.scala:1077)
> > >     at scala.collection.immutable.Stream.foreach(Stream.scala:548)
> > >     at
> > >
> > kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:67)
> > >     at
> > > kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:45)
> > >
> > >    Locked ownable synchronizers:
> > >     - None
> > >
> > > "ProducerSendThread--2" prio=10 tid=0x00007f7304b1a000 nid=0x5be2 waiting
> > > on condition [0x00007f7283af9000]
> > >    java.lang.Thread.State: TIMED_WAITING (parking)
> > >     at sun.misc.Unsafe.park(Native Method)
> > >     - parking to wait for  <0x000000042ce90920> (a
> > > java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
> > >     at
> > > java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:226)
> > >     at
> > >
> > java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2082)
> > >     at
> > >
> > java.util.concurrent.LinkedBlockingQueue.poll(LinkedBlockingQueue.java:467)
> > >     at
> > >
> > kafka.producer.async.ProducerSendThread$$anonfun$processEvents$1.apply(ProducerSendThread.scala:66)
> > >     at
> > >
> > kafka.producer.async.ProducerSendThread$$anonfun$processEvents$1.apply(ProducerSendThread.scala:66)
> > >     at scala.collection.immutable.Stream$.continually(Stream.scala:1129)
> > >     at
> > >
> > scala.collection.immutable.Stream$$anonfun$continually$1.apply(Stream.scala:1129)
> > >     at
> > >
> > scala.collection.immutable.Stream$$anonfun$continually$1.apply(Stream.scala:1129)
> > >     at scala.collection.immutable.Stream$Cons.tail(Stream.scala:1085)
> > >     - locked <0x0000000407ae1850> (a
> > scala.collection.immutable.Stream$Cons)
> > >     at scala.collection.immutable.Stream$Cons.tail(Stream.scala:1077)
> > >     at
> > >
> > scala.collection.immutable.Stream$$anonfun$takeWhile$1.apply(Stream.scala:803)
> > >     at
> > >
> > scala.collection.immutable.Stream$$anonfun$takeWhile$1.apply(Stream.scala:803)
> > >     at scala.collection.immutable.Stream$Cons.tail(Stream.scala:1085)
> > >     - locked <0x0000000407ae1880> (a
> > scala.collection.immutable.Stream$Cons)
> > >     at scala.collection.immutable.Stream$Cons.tail(Stream.scala:1077)
> > >     at scala.collection.immutable.Stream.foreach(Stream.scala:548)
> > >     at
> > >
> > kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:67)
> > >     at
> > > kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:45)
> > >
> > >    Locked ownable synchronizers:
> > >     - None
> > >
> > > "ProducerSendThread--1" prio=10 tid=0x00007f7304b00000 nid=0x5be1 waiting
> > > on condition [0x00007f7283bfa000]
> > >    java.lang.Thread.State: TIMED_WAITING (parking)
> > >     at sun.misc.Unsafe.park(Native Method)
> > >     - parking to wait for  <0x000000042cd234a8> (a
> > > java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
> > >     at
> > > java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:226)
> > >     at
> > >
> > java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2082)
> > >     at
> > >
> > java.util.concurrent.LinkedBlockingQueue.poll(LinkedBlockingQueue.java:467)
> > >     at
> > >
> > kafka.producer.async.ProducerSendThread$$anonfun$processEvents$1.apply(ProducerSendThread.scala:66)
> > >     at
> > >
> > kafka.producer.async.ProducerSendThread$$anonfun$processEvents$1.apply(ProducerSendThread.scala:66)
> > >     at scala.collection.immutable.Stream$.continually(Stream.scala:1129)
> > >     at
> > >
> > scala.collection.immutable.Stream$$anonfun$continually$1.apply(Stream.scala:1129)
> > >     at
> > >
> > scala.collection.immutable.Stream$$anonfun$continually$1.apply(Stream.scala:1129)
> > >     at scala.collection.immutable.Stream$Cons.tail(Stream.scala:1085)
> > >     - locked <0x0000000407ae1170> (a
> > scala.collection.immutable.Stream$Cons)
> > >     at scala.collection.immutable.Stream$Cons.tail(Stream.scala:1077)
> > >     at
> > >
> > scala.collection.immutable.Stream$$anonfun$takeWhile$1.apply(Stream.scala:803)
> > >     at
> > >
> > scala.collection.immutable.Stream$$anonfun$takeWhile$1.apply(Stream.scala:803)
> > >     at scala.collection.immutable.Stream$Cons.tail(Stream.scala:1085)
> > >     - locked <0x0000000407ae11a0> (a
> > scala.collection.immutable.Stream$Cons)
> > >     at scala.collection.immutable.Stream$Cons.tail(Stream.scala:1077)
> > >     at scala.collection.immutable.Stream.foreach(Stream.scala:548)
> > >     at
> > >
> > kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:67)
> > >     at
> > > kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:45)
> > >
> > >    Locked ownable synchronizers:
> > >     - None
> > >
> > > "ProducerSendThread--0" prio=10 tid=0x00007f7304b15000 nid=0x5be0 waiting
> > > on condition [0x00007f7283cfb000]
> > >    java.lang.Thread.State: TIMED_WAITING (parking)
> > >     at sun.misc.Unsafe.park(Native Method)
> > >     - parking to wait for  <0x000000042ced4a88> (a
> > > java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
> > >     at
> > > java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:226)
> > >     at
> > >
> > java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2082)
> > >     at
> > >
> > java.util.concurrent.LinkedBlockingQueue.poll(LinkedBlockingQueue.java:467)
> > >     at
> > >
> > kafka.producer.async.ProducerSendThread$$anonfun$processEvents$1.apply(ProducerSendThread.scala:66)
> > >     at
> > >
> > kafka.producer.async.ProducerSendThread$$anonfun$processEvents$1.apply(ProducerSendThread.scala:66)
> > >     at scala.collection.immutable.Stream$.continually(Stream.scala:1129)
> > >     at
> > >
> > scala.collection.immutable.Stream$$anonfun$continually$1.apply(Stream.scala:1129)
> > >     at
> > >
> > scala.collection.immutable.Stream$$anonfun$continually$1.apply(Stream.scala:1129)
> > >     at scala.collection.immutable.Stream$Cons.tail(Stream.scala:1085)
> > >     - locked <0x0000000407ae0128> (a
> > scala.collection.immutable.Stream$Cons)
> > >     at scala.collection.immutable.Stream$Cons.tail(Stream.scala:1077)
> > >     at
> > >
> > scala.collection.immutable.Stream$$anonfun$takeWhile$1.apply(Stream.scala:803)
> > >     at
> > >
> > scala.collection.immutable.Stream$$anonfun$takeWhile$1.apply(Stream.scala:803)
> > >     at scala.collection.immutable.Stream$Cons.tail(Stream.scala:1085)
> > >     - locked <0x0000000407ae0158> (a
> > scala.collection.immutable.Stream$Cons)
> > >     at scala.collection.immutable.Stream$Cons.tail(Stream.scala:1077)
> > >     at scala.collection.immutable.Stream.foreach(Stream.scala:548)
> > >     at
> > >
> > kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:67)
> > >     at
> > > kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:45)
> > >
> > >    Locked ownable synchronizers:
> > >     - None
> > >
> > > "kafka-consumer-scheduler-0" daemon prio=10 tid=0x00007f7304acb000
> > > nid=0x5bdf waiting on condition [0x00007f7283dfc000]
> > >    java.lang.Thread.State: TIMED_WAITING (parking)
> > >     at sun.misc.Unsafe.park(Native Method)
> > >     - parking to wait for  <0x000000042c9d39e8> (a
> > > java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
> > >     at
> > > java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:226)
> > >     at
> > >
> > java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2082)
> > >     at
> > >
> > java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:1090)
> > >     at
> > >
> > java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:807)
> > >     at
> > >
> > java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1068)
> > >     at
> > >
> > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1130)
> > >     at
> > >
> > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
> > >     at java.lang.Thread.run(Thread.java:745)
> > >
> > >    Locked ownable synchronizers:
> > >     - None
> > >
> > > "main-EventThread" daemon prio=10 tid=0x00007f7304ae2800 nid=0x5bde waiting on condition [0x00007f7283efd000]
> > >    java.lang.Thread.State: WAITING (parking)
> > >     at sun.misc.Unsafe.park(Native Method)
> > >     - parking to wait for  <0x000000042c9dc178> (a
> > > java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
> > >     at java.util.concurrent.locks.LockSupport.park(LockSupport.java:186)
> > >     at
> > >
> > java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2043)
> > >     at
> > >
> > java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
> > >     at
> > org.apache.zookeeper.ClientCnxn$EventThread.run(ClientCnxn.java:494)
> > >
> > >    Locked ownable synchronizers:
> > >     - None
> > >
> > > "main-SendThread(10.238.12.154:2181)" daemon prio=10 tid=0x00007f7304ae1000 nid=0x5bdd runnable [0x00007f72881b5000]
> > >    java.lang.Thread.State: RUNNABLE
> > >     at sun.nio.ch.EPollArrayWrapper.epollWait(Native Method)
> > >     at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:269)
> > >     at sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:79)
> > >     at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:87)
> > >     - locked <0x000000042c9dcb78> (a sun.nio.ch.Util$2)
> > >     - locked <0x000000042c9dcb90> (a
> > java.util.Collections$UnmodifiableSet)
> > >     - locked <0x000000042c9db640> (a sun.nio.ch.EPollSelectorImpl)
> > >     at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:98)
> > >     at
> > >
> > org.apache.zookeeper.ClientCnxnSocketNIO.doTransport(ClientCnxnSocketNIO.java:349)
> > >     at
> > org.apache.zookeeper.ClientCnxn$SendThread.run(ClientCnxn.java:1081)
> > >
> > >    Locked ownable synchronizers:
> > >     - None
> > >
> > > "ZkClient-EventThread-19-mandm-zookeeper-tyo.data.sfdc.net:2181" daemon
> > > prio=10 tid=0x00007f7304ab5800 nid=0x5bdc waiting on condition
> > > [0x00007f72882b6000]
> > >    java.lang.Thread.State: WAITING (parking)
> > >     at sun.misc.Unsafe.park(Native Method)
> > >     - parking to wait for  <0x000000042c9dcef8> (a
> > > java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
> > >     at java.util.concurrent.locks.LockSupport.park(LockSupport.java:186)
> > >     at
> > >
> > java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2043)
> > >     at
> > >
> > java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
> > >     at org.I0Itec.zkclient.ZkEventThread.run(ZkEventThread.java:67)
> > >
> > >    Locked ownable synchronizers:
> > >     - None
> > >
> > > "metrics-meter-tick-thread-2" daemon prio=10 tid=0x00007f7304aa5800
> > > nid=0x5bdb waiting on condition [0x00007f72883b7000]
> > >    java.lang.Thread.State: TIMED_WAITING (parking)
> > >     at sun.misc.Unsafe.park(Native Method)
> > >     - parking to wait for  <0x000000042c9d3a18> (a
> > > java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
> > >     at
> > > java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:226)
> > >     at
> > >
> > java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2082)
> > >     at
> > >
> > java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:1090)
> > >     at
> > >
> > java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:807)
> > >     at
> > >
> > java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1068)
> > >     at
> > >
> > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1130)
> > >     at
> > >
> > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
> > >     at java.lang.Thread.run(Thread.java:745)
> > >
> > >    Locked ownable synchronizers:
> > >     - None
> > >
> > > "metrics-meter-tick-thread-1" daemon prio=10 tid=0x00007f7304a9f800
> > > nid=0x5bda waiting on condition [0x00007f72884b8000]
> > >    java.lang.Thread.State: WAITING (parking)
> > >     at sun.misc.Unsafe.park(Native Method)
> > >     - parking to wait for  <0x000000042c9d3a18> (a
> > > java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
> > >     at java.util.concurrent.locks.LockSupport.park(LockSupport.java:186)
> > >     at
> > >
> > java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2043)
> > >     at
> > >
> > java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:1085)
> > >     at
> > >
> > java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:807)
> > >     at
> > >
> > java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1068)
> > >     at
> > >
> > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1130)
> > >     at
> > >
> > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
> > >     at java.lang.Thread.run(Thread.java:745)
> > >
> > >    Locked ownable synchronizers:
> > >     - None
> > >
> > > "RMI TCP Accept-0" daemon prio=10 tid=0x00007f7304881800 nid=0x5bd8
> > > runnable [0x00007f72886ba000]
> > >    java.lang.Thread.State: RUNNABLE
> > >     at java.net.PlainSocketImpl.socketAccept(Native Method)
> > >     at
> > > java.net.AbstractPlainSocketImpl.accept(AbstractPlainSocketImpl.java:398)
> > >     at java.net.ServerSocket.implAccept(ServerSocket.java:530)
> > >     at java.net.ServerSocket.accept(ServerSocket.java:498)
> > >     at
> > >
> > sun.management.jmxremote.LocalRMIServerSocketFactory$1.accept(LocalRMIServerSocketFactory.java:52)
> > >     at
> > >
> > sun.rmi.transport.tcp.TCPTransport$AcceptLoop.executeAcceptLoop(TCPTransport.java:399)
> > >     at
> > > sun.rmi.transport.tcp.TCPTransport$AcceptLoop.run(TCPTransport.java:371)
> > >     at java.lang.Thread.run(Thread.java:745)
> > >
> > >    Locked ownable synchronizers:
> > >     - None
> > >
> > > "RMI TCP Accept-6665" daemon prio=10 tid=0x00007f7304866800 nid=0x5bd7
> > > runnable [0x00007f72887bb000]
> > >    java.lang.Thread.State: RUNNABLE
> > >     at java.net.PlainSocketImpl.socketAccept(Native Method)
> > >     at
> > > java.net.AbstractPlainSocketImpl.accept(AbstractPlainSocketImpl.java:398)
> > >     at java.net.ServerSocket.implAccept(ServerSocket.java:530)
> > >     at java.net.ServerSocket.accept(ServerSocket.java:498)
> > >     at
> > >
> > sun.rmi.transport.tcp.TCPTransport$AcceptLoop.executeAcceptLoop(TCPTransport.java:399)
> > >     at
> > > sun.rmi.transport.tcp.TCPTransport$AcceptLoop.run(TCPTransport.java:371)
> > >     at java.lang.Thread.run(Thread.java:745)
> > >
> > >    Locked ownable synchronizers:
> > >     - None
> > >
> > > "RMI TCP Accept-0" daemon prio=10 tid=0x00007f7304858000 nid=0x5bd6
> > > runnable [0x00007f72888bc000]
> > >    java.lang.Thread.State: RUNNABLE
> > >     at java.net.PlainSocketImpl.socketAccept(Native Method)
> > >     at
> > > java.net.AbstractPlainSocketImpl.accept(AbstractPlainSocketImpl.java:398)
> > >     at java.net.ServerSocket.implAccept(ServerSocket.java:530)
> > >     at java.net.ServerSocket.accept(ServerSocket.java:498)
> > >     at
> > >
> > sun.rmi.transport.tcp.TCPTransport$AcceptLoop.executeAcceptLoop(TCPTransport.java:399)
> > >     at
> > > sun.rmi.transport.tcp.TCPTransport$AcceptLoop.run(TCPTransport.java:371)
> > >     at java.lang.Thread.run(Thread.java:745)
> > >
> > >    Locked ownable synchronizers:
> > >     - None
> > >
> > > "Service Thread" daemon prio=10 tid=0x00007f73047ea000 nid=0x5bd5 runnable [0x0000000000000000]
> > >    java.lang.Thread.State: RUNNABLE
> > >
> > >    Locked ownable synchronizers:
> > >     - None
> > >
> > > "C2 CompilerThread1" daemon prio=10 tid=0x00007f73047e7800 nid=0x5bd4
> > > waiting on condition [0x0000000000000000]
> > >    java.lang.Thread.State: RUNNABLE
> > >
> > >    Locked ownable synchronizers:
> > >     - None
> > >
> > > "C2 CompilerThread0" daemon prio=10 tid=0x00007f73047e5800 nid=0x5bd3
> > > waiting on condition [0x0000000000000000]
> > >    java.lang.Thread.State: RUNNABLE
> > >
> > >    Locked ownable synchronizers:
> > >     - None
> > >
> > > "jmxtrans-agent-1" daemon prio=10 tid=0x00007f73047e3000 nid=0x5bd2 waiting on condition [0x00007f7288cc0000]
> > >    java.lang.Thread.State: TIMED_WAITING (parking)
> > >     at sun.misc.Unsafe.park(Native Method)
> > >     - parking to wait for  <0x000000042c9dc1d8> (a
> > > java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
> > >     at
> > > java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:226)
> > >     at
> > >
> > java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2082)
> > >     at
> > >
> > java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:1090)
> > >     at
> > >
> > java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:807)
> > >     at
> > >
> > java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1068)
> > >     at
> > >
> > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1130)
> > >     at
> > >
> > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
> > >     at java.lang.Thread.run(Thread.java:745)
> > >
> > >    Locked ownable synchronizers:
> > >     - None
> > >
> > > "pool-1-thread-1" prio=10 tid=0x00007f7304795000 nid=0x5bcf waiting on
> > > condition [0x00007f7288dc1000]
> > >    java.lang.Thread.State: TIMED_WAITING (parking)
> > >     at sun.misc.Unsafe.park(Native Method)
> > >     - parking to wait for  <0x000000042c9d3a78> (a
> > > java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
> > >     at
> > > java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:226)
> > >     at
> > >
> > java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2082)
> > >     at
> > > java.util.concurrent.ArrayBlockingQueue.poll(ArrayBlockingQueue.java:389)
> > >     at
> > >
> > com.salesforce.ajna.funnel.client.FunnelAsyncPublisherRunnable.collectMessages(FunnelAsyncPublisherRunnable.java:179)
> > >     at
> > >
> > com.salesforce.ajna.funnel.client.FunnelAsyncPublisherRunnable.run(FunnelAsyncPublisherRunnable.java:74)
> > >     at
> > > java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
> > >     at java.util.concurrent.FutureTask.run(FutureTask.java:262)
> > >     at
> > >
> > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
> > >     at
> > >
> > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
> > >     at java.lang.Thread.run(Thread.java:745)
> > >
> > >    Locked ownable synchronizers:
> > >     - <0x000000042c9dc388> (a
> > > java.util.concurrent.ThreadPoolExecutor$Worker)
> > >
> > > "Signal Dispatcher" daemon prio=10 tid=0x00007f730456a800 nid=0x5bce
> > > runnable [0x0000000000000000]
> > >    java.lang.Thread.State: RUNNABLE
> > >
> > >    Locked ownable synchronizers:
> > >     - None
> > >
> > > "Surrogate Locker Thread (Concurrent GC)" daemon prio=10 tid=0x00007f7304568800 nid=0x5bcd waiting on condition [0x0000000000000000]
> > >    java.lang.Thread.State: RUNNABLE
> > >
> > >    Locked ownable synchronizers:
> > >     - None
> > >
> > > "Finalizer" daemon prio=10 tid=0x00007f7304540000 nid=0x5bcc in
> > > Object.wait() [0x00007f7289b57000]
> > >    java.lang.Thread.State: WAITING (on object monitor)
> > >     at java.lang.Object.wait(Native Method)
> > >     - waiting on <0x000000042c9c7ce8> (a
> > java.lang.ref.ReferenceQueue$Lock)
> > >     at java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:135)
> > >     - locked <0x000000042c9c7ce8> (a java.lang.ref.ReferenceQueue$Lock)
> > >     at java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:151)
> > >     at java.lang.ref.Finalizer$FinalizerThread.run(Finalizer.java:209)
> > >
> > >    Locked ownable synchronizers:
> > >     - None
> > >
> > > "Reference Handler" daemon prio=10 tid=0x00007f730453e000 nid=0x5bcb in Object.wait() [0x00007f7289c58000]
> > >    java.lang.Thread.State: WAITING (on object monitor)
> > >     at java.lang.Object.wait(Native Method)
> > >     - waiting on <0x000000042c9dcba8> (a java.lang.ref.Reference$Lock)
> > >     at java.lang.Object.wait(Object.java:503)
> > >     at java.lang.ref.Reference$ReferenceHandler.run(Reference.java:133)
> > >     - locked <0x000000042c9dcba8> (a java.lang.ref.Reference$Lock)
> > >
> > >    Locked ownable synchronizers:
> > >     - None
> > >
> > > "main" prio=10 tid=0x00007f730400f800 nid=0x5bab waiting on condition [0x00007f7309031000]
> > >    java.lang.Thread.State: WAITING (parking)
> > >     at sun.misc.Unsafe.park(Native Method)
> > >     - parking to wait for  <0x000000042c9e3c18> (a java.util.concurrent.CountDownLatch$Sync)
> > >     at java.util.concurrent.locks.LockSupport.park(LockSupport.java:186)
> > >     at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:834)
> > >     at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:994)
> > >     at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1303)
> > >     at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:236)
> > >     at kafka.tools.MirrorMaker$ProducerThread.awaitShutdown(MirrorMaker.scala:341)
> > >     at kafka.tools.MirrorMaker$$anonfun$main$12.apply(MirrorMaker.scala:176)
> > >     at kafka.tools.MirrorMaker$$anonfun$main$12.apply(MirrorMaker.scala:176)
> > >     at scala.collection.Iterator$class.foreach(Iterator.scala:727)
> > >     at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
> > >     at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
> > >     at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
> > >     at kafka.tools.MirrorMaker$.main(MirrorMaker.scala:176)
> > >     at kafka.tools.MirrorMaker.main(MirrorMaker.scala)
> > >
> > >    Locked ownable synchronizers:
> > >     - None
> > >
> > > "VM Thread" prio=10 tid=0x00007f7304539800 nid=0x5bca runnable
> > >
> > > "Gang worker#0 (Parallel GC Threads)" prio=10 tid=0x00007f7304021000
> > > nid=0x5bac runnable
> > >
> > > "Gang worker#1 (Parallel GC Threads)" prio=10 tid=0x00007f7304023000
> > > nid=0x5bad runnable
> > >
> > > "Gang worker#2 (Parallel GC Threads)" prio=10 tid=0x00007f7304024800
> > > nid=0x5bae runnable
> > >
> > > "Gang worker#3 (Parallel GC Threads)" prio=10 tid=0x00007f7304026800
> > > nid=0x5baf runnable
> > >
> > > "Gang worker#4 (Parallel GC Threads)" prio=10 tid=0x00007f7304028800
> > > nid=0x5bb0 runnable
> > >
> > > "Gang worker#5 (Parallel GC Threads)" prio=10 tid=0x00007f730402a000
> > > nid=0x5bb1 runnable
> > >
> > > "Gang worker#6 (Parallel GC Threads)" prio=10 tid=0x00007f730402c000
> > > nid=0x5bb2 runnable
> > >
> > > "Gang worker#7 (Parallel GC Threads)" prio=10 tid=0x00007f730402e000
> > > nid=0x5bb3 runnable
> > >
> > > "Gang worker#8 (Parallel GC Threads)" prio=10 tid=0x00007f7304030000
> > > nid=0x5bb4 runnable
> > >
> > > "Gang worker#9 (Parallel GC Threads)" prio=10 tid=0x00007f7304031800
> > > nid=0x5bb5 runnable
> > >
> > > "Gang worker#10 (Parallel GC Threads)" prio=10 tid=0x00007f7304033800
> > > nid=0x5bb6 runnable
> > >
> > > "Gang worker#11 (Parallel GC Threads)" prio=10 tid=0x00007f7304035800
> > > nid=0x5bb7 runnable
> > >
> > > "Gang worker#12 (Parallel GC Threads)" prio=10 tid=0x00007f7304037000
> > > nid=0x5bb8 runnable
> > >
> > > "Gang worker#13 (Parallel GC Threads)" prio=10 tid=0x00007f7304039000
> > > nid=0x5bb9 runnable
> > >
> > > "Gang worker#14 (Parallel GC Threads)" prio=10 tid=0x00007f730403b000
> > > nid=0x5bba runnable
> > >
> > > "Gang worker#15 (Parallel GC Threads)" prio=10 tid=0x00007f730403d000
> > > nid=0x5bbb runnable
> > >
> > > "Gang worker#16 (Parallel GC Threads)" prio=10 tid=0x00007f730403e800
> > > nid=0x5bbc runnable
> > >
> > > "Gang worker#17 (Parallel GC Threads)" prio=10 tid=0x00007f7304040800
> > > nid=0x5bbd runnable
> > >
> > > "Gang worker#18 (Parallel GC Threads)" prio=10 tid=0x00007f7304042800
> > > nid=0x5bbe runnable
> > >
> > > "Gang worker#19 (Parallel GC Threads)" prio=10 tid=0x00007f7304044000
> > > nid=0x5bbf runnable
> > >
> > > "Gang worker#20 (Parallel GC Threads)" prio=10 tid=0x00007f7304046000
> > > nid=0x5bc0 runnable
> > >
> > > "Gang worker#21 (Parallel GC Threads)" prio=10 tid=0x00007f7304048000
> > > nid=0x5bc1 runnable
> > >
> > > "Gang worker#22 (Parallel GC Threads)" prio=10 tid=0x00007f7304049800
> > > nid=0x5bc2 runnable
> > >
> > > "Concurrent Mark-Sweep GC Thread" prio=10 tid=0x00007f7304269000 nid=0x5bc9 runnable
> > >
> > > "Gang worker#0 (Parallel CMS Threads)" prio=10 tid=0x00007f730425d000
> > > nid=0x5bc3 runnable
> > >
> > > "Gang worker#1 (Parallel CMS Threads)" prio=10 tid=0x00007f730425e800
> > > nid=0x5bc4 runnable
> > >
> > > "Gang worker#2 (Parallel CMS Threads)" prio=10 tid=0x00007f7304260800
> > > nid=0x5bc5 runnable
> > >
> > > "Gang worker#3 (Parallel CMS Threads)" prio=10 tid=0x00007f7304262800
> > > nid=0x5bc6 runnable
> > >
> > > "Gang worker#4 (Parallel CMS Threads)" prio=10 tid=0x00007f7304264800
> > > nid=0x5bc7 runnable
> > >
> > > "Gang worker#5 (Parallel CMS Threads)" prio=10 tid=0x00007f7304266000
> > > nid=0x5bc8 runnable
> > >
> > > "VM Periodic Task Thread" prio=10 tid=0x00007f7304883000 nid=0x5bd9 waiting on condition
> > >
> > > JNI global references: 663
> > >
> > >
> > > --
> > > Thanks,
> > > Raja.
> >
> > --
> > Joel
> >
> 
> 
> 
> -- 
> Thanks,
> Raja.
