[jira] [Commented] (KAFKA-937) ConsumerFetcherThread can deadlock

2013-09-03 Thread Joel Koshy (JIRA)

[ https://issues.apache.org/jira/browse/KAFKA-937?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13756807#comment-13756807 ]

Joel Koshy commented on KAFKA-937:
--

The delta patch slipped through the cracks. We hit that issue recently: a
network glitch caused the leader-finder-thread to hit an exception while
adding fetchers, and the thread quit:

{code}
leader-finder-thread], Error due to
java.net.ConnectException: Connection timed out
at sun.nio.ch.Net.connect(Native Method)
at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:507)
at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57)
at kafka.consumer.SimpleConsumer.connect(SimpleConsumer.scala:44)
at kafka.consumer.SimpleConsumer.getOrMakeConnection(SimpleConsumer.scala:129)
at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:69)
at kafka.consumer.SimpleConsumer.getOffsetsBefore(SimpleConsumer.scala:125)
at kafka.consumer.SimpleConsumer.earliestOrLatestOffset(SimpleConsumer.scala:144)
at kafka.consumer.ConsumerFetcherThread.handleOffsetOutOfRange(ConsumerFetcherThread.scala:60)
at kafka.server.AbstractFetcherThread.addPartition(AbstractFetcherThread.scala:180)
at kafka.server.AbstractFetcherManager.addFetcher(AbstractFetcherManager.scala:80)
at kafka.consumer.ConsumerFetcherManager$LeaderFinderThread$$anonfun$doWork$7.apply(ConsumerFetcherManager.scala:95)
at kafka.consumer.ConsumerFetcherManager$LeaderFinderThread$$anonfun$doWork$7.apply(ConsumerFetcherManager.scala:92)
at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:80)
at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:80)
at scala.collection.Iterator$class.foreach(Iterator.scala:631)
at scala.collection.mutable.HashTable$$anon$1.foreach(HashTable.scala:161)
at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:194)
at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
at scala.collection.mutable.HashMap.foreach(HashMap.scala:80)
at kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:92)
at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
{code}



+1 on kafka-937_delta with one minor comment: change the log message to
indicate that the thread will attempt to look up the leader again and add
fetchers. Right now it just says "failed to add".
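The delta patch itself is not quoted in this thread. As a rough illustration of the retry behavior under discussion, here is a minimal Scala sketch, assuming the fix catches the failure from adding fetchers and puts the affected partitions back into the set whose leaders still need to be looked up, instead of letting the leader-finder-thread die as in the trace above. `LeaderFinderStep`, `addFetchers`, `noLeaderPartitions` and `warn` are hypothetical names, not Kafka's API, and the sketch leaves out the manager lock that would guard the real set.

```scala
import scala.collection.mutable
import scala.util.control.NonFatal

// Hypothetical sketch, not the kafka-937_delta code: LeaderFinderStep,
// addFetchers and warn are illustrative stand-ins. Locking is omitted.
class LeaderFinderStep(addFetchers: Seq[String] => Unit) {
  // Partitions whose leader must be looked up again on the next pass.
  val noLeaderPartitions = mutable.LinkedHashSet.empty[String]
  // Captured warnings, so the behavior can be inspected.
  val warnings = mutable.ArrayBuffer.empty[String]

  private def warn(msg: String): Unit = { warnings += msg }

  // One pass of the loop: add fetchers for partitions whose leader was found.
  def doWork(leaderFound: Seq[String], isRunning: Boolean): Unit = {
    try addFetchers(leaderFound)
    catch {
      // While the thread is running, a failure (e.g. a ConnectException from a
      // network glitch) re-queues the partitions instead of killing the thread.
      case NonFatal(e) if isRunning =>
        noLeaderPartitions ++= leaderFound
        warn(s"Failed to add fetchers for ${leaderFound.mkString(",")} (${e.getMessage()}); " +
          "will look up the leader again and retry")
    }
    // If isRunning is false, the exception propagates and the thread stops.
  }
}
```

Re-queuing the whole batch keeps the sketch simple; the guard on `isRunning` keeps a failure during shutdown from being swallowed.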

> ConsumerFetcherThread can deadlock
> --
>
> Key: KAFKA-937
> URL: https://issues.apache.org/jira/browse/KAFKA-937
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.8
>Reporter: Jun Rao
>Assignee: Jun Rao
> Fix For: 0.8
>
> Attachments: kafka-937_ConsumerOffsetChecker.patch, 
> kafka-937_delta.patch, kafka-937.patch
>
>
> We have the following access pattern that can introduce a deadlock:
> AbstractFetcherThread.processPartitionsWithError() ->
> ConsumerFetcherThread.processPartitionsWithError() ->
> ConsumerFetcherManager.addPartitionsWithError(), which waits for the lock.
> Meanwhile, LeaderFinderThread holds that lock while calling
> AbstractFetcherManager.shutdownIdleFetcherThreads() ->
> AbstractFetcherManager calling fetcher.shutdown, which needs to wait until
> AbstractFetcherThread.processPartitionsWithError() completes.

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


[jira] [Commented] (KAFKA-937) ConsumerFetcherThread can deadlock

2013-06-25 Thread Jun Rao (JIRA)

[ https://issues.apache.org/jira/browse/KAFKA-937?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13693664#comment-13693664 ]

Jun Rao commented on KAFKA-937:
---

Alexey,

Thanks for the review. Committed the ConsumerOffsetChecker patch to 0.8.



[jira] [Commented] (KAFKA-937) ConsumerFetcherThread can deadlock

2013-06-25 Thread Alexey Ozeritskiy (JIRA)

[ https://issues.apache.org/jira/browse/KAFKA-937?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13692816#comment-13692816 ]

Alexey Ozeritskiy commented on KAFKA-937:
-

This patch works, thanks.



[jira] [Commented] (KAFKA-937) ConsumerFetcherThread can deadlock

2013-06-24 Thread Alexey Ozeritskiy (JIRA)

[ https://issues.apache.org/jira/browse/KAFKA-937?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13692418#comment-13692418 ]

Alexey Ozeritskiy commented on KAFKA-937:
-

https://git-wip-us.apache.org/repos/asf?p=kafka.git;a=blobdiff;f=core/src/main/scala/kafka/consumer/SimpleConsumer.scala;h=1c283280873eef597018f2f0a5ddfec942356c18;hp=bdeee9174a32a02209d769c18a0337ade0356e99;hb=5bd33c1517bb2e7734166dc3e787ac90a4ef8f86;hpb=640026467cf705fbcf6fd6bcada058b18a95bff5



[jira] [Commented] (KAFKA-937) ConsumerFetcherThread can deadlock

2013-06-24 Thread Alexey Ozeritskiy (JIRA)

[ https://issues.apache.org/jira/browse/KAFKA-937?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13692411#comment-13692411 ]

Alexey Ozeritskiy commented on KAFKA-937:
-

kafka.tools.ConsumerOffsetChecker uses SimpleConsumer to send its OffsetRequest.

To reproduce, just do a git pull and run:
bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --group group --zkconnect zk-servers --topic topic

The problem is in the following diff:

{code}
diff --git a/core/src/main/scala/kafka/consumer/SimpleConsumer.scala b/core/src/main/scala/kafka/consumer/SimpleConsumer.scala
index bdeee91..1c28328 100644
--- a/core/src/main/scala/kafka/consumer/SimpleConsumer.scala
+++ b/core/src/main/scala/kafka/consumer/SimpleConsumer.scala
@@ -37,6 +37,7 @@ class SimpleConsumer(val host: String,
   private val blockingChannel = new BlockingChannel(host, port, bufferSize, BlockingChannel.UseDefaultBufferSize, soTimeout)
   val brokerInfo = "host_%s-port_%s".format(host, port)
   private val fetchRequestAndResponseStats = FetchRequestAndResponseStatsRegistry.getFetchRequestAndResponseStats(clientId)
+  private var isClosed = false

   private def connect(): BlockingChannel = {
 close
@@ -58,7 +59,8 @@ class SimpleConsumer(val host: String,

   def close() {
 lock synchronized {
-disconnect()
+  disconnect()
+  isClosed = true
 }
   }

@@ -123,7 +125,7 @@ class SimpleConsumer(val host: String,
   def getOffsetsBefore(request: OffsetRequest) = OffsetResponse.readFrom(sendRequest(request).buffer)

   private def getOrMakeConnection() {
-if(!blockingChannel.isConnected) {
+if(!isClosed && !blockingChannel.isConnected) {
   connect()
 }
   }
{code}


With this change, a SimpleConsumer stops working once close() has been called (see ConsumerOffsetChecker.scala, line 77).
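To make the failure mode concrete, here is a toy Scala model of the `isClosed` guard from the diff above. `ToyConsumer` is hypothetical: a boolean `connected` stands in for `BlockingChannel`, and `send` stands in for `sendRequest`. Once `close()` sets the flag, the guarded `getOrMakeConnection` never reconnects, so every later request fails with a `ClosedChannelException`, similar to the ConsumerOffsetChecker trace reported in this thread.

```scala
import java.nio.channels.ClosedChannelException

// Toy model (hypothetical) of SimpleConsumer after the isClosed change.
// `connected` stands in for BlockingChannel.isConnected.
class ToyConsumer {
  private val lock = new Object
  private var connected = false
  private var isClosed = false

  private def connect(): Unit = { connected = true }
  private def disconnect(): Unit = { connected = false }

  def close(): Unit = lock.synchronized {
    disconnect()
    isClosed = true // the line added by the diff
  }

  // Same guard as the patched getOrMakeConnection: never reconnect once closed.
  private def getOrMakeConnection(): Unit =
    if (!isClosed && !connected) connect()

  // Stand-in for sendRequest: fails if no connection could be made.
  def send(request: String): String = lock.synchronized {
    getOrMakeConnection()
    if (!connected) throw new ClosedChannelException
    s"response to $request"
  }
}
```

In this toy, dropping the `!isClosed` check restores the pre-diff behavior, where a closed consumer transparently reconnects on its next request.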



[jira] [Commented] (KAFKA-937) ConsumerFetcherThread can deadlock

2013-06-24 Thread Jun Rao (JIRA)

[ https://issues.apache.org/jira/browse/KAFKA-937?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13692085#comment-13692085 ]

Jun Rao commented on KAFKA-937:
---

Alexey,

This issue seems to be unrelated to this patch. The exception is thrown in
SimpleConsumer, and this patch doesn't touch SimpleConsumer. Could you describe
how you ran into this issue and how reproducible it is?



[jira] [Commented] (KAFKA-937) ConsumerFetcherThread can deadlock

2013-06-24 Thread Alexey Ozeritskiy (JIRA)

[ https://issues.apache.org/jira/browse/KAFKA-937?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13691998#comment-13691998 ]

Alexey Ozeritskiy commented on KAFKA-937:
-

That patch breaks kafka.tools.ConsumerOffsetChecker:

{code}
[2013-06-24 18:11:17,638] INFO Reconnect due to socket error:  (kafka.consumer.SimpleConsumer)
java.nio.channels.ClosedChannelException
at kafka.network.BlockingChannel.send(BlockingChannel.scala:89)
at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:72)
at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:71)
at kafka.consumer.SimpleConsumer.getOffsetsBefore(SimpleConsumer.scala:125)
at kafka.tools.ConsumerOffsetChecker$.kafka$tools$ConsumerOffsetChecker$$processPartition(ConsumerOffsetChecker.scala:72)
at kafka.tools.ConsumerOffsetChecker$$anonfun$kafka$tools$ConsumerOffsetChecker$$processTopic$1.apply$mcVI$sp(ConsumerOffsetChecker.scala:90)
at kafka.tools.ConsumerOffsetChecker$$anonfun$kafka$tools$ConsumerOffsetChecker$$processTopic$1.apply(ConsumerOffsetChecker.scala:90)
at kafka.tools.ConsumerOffsetChecker$$anonfun$kafka$tools$ConsumerOffsetChecker$$processTopic$1.apply(ConsumerOffsetChecker.scala:90)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at kafka.tools.ConsumerOffsetChecker$.kafka$tools$ConsumerOffsetChecker$$processTopic(ConsumerOffsetChecker.scala:89)
at kafka.tools.ConsumerOffsetChecker$$anonfun$main$3.apply(ConsumerOffsetChecker.scala:154)
at kafka.tools.ConsumerOffsetChecker$$anonfun$main$3.apply(ConsumerOffsetChecker.scala:154)
at scala.collection.immutable.List.foreach(List.scala:318)
at kafka.tools.ConsumerOffsetChecker$.main(ConsumerOffsetChecker.scala:153)
at kafka.tools.ConsumerOffsetChecker.main(ConsumerOffsetChecker.scala)
{code}



[jira] [Commented] (KAFKA-937) ConsumerFetcherThread can deadlock

2013-06-12 Thread Joel Koshy (JIRA)

[ https://issues.apache.org/jira/browse/KAFKA-937?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13681787#comment-13681787 ]

Joel Koshy commented on KAFKA-937:
--

+1 on the patch.

Additionally, can you make this small (unrelated) change: make the console
consumer's autoCommitIntervalOpt default to ConsumerConfig.AutoCommitInterval?

I think it is worth documenting the typical path into the above deadlock:
- Assume at least two fetchers, F1 and F2.
- One or more partitions on F1 go into error, and the leader finder thread L
  is notified.
- L unblocks and proceeds to handle the partitions without a leader. It holds
  the ConsumerFetcherManager's lock at this point.
- All partitions on F2 go into error.
- F2's handlePartitionsWithError removes the partitions from its fetcher's
  partitionMap. (At this point, F2 is by definition an idle fetcher thread.)
- L tries to shut down idle fetcher threads, i.e., it tries to shut down F2.
- However, F2 is at this point trying to call addPartitionsWithError, which
  needs to acquire the ConsumerFetcherManager's lock (currently held by L).
  L waits for F2 to finish while F2 waits for L's lock, so neither can proceed.
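The cycle in these steps can be reproduced with a small toy model. In this Scala sketch, `ToyManager` and `ToyFetcher` are hypothetical stand-ins for ConsumerFetcherManager and fetcher F2, a `ReentrantLock` plays the manager's lock, and `Thread.join` stands in for `fetcher.shutdown` waiting for the fetcher to finish. With the buggy ordering, the calling thread (playing L) waits for F2 while still holding the lock, and F2 cannot complete; releasing the lock before shutting down idle fetchers lets F2 finish. This only illustrates one way to break the cycle and is not necessarily what kafka-937.patch does.

```scala
import java.util.concurrent.CountDownLatch
import java.util.concurrent.locks.ReentrantLock
import scala.collection.mutable

// Hypothetical stand-in for ConsumerFetcherManager: one lock guards the
// set of partitions that need a new leader.
class ToyManager {
  val lock = new ReentrantLock()
  val noLeaderPartitions = mutable.Set.empty[String]

  // Like addPartitionsWithError: must acquire the manager lock.
  def addPartitionsWithError(ps: Iterable[String]): Unit = {
    lock.lock()
    try { noLeaderPartitions ++= ps } finally { lock.unlock() }
  }
}

// Hypothetical stand-in for fetcher F2: once all its partitions fail,
// it hands them back to the manager and then exits.
class ToyFetcher(manager: ToyManager, allInError: CountDownLatch) extends Thread {
  override def run(): Unit = {
    allInError.await()
    manager.addPartitionsWithError(Seq("topic-0"))
  }
}

object DeadlockDemo {
  // Plays the leader finder thread L. Returns true if "shutting down" the
  // idle fetcher (joining it) completed in time.
  def run(releaseLockBeforeShutdown: Boolean): Boolean = {
    val manager = new ToyManager
    val allInError = new CountDownLatch(1)
    val f2 = new ToyFetcher(manager, allInError)
    f2.start()

    manager.lock.lock()    // L holds the manager lock...
    allInError.countDown() // ...while all partitions on F2 go into error

    if (releaseLockBeforeShutdown) {
      manager.lock.unlock() // release the lock first,
      f2.join(5000)         // then wait for the idle fetcher to exit
      !f2.isAlive()
    } else {
      f2.join(200)                 // wait for F2 while still holding the lock
      val finished = !f2.isAlive() // false: F2 is blocked on the lock
      manager.lock.unlock()        // unlock so the demo itself does not hang
      f2.join()
      finished
    }
  }
}
```

The bounded `join(200)` in the buggy branch exists only so the demo terminates; in the scenario described above, the wait has no timeout and the two threads block each other indefinitely.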

It is relatively rare in the sense that it can happen only if all partitions on
a fetcher are in error. This could happen, for example, if all the leaders for
those partitions move or become unavailable. Another instance where this may be
seen in practice is mirroring: we ran into it when running the mirror maker
with a very large number of producers, and the process ran out of file handles.
Running out of file handles can easily lead to exceptions on most or all
fetches and put every partition into an error state.

