[jira] [Commented] (KAFKA-937) ConsumerFetcherThread can deadlock
[ https://issues.apache.org/jira/browse/KAFKA-937?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13756807#comment-13756807 ]

Joel Koshy commented on KAFKA-937:
--

The delta patch slipped through the cracks. We hit that issue recently: a network glitch led to the leader-finder-thread hitting an exception while adding fetchers, and the thread quit:

{code}
leader-finder-thread], Error due to
java.net.ConnectException: Connection timed out
        at sun.nio.ch.Net.connect(Native Method)
        at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:507)
        at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57)
        at kafka.consumer.SimpleConsumer.connect(SimpleConsumer.scala:44)
        at kafka.consumer.SimpleConsumer.getOrMakeConnection(SimpleConsumer.scala:129)
        at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:69)
        at kafka.consumer.SimpleConsumer.getOffsetsBefore(SimpleConsumer.scala:125)
        at kafka.consumer.SimpleConsumer.earliestOrLatestOffset(SimpleConsumer.scala:144)
        at kafka.consumer.ConsumerFetcherThread.handleOffsetOutOfRange(ConsumerFetcherThread.scala:60)
        at kafka.server.AbstractFetcherThread.addPartition(AbstractFetcherThread.scala:180)
        at kafka.server.AbstractFetcherManager.addFetcher(AbstractFetcherManager.scala:80)
        at kafka.consumer.ConsumerFetcherManager$LeaderFinderThread$$anonfun$doWork$7.apply(ConsumerFetcherManager.scala:95)
        at kafka.consumer.ConsumerFetcherManager$LeaderFinderThread$$anonfun$doWork$7.apply(ConsumerFetcherManager.scala:92)
        at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:80)
        at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:80)
        at scala.collection.Iterator$class.foreach(Iterator.scala:631)
        at scala.collection.mutable.HashTable$$anon$1.foreach(HashTable.scala:161)
        at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:194)
        at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
        at scala.collection.mutable.HashMap.foreach(HashMap.scala:80)
        at kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:92)
        at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
{code}

+1 on kafka-937_delta with one minor comment: change the log message to indicate that the thread will attempt to look up the leader again and add fetchers. Right now it just says "failed to add".

> ConsumerFetcherThread can deadlock
> --
>
>                 Key: KAFKA-937
>                 URL: https://issues.apache.org/jira/browse/KAFKA-937
>             Project: Kafka
>          Issue Type: Bug
>          Components: core
>    Affects Versions: 0.8
>            Reporter: Jun Rao
>            Assignee: Jun Rao
>             Fix For: 0.8
>
>         Attachments: kafka-937_ConsumerOffsetChecker.patch, kafka-937_delta.patch, kafka-937.patch
>
>
> We have the following access pattern that can introduce a deadlock.
> AbstractFetcherThread.processPartitionsWithError() ->
> ConsumerFetcherThread.processPartitionsWithError() ->
> ConsumerFetcherManager.addPartitionsWithError() wait for lock ->
> LeaderFinderThread holding lock while calling AbstractFetcherManager.shutdownIdleFetcherThreads() ->
> AbstractFetcherManager calling fetcher.shutdown, which needs to wait until AbstractFetcherThread.processPartitionsWithError() completes.

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira
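The log wording requested in the comment above can be illustrated with a small retry loop. This is only a sketch under assumptions, not the kafka-937_delta patch: `RetryingWorker`, `runWithRetry`, `maxAttempts`, and the message text are hypothetical names chosen for the example. The point it shows is that the worker catches the exception instead of letting its thread die, and that the log line says a retry will follow.

```scala
import scala.util.control.NonFatal

// Hypothetical helper, not part of Kafka: keeps a worker alive across
// transient failures and logs that it will retry.
object RetryingWorker {
  // Runs `work` until it succeeds or `maxAttempts` is reached.
  // Returns the number of attempts that were made.
  def runWithRetry(maxAttempts: Int, log: String => Unit)(work: Int => Unit): Int = {
    var attempt = 0
    var done = false
    while (!done && attempt < maxAttempts) {
      attempt += 1
      try {
        work(attempt)
        done = true
      } catch {
        case NonFatal(e) =>
          // Say what happens next, rather than only "failed to add".
          log("Failed to add fetchers (" + e.getMessage +
            "); will look up the leader again and retry")
      }
    }
    attempt
  }
}
```

With a `work` function that throws `java.net.ConnectException` on its first two calls, the loop logs twice and returns after the third attempt.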
[jira] [Commented] (KAFKA-937) ConsumerFetcherThread can deadlock
[ https://issues.apache.org/jira/browse/KAFKA-937?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13693664#comment-13693664 ]

Jun Rao commented on KAFKA-937:
---

Alexey,

Thanks for the review. Committed the ConsumerOffsetChecker patch to 0.8.
[jira] [Commented] (KAFKA-937) ConsumerFetcherThread can deadlock
[ https://issues.apache.org/jira/browse/KAFKA-937?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13692816#comment-13692816 ]

Alexey Ozeritskiy commented on KAFKA-937:
-

This patch works, thanks.
[jira] [Commented] (KAFKA-937) ConsumerFetcherThread can deadlock
[ https://issues.apache.org/jira/browse/KAFKA-937?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13692418#comment-13692418 ]

Alexey Ozeritskiy commented on KAFKA-937:
-

https://git-wip-us.apache.org/repos/asf?p=kafka.git;a=blobdiff;f=core/src/main/scala/kafka/consumer/SimpleConsumer.scala;h=1c283280873eef597018f2f0a5ddfec942356c18;hp=bdeee9174a32a02209d769c18a0337ade0356e99;hb=5bd33c1517bb2e7734166dc3e787ac90a4ef8f86;hpb=640026467cf705fbcf6fd6bcada058b18a95bff5
[jira] [Commented] (KAFKA-937) ConsumerFetcherThread can deadlock
[ https://issues.apache.org/jira/browse/KAFKA-937?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13692411#comment-13692411 ]

Alexey Ozeritskiy commented on KAFKA-937:
-

kafka.tools.ConsumerOffsetChecker uses SimpleConsumer for OffsetRequest.

To reproduce, just do a git pull and run:

bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --group group --zkconnect zk-servers --topic topic

The problem is in the following diff:

diff --git a/core/src/main/scala/kafka/consumer/SimpleConsumer.scala b/core/src/main/scala/kafka/consumer/SimpleConsumer.scala
index bdeee91..1c28328 100644
--- a/core/src/main/scala/kafka/consumer/SimpleConsumer.scala
+++ b/core/src/main/scala/kafka/consumer/SimpleConsumer.scala
@@ -37,6 +37,7 @@ class SimpleConsumer(val host: String,
   private val blockingChannel = new BlockingChannel(host, port, bufferSize, BlockingChannel.UseDefaultBufferSize, soTimeout)
   val brokerInfo = "host_%s-port_%s".format(host, port)
   private val fetchRequestAndResponseStats = FetchRequestAndResponseStatsRegistry.getFetchRequestAndResponseStats(clientId)
+  private var isClosed = false
 
   private def connect(): BlockingChannel = {
     close
@@ -58,7 +59,8 @@ class SimpleConsumer(val host: String,
 
   def close() {
     lock synchronized {
-        disconnect()
+      disconnect()
+      isClosed = true
     }
   }
 
@@ -123,7 +125,7 @@ class SimpleConsumer(val host: String,
   def getOffsetsBefore(request: OffsetRequest) = OffsetResponse.readFrom(sendRequest(request).buffer)
 
   private def getOrMakeConnection() {
-    if(!blockingChannel.isConnected) {
+    if(!isClosed && !blockingChannel.isConnected) {
       connect()
     }
   }

With this change, SimpleConsumer stops working after close() is called (ConsumerOffsetChecker.scala, line 77).
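The effect of the `isClosed` flag in the diff can be reproduced in miniature. The sketch below is an illustration only and does not use Kafka's classes: `ToyClient`, `send`, and `ToyClientDemo` are hypothetical stand-ins, and the network connection is replaced by a boolean. It shows the behavior reported above: once `close()` has set the flag, the reconnect guard never fires again, so any later request on the same instance fails.

```scala
// Hypothetical stand-in for a network client; this is NOT Kafka's SimpleConsumer.
final class ToyClient {
  private val lock = new Object
  private var connected = false
  private var closed = false

  def close(): Unit = lock.synchronized {
    connected = false
    closed = true // from now on getOrMakeConnection refuses to reconnect
  }

  // Mirrors the guard in the diff: reconnect only if never closed.
  private def getOrMakeConnection(): Unit =
    if (!closed && !connected) connected = true

  def send(request: String): Either[String, String] = lock.synchronized {
    getOrMakeConnection()
    if (connected) Right("ack:" + request)
    else Left("channel closed")
  }
}

object ToyClientDemo {
  // Sends once, closes, then reuses the same client.
  def run(): Seq[Either[String, String]] = {
    val client = new ToyClient
    val first = client.send("offsets")
    client.close()
    val second = client.send("offsets") // fails: the flag blocks reconnection
    Seq(first, second)
  }
}
```

A caller that closes a client and then reuses it, as the report says ConsumerOffsetChecker does, hits the second case. Either the caller must stop reusing closed instances or the guard must allow reconnection.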
[jira] [Commented] (KAFKA-937) ConsumerFetcherThread can deadlock
[ https://issues.apache.org/jira/browse/KAFKA-937?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13692085#comment-13692085 ]

Jun Rao commented on KAFKA-937:
---

Alexey,

This issue seems to be unrelated to this patch. The exception is thrown in SimpleConsumer and this patch doesn't touch SimpleConsumer. Could you describe how you get to this issue and how reproducible it is?
[jira] [Commented] (KAFKA-937) ConsumerFetcherThread can deadlock
[ https://issues.apache.org/jira/browse/KAFKA-937?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13691998#comment-13691998 ]

Alexey Ozeritskiy commented on KAFKA-937:
-

That patch breaks kafka.tools.ConsumerOffsetChecker:

[2013-06-24 18:11:17,638] INFO Reconnect due to socket error: (kafka.consumer.SimpleConsumer)
java.nio.channels.ClosedChannelException
        at kafka.network.BlockingChannel.send(BlockingChannel.scala:89)
        at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:72)
        at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:71)
        at kafka.consumer.SimpleConsumer.getOffsetsBefore(SimpleConsumer.scala:125)
        at kafka.tools.ConsumerOffsetChecker$.kafka$tools$ConsumerOffsetChecker$$processPartition(ConsumerOffsetChecker.scala:72)
        at kafka.tools.ConsumerOffsetChecker$$anonfun$kafka$tools$ConsumerOffsetChecker$$processTopic$1.apply$mcVI$sp(ConsumerOffsetChecker.scala:90)
        at kafka.tools.ConsumerOffsetChecker$$anonfun$kafka$tools$ConsumerOffsetChecker$$processTopic$1.apply(ConsumerOffsetChecker.scala:90)
        at kafka.tools.ConsumerOffsetChecker$$anonfun$kafka$tools$ConsumerOffsetChecker$$processTopic$1.apply(ConsumerOffsetChecker.scala:90)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
        at kafka.tools.ConsumerOffsetChecker$.kafka$tools$ConsumerOffsetChecker$$processTopic(ConsumerOffsetChecker.scala:89)
        at kafka.tools.ConsumerOffsetChecker$$anonfun$main$3.apply(ConsumerOffsetChecker.scala:154)
        at kafka.tools.ConsumerOffsetChecker$$anonfun$main$3.apply(ConsumerOffsetChecker.scala:154)
        at scala.collection.immutable.List.foreach(List.scala:318)
        at kafka.tools.ConsumerOffsetChecker$.main(ConsumerOffsetChecker.scala:153)
        at kafka.tools.ConsumerOffsetChecker.main(ConsumerOffsetChecker.scala)
[jira] [Commented] (KAFKA-937) ConsumerFetcherThread can deadlock
[ https://issues.apache.org/jira/browse/KAFKA-937?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13681787#comment-13681787 ]

Joel Koshy commented on KAFKA-937:
--

+1 on the patch.

Additionally, can you make this small (unrelated) change: make the console consumer's autoCommitIntervalOpt default to ConsumerConfig.AutoCommitInterval?

I think it is worth documenting the typical path of getting into the above deadlock:
- Assume at least two fetchers F1, F2.
- One or more partitions on F1 go into error and the leader finder thread L is notified.
- L unblocks and proceeds to handle partitions without a leader. It holds the ConsumerFetcherManager's lock at this point.
- All partitions on F2 go into error.
- F2's handlePartitionsWithError removes partitions from its fetcher's partitionMap. (At this point, F2 is by definition an idle fetcher thread.)
- L tries to shut down idle fetcher threads, i.e., tries to shut down F2.
- However, F2 at this point is trying to addPartitionsWithError, which needs to acquire the ConsumerFetcherManager's lock (which is currently held by L).

The deadlock is relatively rare in the sense that it can happen only if all partitions on the fetcher are in error. This could happen, for example, if all the leaders for those partitions move or become unavailable. Another instance where this may be seen in practice is mirroring: we ran into it when running the mirror maker with a very large number of producers and ran out of file handles. Running out of file handles could easily lead to exceptions on most or all fetches and result in an error state for all partitions.
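The lock interaction described in this thread can be sketched with plain JVM threads. The code below is a hedged illustration, not the change made in kafka-937.patch: `ToyManager`, `registerIdle`, `shutdownIdleFetchers`, and `DeadlockFreeDemo` are hypothetical names. It shows one common way to avoid this kind of deadlock: take a snapshot of the idle fetchers while holding the manager lock, release the lock, and only then wait for the threads to finish. A fetcher that is blocked in `addPartitionsWithError` can then acquire the lock and complete.

```scala
import java.util.concurrent.CountDownLatch

// Hypothetical miniature of a fetcher manager; names are illustrative only.
final class ToyManager {
  private val lock = new Object
  private var partitionsWithError = Set.empty[String]
  private var idleFetchers = List.empty[Thread]

  // Called from a fetcher thread; needs the manager lock.
  def addPartitionsWithError(ps: Set[String]): Unit = lock.synchronized {
    partitionsWithError = partitionsWithError ++ ps
  }

  def registerIdle(t: Thread): Unit = lock.synchronized {
    idleFetchers = t :: idleFetchers
  }

  // Snapshot under the lock, then join WITHOUT holding it. Joining inside
  // the synchronized block would recreate the deadlock: the fetcher would
  // wait for the lock while this thread waits for the fetcher.
  def shutdownIdleFetchers(): Int = {
    val toStop = lock.synchronized {
      val snapshot = idleFetchers
      idleFetchers = Nil
      snapshot
    }
    toStop.foreach(_.join())
    toStop.size
  }

  def errorCount: Int = lock.synchronized(partitionsWithError.size)
}

object DeadlockFreeDemo {
  // Returns (number of fetchers shut down, number of partitions in error).
  def run(): (Int, Int) = {
    val manager = new ToyManager
    val go = new CountDownLatch(1)
    val fetcher = new Thread(new Runnable {
      def run(): Unit = {
        go.await() // start racing with the shutdown below
        manager.addPartitionsWithError(Set("topic-0", "topic-1"))
      }
    })
    fetcher.start()
    manager.registerIdle(fetcher)
    go.countDown()
    val stopped = manager.shutdownIdleFetchers()
    (stopped, manager.errorCount)
  }
}
```

Whichever thread wins the race for the lock, both make progress, so `run()` always terminates. The trade-off of this design is that the idle set is a snapshot: a fetcher may pick up new work between the snapshot and the join, which a real implementation would have to account for.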