[ https://issues.apache.org/jira/browse/KAFKA-3238?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Manikumar resolved KAFKA-3238.
------------------------------
    Resolution: Auto Closed

Closing inactive issue. The old consumer client is no longer supported; in
newer releases, MirrorMaker can be run with the Java consumer. Please open a
new JIRA if the issue still exists in newer releases.
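For reference, a minimal sketch of running MirrorMaker with the new Java consumer in later releases. The broker addresses are placeholders in the same `<host:port>` style as the original report; `earliest` is the Java-consumer equivalent of the old `auto.offset.reset=smallest`:

```shell
# consumer.properties for the Java consumer: bootstrap.servers replaces
# the old consumer's zookeeper.connect
cat > consumer.properties <<'EOF'
bootstrap.servers=<host:port>
group.id=KafkaMirror
auto.offset.reset=earliest
EOF

# producer.properties points at the destination cluster
cat > producer.properties <<'EOF'
bootstrap.servers=<host:port>
EOF

bin/kafka-mirror-maker.sh --consumer.config consumer.properties \
  --producer.config producer.properties \
  --whitelist mirrortest --num.streams 1
```

This is a configuration sketch only; tuning options such as batch sizes or stream counts depend on the deployment.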

> Deadlock Mirrormaker consumer not fetching any messages
> -------------------------------------------------------
>
>                 Key: KAFKA-3238
>                 URL: https://issues.apache.org/jira/browse/KAFKA-3238
>             Project: Kafka
>          Issue Type: Bug
>    Affects Versions: 0.8.0
>            Reporter: Rekha Joshi
>            Priority: Critical
>
> Hi,
> We have been seeing a consistent issue with mirroring between our data
> centers, happening randomly. Details below.
> Thanks
> Rekha
> {code}
> Source: AWS (13 Brokers)
> Destination: OTHER-DC (20 Brokers)
> Topic: mirrortest (source: 260 partitions, destination: 200 partitions)
> Connectivity: AWS Direct Connect (max 6Gbps)
> Data details: Source is receiving 40,000 msg/sec, each message is around
> 5KB
> Mirroring
> ------------
> Node: is at our OTHER-DC (24 Cores, 48 GB Memory)
> JVM Details: 1.8.0_51, -Xmx8G -Xms8G -XX:PermSize=96m -XX:MaxPermSize=96m
> -XX:+UseG1GC -XX:MaxGCPauseMillis=20
> -XX:InitiatingHeapOccupancyPercent=35
> Launch script: kafka.tools.MirrorMaker --consumer.config
> consumer.properties --producer.config producer.properties --num.producers
> 1 --whitelist mirrortest --num.streams 1 --queue.size 100000
> consumer.properties
> ---------------------------
> zookeeper.connect=<host:port>
> group.id=KafkaMirror
> auto.offset.reset=smallest
> fetch.message.max.bytes=9000000
> zookeeper.connection.timeout.ms=60000
> rebalance.max.retries=4
> rebalance.backoff.ms=5000
> producer.properties
> --------------------------
> metadata.broker.list=<host:port>
> partitioner.class=<our custom round robin partitioner>
> producer.type=async
> When we start the mirroring job, everything works as expected. Eventually
> we hit an issue where the job stops consuming entirely.
> At this stage:
> 1. No errors are seen in the MirrorMaker logs.
> 2. Consumer threads are not fetching any messages, and we see thread
> dumps as follows:
> "ConsumerFetcherThread-KafkaMirror_1454375262613-7b760d87-0-26" - Thread t@73
>   java.lang.Thread.State: WAITING
> at sun.misc.Unsafe.park(Native Method)
> - parking to wait for <79b6d3ce> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
> at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
> at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
> at java.util.concurrent.LinkedBlockingQueue.put(LinkedBlockingQueue.java:350)
> at kafka.consumer.PartitionTopicInfo.enqueue(PartitionTopicInfo.scala:60)
> at kafka.consumer.ConsumerFetcherThread.processPartitionData(ConsumerFetcherThread.scala:49)
> at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1$$anonfun$apply$mcV$sp$2.apply(AbstractFetcherThread.scala:128)
> at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1$$anonfun$apply$mcV$sp$2.apply(AbstractFetcherThread.scala:109)
> at scala.collection.immutable.HashMap$HashMap1.foreach(HashMap.scala:224)
> at scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:403)
> at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1.apply$mcV$sp(AbstractFetcherThread.scala:109)
> at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1.apply(AbstractFetcherThread.scala:109)
> at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1.apply(AbstractFetcherThread.scala:109)
> at kafka.utils.Utils$.inLock(Utils.scala:535)
> at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:108)
> at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:86)
> at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)
> Locked ownable synchronizers:
> - locked <199dc92d> (a java.util.concurrent.locks.ReentrantLock$NonfairSync)
> 3. The producer stops producing; in trace mode we notice it is handling 0
> events, and its thread dump is as follows:
> "ProducerSendThread--0" - Thread t@53
>   java.lang.Thread.State: RUNNABLE
> at sun.nio.ch.FileDispatcherImpl.writev0(Native Method)
> at sun.nio.ch.SocketDispatcher.writev(SocketDispatcher.java:51)
> at sun.nio.ch.IOUtil.write(IOUtil.java:148)
> at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:504)
> - locked <5ae2fc40> (a java.lang.Object)
> at java.nio.channels.SocketChannel.write(SocketChannel.java:502)
> at kafka.network.BoundedByteBufferSend.writeTo(BoundedByteBufferSend.scala:56)
> at kafka.network.Send$class.writeCompletely(Transmission.scala:75)
> at kafka.network.BoundedByteBufferSend.writeCompletely(BoundedByteBufferSend.scala:26)
> at kafka.network.BlockingChannel.send(BlockingChannel.scala:103)
> at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:73)
> at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:72)
> - locked <8489cd8> (a java.lang.Object)
> at kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SyncProducer.scala:103)
> at kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply(SyncProducer.scala:103)
> at kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply(SyncProducer.scala:103)
> at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
> at kafka.producer.SyncProducer$$anonfun$send$1.apply$mcV$sp(SyncProducer.scala:102)
> at kafka.producer.SyncProducer$$anonfun$send$1.apply(SyncProducer.scala:102)
> at kafka.producer.SyncProducer$$anonfun$send$1.apply(SyncProducer.scala:102)
> at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
> at kafka.producer.SyncProducer.send(SyncProducer.scala:101)
> at kafka.producer.async.DefaultEventHandler.kafka$producer$async$DefaultEventHandler$$send(DefaultEventHandler.scala:255)
> at kafka.producer.async.DefaultEventHandler$$anonfun$dispatchSerializedData$2.apply(DefaultEventHandler.scala:106)
> at kafka.producer.async.DefaultEventHandler$$anonfun$dispatchSerializedData$2.apply(DefaultEventHandler.scala:100)
> at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
> at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
> at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
> at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:226)
> at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
> at scala.collection.mutable.HashMap.foreach(HashMap.scala:98)
> at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
> at kafka.producer.async.DefaultEventHandler.dispatchSerializedData(DefaultEventHandler.scala:100)
> at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:72)
> at kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:105)
> at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:88)
> at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:68)
> at scala.collection.immutable.Stream.foreach(Stream.scala:547)
> at kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:67)
> at kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:45)
> {code}
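The two dumps above show the stall mechanism: the fetcher thread is parked in `LinkedBlockingQueue.put()` (via `PartitionTopicInfo.enqueue`) because the bounded queue between consumer and producer is full, while the producer thread that should be draining that queue is stuck in a socket write that never completes. The queue side of this can be sketched in isolation; this is illustrative standalone Java, not Kafka code, and the queue size and element names are made up for the demo:

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Sketch of the hand-off the thread dumps show: put() on a full bounded
// queue blocks until someone takes an element. If the draining thread is
// itself wedged (here, simply absent), the filler can never make progress.
public class QueueStall {
    public static void main(String[] args) throws InterruptedException {
        // Tiny capacity so the queue fills immediately (MirrorMaker above
        // used --queue.size 100000; the behavior at capacity is the same).
        LinkedBlockingQueue<String> queue = new LinkedBlockingQueue<>(2);

        // Fill the queue to capacity; a third put() would park this thread
        // in LockSupport.park, exactly like the fetcher thread's dump.
        queue.put("chunk-1");
        queue.put("chunk-2");

        // offer() with a timeout makes the stall observable without hanging:
        boolean accepted = queue.offer("chunk-3", 100, TimeUnit.MILLISECONDS);
        System.out.println("third offer accepted: " + accepted);
    }
}
```

Run standalone, this prints `third offer accepted: false`, since nothing ever drains the queue. In the report's scenario the drain side exists but is blocked on the network, which is indistinguishable from no drain at all as far as the fetcher is concerned.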



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
