[jira] [Assigned] (KAFKA-633) AdminTest.testShutdownBroker fails

2012-11-29 Thread Joel Koshy (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-633?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Joel Koshy reassigned KAFKA-633:


Assignee: Joel Koshy

 AdminTest.testShutdownBroker fails
 --

 Key: KAFKA-633
 URL: https://issues.apache.org/jira/browse/KAFKA-633
 Project: Kafka
  Issue Type: Bug
  Components: core
Affects Versions: 0.8
Reporter: Jun Rao
Assignee: Joel Koshy

 [error] Test Failed: testShutdownBroker(kafka.admin.AdminTest)
 junit.framework.AssertionFailedError: expected:<2> but was:<3>
   at junit.framework.Assert.fail(Assert.java:47)
   at junit.framework.Assert.failNotEquals(Assert.java:277)
   at junit.framework.Assert.assertEquals(Assert.java:64)
   at junit.framework.Assert.assertEquals(Assert.java:195)
   at junit.framework.Assert.assertEquals(Assert.java:201)
   at kafka.admin.AdminTest.testShutdownBroker(AdminTest.scala:381)
   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
   at 
 sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
   at 
 sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
   at java.lang.reflect.Method.invoke(Method.java:597)
   at junit.framework.TestCase.runTest(TestCase.java:164)
   at junit.framework.TestCase.runBare(TestCase.java:130)
   at junit.framework.TestResult$1.protect(TestResult.java:110)
   at junit.framework.TestResult.runProtected(TestResult.java:128)
   at junit.framework.TestResult.run(TestResult.java:113)
   at junit.framework.TestCase.run(TestCase.java:120)
   at junit.framework.TestSuite.runTest(TestSuite.java:228)
   at junit.framework.TestSuite.run(TestSuite.java:223)
   at junit.framework.TestSuite.runTest(TestSuite.java:228)
   at junit.framework.TestSuite.run(TestSuite.java:223)
   at org.scalatest.junit.JUnit3Suite.run(JUnit3Suite.scala:309)
   at 
 org.scalatest.tools.ScalaTestFramework$ScalaTestRunner.run(ScalaTestFramework.scala:40)
   at sbt.TestRunner.run(TestFramework.scala:53)
   at sbt.TestRunner.runTest$1(TestFramework.scala:67)
   at sbt.TestRunner.run(TestFramework.scala:76)
   at 
 sbt.TestFramework$$anonfun$10$$anonfun$apply$11.runTest$2(TestFramework.scala:194)
   at 
 sbt.TestFramework$$anonfun$10$$anonfun$apply$11$$anonfun$apply$12.apply(TestFramework.scala:205)
   at 
 sbt.TestFramework$$anonfun$10$$anonfun$apply$11$$anonfun$apply$12.apply(TestFramework.scala:205)
   at sbt.NamedTestTask.run(TestFramework.scala:92)
   at 
 sbt.ScalaProject$$anonfun$sbt$ScalaProject$$toTask$1.apply(ScalaProject.scala:193)
   at 
 sbt.ScalaProject$$anonfun$sbt$ScalaProject$$toTask$1.apply(ScalaProject.scala:193)
   at sbt.TaskManager$Task.invoke(TaskManager.scala:62)
   at sbt.impl.RunTask.doRun$1(RunTask.scala:77)
   at sbt.impl.RunTask.runTask(RunTask.scala:85)
   at sbt.impl.RunTask.sbt$impl$RunTask$$runIfNotRoot(RunTask.scala:60)
   at 
 sbt.impl.RunTask$$anonfun$runTasksExceptRoot$2.apply(RunTask.scala:48)
   at 
 sbt.impl.RunTask$$anonfun$runTasksExceptRoot$2.apply(RunTask.scala:48)
   at sbt.Distributor$Run$Worker$$anonfun$2.apply(ParallelRunner.scala:131)
   at sbt.Distributor$Run$Worker$$anonfun$2.apply(ParallelRunner.scala:131)
   at sbt.Control$.trapUnit(Control.scala:19)
   at sbt.Distributor$Run$Worker.run(ParallelRunner.scala:131)

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


[jira] [Updated] (KAFKA-598) decouple fetch size from max message size

2012-11-30 Thread Joel Koshy (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-598?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Joel Koshy updated KAFKA-598:
-

Attachment: KAFKA-598-v1.patch

I spent the better part of a day rebasing - serves me right for letting this
patch sit for so long.

The basic idea is to keep track of incomplete partitions in a multi-fetch
request and reissue a fetch request with a higher fetch size for those
incomplete partitions.

I had considered the possibility of pipelining fetch requests for incomplete
partitions - i.e., without using an upper fetch size. That would entail
issuing fetch requests at increasing offsets (with the same fetch size) until
the message is complete, buffering the partial message in the meantime. With
this approach we would probably add an additional maxFetchMem config. However,
with logical offsets we no longer have byte-addressability, so this is not
possible right now. Furthermore, with a maxFetchMem param it becomes somewhat
similar to the upperFetchSize approach (in the sense that the client has to be
prepared to buffer a certain amount of memory), so we don't really gain much.
The ideal case would be to support streaming over a single fetch request, but
that is obviously a much more complex feature to implement.
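
To illustrate the basic idea (this is only a sketch with hypothetical names,
not the actual patch): partitions whose portion of the multi-fetch response
ends in an incomplete message are retried with the larger upper fetch size.

// Sketch only (hypothetical names): retry incomplete partitions of a
// multi-fetch with a larger fetch size, bounded by upperFetchSize.
object FetchRetrySketch {
  case class PartitionFetchResult(topic: String, partition: Int, complete: Boolean)

  def fetchWithRetry(partitions: Seq[(String, Int)],
                     normalFetchSize: Int,
                     upperFetchSize: Int,
                     doFetch: (Seq[(String, Int)], Int) => Seq[PartitionFetchResult]): Unit = {
    // First pass: ordinary multi-fetch with the normal (small) fetch size.
    val firstPass = doFetch(partitions, normalFetchSize)
    // Partitions whose response contained only a partial (oversized) message.
    val incomplete = firstPass.filterNot(_.complete).map(r => (r.topic, r.partition))
    // Second pass: reissue the fetch only for those partitions with the larger size.
    if (incomplete.nonEmpty)
      doFetch(incomplete, upperFetchSize)
  }
}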

Also fixed a bug in the use of partitionMapLock - one line called synchronized
on the reentrant lock object instead of actually acquiring the lock.
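
As a generic illustration of that pitfall (not the patched code itself):
calling synchronized on a ReentrantLock only locks the lock object's monitor,
which is unrelated to the lock state manipulated by lock()/unlock().

import java.util.concurrent.locks.ReentrantLock

object LockPitfallSketch {
  val partitionMapLock = new ReentrantLock()

  // Broken: synchronizes on the lock object's monitor; provides no mutual
  // exclusion against code that uses partitionMapLock.lock()/unlock().
  def brokenUpdate(update: => Unit): Unit = partitionMapLock.synchronized { update }

  // Correct: actually acquire and release the ReentrantLock.
  def correctUpdate(update: => Unit): Unit = {
    partitionMapLock.lock()
    try { update } finally { partitionMapLock.unlock() }
  }
}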

BTW, for the ReplicaFetchTest change to make sense I could have it expect to 
fail with a smaller upper fetch size, and then repeat with a higher upper
fetch size, but that would add to the test duration - and it's not mocked
out.


 decouple fetch size from max message size
 -

 Key: KAFKA-598
 URL: https://issues.apache.org/jira/browse/KAFKA-598
 Project: Kafka
  Issue Type: Bug
  Components: core
Affects Versions: 0.8
Reporter: Jun Rao
Assignee: Joel Koshy
 Attachments: KAFKA-598-v1.patch


 Currently, a consumer has to set the fetch size larger than the max message size. 
 This increases the memory footprint on the consumer, especially when a large 
 number of topics/partitions is subscribed. By decoupling the fetch size from the 
 max message size, we can use a smaller fetch size for normal consumption and, 
 when hitting a large message (hopefully rare), automatically increase the 
 fetch size to the max message size temporarily.

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


[jira] [Updated] (KAFKA-633) AdminTest.testShutdownBroker fails

2012-12-06 Thread Joel Koshy (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-633?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Joel Koshy updated KAFKA-633:
-

Attachment: KAFKA-633-v1.patch

This is a timing issue that affects low-volume topics (and in this case an
empty topic). The issue is that the leader being shut down receives a
leaderAndIsrRequest informing it that it is no longer the leader, and it
therefore starts up a follower, which begins issuing fetch requests to the new
leader. We then shrink the ISR and send a StopReplicaRequest to the broker that
is shutting down. However, the new leader, upon receiving the fetch request,
expands the ISR again.

The shutdown itself is working correctly in that the leader has been
successfully moved. This patch fixes the assertion by checking the controller's
cached ISR instead of ZooKeeper. The patch also fixes the annoying
ZooKeeper-related messages that appear if the test fails - the problem was that
the brokers were not getting shut down and kept trying to talk to the torn-down
ZooKeeper.
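
Roughly, the adjusted check amounts to asserting on the controller's in-memory
view rather than the ZooKeeper ISR, which can be transiently re-expanded by a
fetch from the old leader. A sketch with hypothetical names (the real change is
in AdminTest.testShutdownBroker):

object IsrAssertionSketch {
  // Sketch only: verify against the controller's cached ISR, not ZooKeeper.
  def assertLeaderMoved(controllerCachedIsr: Set[Int],
                        shutdownBrokerId: Int,
                        expectedIsrSize: Int): Unit = {
    assert(!controllerCachedIsr.contains(shutdownBrokerId),
      s"broker $shutdownBrokerId should no longer be in the cached ISR")
    assert(controllerCachedIsr.size == expectedIsrSize,
      s"expected:<$expectedIsrSize> but was:<${controllerCachedIsr.size}>")
  }
}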

I think it would be better to fix the corner case in a separate non-blocker 
jira. One possible approach would be to use the callback feature in the 
ControllerBrokerRequestBatch and wait until the StopReplicaRequest has been 
processed by the shutting down broker before shrinking the ISR; and there are 
probably other ways as well.


 AdminTest.testShutdownBroker fails
 --

 Key: KAFKA-633
 URL: https://issues.apache.org/jira/browse/KAFKA-633
 Project: Kafka
  Issue Type: Bug
  Components: core
Affects Versions: 0.8
Reporter: Jun Rao
Assignee: Joel Koshy
Priority: Blocker
 Attachments: KAFKA-633-v1.patch


 [error] Test Failed: testShutdownBroker(kafka.admin.AdminTest)
 junit.framework.AssertionFailedError: expected:<2> but was:<3>
   at junit.framework.Assert.fail(Assert.java:47)
   at junit.framework.Assert.failNotEquals(Assert.java:277)
   at junit.framework.Assert.assertEquals(Assert.java:64)
   at junit.framework.Assert.assertEquals(Assert.java:195)
   at junit.framework.Assert.assertEquals(Assert.java:201)
   at kafka.admin.AdminTest.testShutdownBroker(AdminTest.scala:381)
   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
   at 
 sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
   at 
 sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
   at java.lang.reflect.Method.invoke(Method.java:597)
   at junit.framework.TestCase.runTest(TestCase.java:164)
   at junit.framework.TestCase.runBare(TestCase.java:130)
   at junit.framework.TestResult$1.protect(TestResult.java:110)
   at junit.framework.TestResult.runProtected(TestResult.java:128)
   at junit.framework.TestResult.run(TestResult.java:113)
   at junit.framework.TestCase.run(TestCase.java:120)
   at junit.framework.TestSuite.runTest(TestSuite.java:228)
   at junit.framework.TestSuite.run(TestSuite.java:223)
   at junit.framework.TestSuite.runTest(TestSuite.java:228)
   at junit.framework.TestSuite.run(TestSuite.java:223)
   at org.scalatest.junit.JUnit3Suite.run(JUnit3Suite.scala:309)
   at 
 org.scalatest.tools.ScalaTestFramework$ScalaTestRunner.run(ScalaTestFramework.scala:40)
   at sbt.TestRunner.run(TestFramework.scala:53)
   at sbt.TestRunner.runTest$1(TestFramework.scala:67)
   at sbt.TestRunner.run(TestFramework.scala:76)
   at 
 sbt.TestFramework$$anonfun$10$$anonfun$apply$11.runTest$2(TestFramework.scala:194)
   at 
 sbt.TestFramework$$anonfun$10$$anonfun$apply$11$$anonfun$apply$12.apply(TestFramework.scala:205)
   at 
 sbt.TestFramework$$anonfun$10$$anonfun$apply$11$$anonfun$apply$12.apply(TestFramework.scala:205)
   at sbt.NamedTestTask.run(TestFramework.scala:92)
   at 
 sbt.ScalaProject$$anonfun$sbt$ScalaProject$$toTask$1.apply(ScalaProject.scala:193)
   at 
 sbt.ScalaProject$$anonfun$sbt$ScalaProject$$toTask$1.apply(ScalaProject.scala:193)
   at sbt.TaskManager$Task.invoke(TaskManager.scala:62)
   at sbt.impl.RunTask.doRun$1(RunTask.scala:77)
   at sbt.impl.RunTask.runTask(RunTask.scala:85)
   at sbt.impl.RunTask.sbt$impl$RunTask$$runIfNotRoot(RunTask.scala:60)
   at 
 sbt.impl.RunTask$$anonfun$runTasksExceptRoot$2.apply(RunTask.scala:48)
   at 
 sbt.impl.RunTask$$anonfun$runTasksExceptRoot$2.apply(RunTask.scala:48)
   at sbt.Distributor$Run$Worker$$anonfun$2.apply(ParallelRunner.scala:131)
   at sbt.Distributor$Run$Worker$$anonfun$2.apply(ParallelRunner.scala:131)
   at sbt.Control$.trapUnit(Control.scala:19)
   at sbt.Distributor$Run$Worker.run(ParallelRunner.scala:131)

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira

[jira] [Closed] (KAFKA-633) AdminTest.testShutdownBroker fails

2012-12-06 Thread Joel Koshy (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-633?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Joel Koshy closed KAFKA-633.



Checked into 0.8. Will file a separate jira to protect the ISR from re-expansion.

 AdminTest.testShutdownBroker fails
 --

 Key: KAFKA-633
 URL: https://issues.apache.org/jira/browse/KAFKA-633
 Project: Kafka
  Issue Type: Bug
  Components: core
Affects Versions: 0.8
Reporter: Jun Rao
Assignee: Joel Koshy
Priority: Blocker
 Attachments: KAFKA-633-v1.patch


 [error] Test Failed: testShutdownBroker(kafka.admin.AdminTest)
 junit.framework.AssertionFailedError: expected:<2> but was:<3>
   at junit.framework.Assert.fail(Assert.java:47)
   at junit.framework.Assert.failNotEquals(Assert.java:277)
   at junit.framework.Assert.assertEquals(Assert.java:64)
   at junit.framework.Assert.assertEquals(Assert.java:195)
   at junit.framework.Assert.assertEquals(Assert.java:201)
   at kafka.admin.AdminTest.testShutdownBroker(AdminTest.scala:381)
   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
   at 
 sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
   at 
 sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
   at java.lang.reflect.Method.invoke(Method.java:597)
   at junit.framework.TestCase.runTest(TestCase.java:164)
   at junit.framework.TestCase.runBare(TestCase.java:130)
   at junit.framework.TestResult$1.protect(TestResult.java:110)
   at junit.framework.TestResult.runProtected(TestResult.java:128)
   at junit.framework.TestResult.run(TestResult.java:113)
   at junit.framework.TestCase.run(TestCase.java:120)
   at junit.framework.TestSuite.runTest(TestSuite.java:228)
   at junit.framework.TestSuite.run(TestSuite.java:223)
   at junit.framework.TestSuite.runTest(TestSuite.java:228)
   at junit.framework.TestSuite.run(TestSuite.java:223)
   at org.scalatest.junit.JUnit3Suite.run(JUnit3Suite.scala:309)
   at 
 org.scalatest.tools.ScalaTestFramework$ScalaTestRunner.run(ScalaTestFramework.scala:40)
   at sbt.TestRunner.run(TestFramework.scala:53)
   at sbt.TestRunner.runTest$1(TestFramework.scala:67)
   at sbt.TestRunner.run(TestFramework.scala:76)
   at 
 sbt.TestFramework$$anonfun$10$$anonfun$apply$11.runTest$2(TestFramework.scala:194)
   at 
 sbt.TestFramework$$anonfun$10$$anonfun$apply$11$$anonfun$apply$12.apply(TestFramework.scala:205)
   at 
 sbt.TestFramework$$anonfun$10$$anonfun$apply$11$$anonfun$apply$12.apply(TestFramework.scala:205)
   at sbt.NamedTestTask.run(TestFramework.scala:92)
   at 
 sbt.ScalaProject$$anonfun$sbt$ScalaProject$$toTask$1.apply(ScalaProject.scala:193)
   at 
 sbt.ScalaProject$$anonfun$sbt$ScalaProject$$toTask$1.apply(ScalaProject.scala:193)
   at sbt.TaskManager$Task.invoke(TaskManager.scala:62)
   at sbt.impl.RunTask.doRun$1(RunTask.scala:77)
   at sbt.impl.RunTask.runTask(RunTask.scala:85)
   at sbt.impl.RunTask.sbt$impl$RunTask$$runIfNotRoot(RunTask.scala:60)
   at 
 sbt.impl.RunTask$$anonfun$runTasksExceptRoot$2.apply(RunTask.scala:48)
   at 
 sbt.impl.RunTask$$anonfun$runTasksExceptRoot$2.apply(RunTask.scala:48)
   at sbt.Distributor$Run$Worker$$anonfun$2.apply(ParallelRunner.scala:131)
   at sbt.Distributor$Run$Worker$$anonfun$2.apply(ParallelRunner.scala:131)
   at sbt.Control$.trapUnit(Control.scala:19)
   at sbt.Distributor$Run$Worker.run(ParallelRunner.scala:131)

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


[jira] [Resolved] (KAFKA-633) AdminTest.testShutdownBroker fails

2012-12-06 Thread Joel Koshy (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-633?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Joel Koshy resolved KAFKA-633.
--

Resolution: Fixed

 AdminTest.testShutdownBroker fails
 --

 Key: KAFKA-633
 URL: https://issues.apache.org/jira/browse/KAFKA-633
 Project: Kafka
  Issue Type: Bug
  Components: core
Affects Versions: 0.8
Reporter: Jun Rao
Assignee: Joel Koshy
Priority: Blocker
 Attachments: KAFKA-633-v1.patch


 [error] Test Failed: testShutdownBroker(kafka.admin.AdminTest)
 junit.framework.AssertionFailedError: expected:<2> but was:<3>
   at junit.framework.Assert.fail(Assert.java:47)
   at junit.framework.Assert.failNotEquals(Assert.java:277)
   at junit.framework.Assert.assertEquals(Assert.java:64)
   at junit.framework.Assert.assertEquals(Assert.java:195)
   at junit.framework.Assert.assertEquals(Assert.java:201)
   at kafka.admin.AdminTest.testShutdownBroker(AdminTest.scala:381)
   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
   at 
 sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
   at 
 sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
   at java.lang.reflect.Method.invoke(Method.java:597)
   at junit.framework.TestCase.runTest(TestCase.java:164)
   at junit.framework.TestCase.runBare(TestCase.java:130)
   at junit.framework.TestResult$1.protect(TestResult.java:110)
   at junit.framework.TestResult.runProtected(TestResult.java:128)
   at junit.framework.TestResult.run(TestResult.java:113)
   at junit.framework.TestCase.run(TestCase.java:120)
   at junit.framework.TestSuite.runTest(TestSuite.java:228)
   at junit.framework.TestSuite.run(TestSuite.java:223)
   at junit.framework.TestSuite.runTest(TestSuite.java:228)
   at junit.framework.TestSuite.run(TestSuite.java:223)
   at org.scalatest.junit.JUnit3Suite.run(JUnit3Suite.scala:309)
   at 
 org.scalatest.tools.ScalaTestFramework$ScalaTestRunner.run(ScalaTestFramework.scala:40)
   at sbt.TestRunner.run(TestFramework.scala:53)
   at sbt.TestRunner.runTest$1(TestFramework.scala:67)
   at sbt.TestRunner.run(TestFramework.scala:76)
   at 
 sbt.TestFramework$$anonfun$10$$anonfun$apply$11.runTest$2(TestFramework.scala:194)
   at 
 sbt.TestFramework$$anonfun$10$$anonfun$apply$11$$anonfun$apply$12.apply(TestFramework.scala:205)
   at 
 sbt.TestFramework$$anonfun$10$$anonfun$apply$11$$anonfun$apply$12.apply(TestFramework.scala:205)
   at sbt.NamedTestTask.run(TestFramework.scala:92)
   at 
 sbt.ScalaProject$$anonfun$sbt$ScalaProject$$toTask$1.apply(ScalaProject.scala:193)
   at 
 sbt.ScalaProject$$anonfun$sbt$ScalaProject$$toTask$1.apply(ScalaProject.scala:193)
   at sbt.TaskManager$Task.invoke(TaskManager.scala:62)
   at sbt.impl.RunTask.doRun$1(RunTask.scala:77)
   at sbt.impl.RunTask.runTask(RunTask.scala:85)
   at sbt.impl.RunTask.sbt$impl$RunTask$$runIfNotRoot(RunTask.scala:60)
   at 
 sbt.impl.RunTask$$anonfun$runTasksExceptRoot$2.apply(RunTask.scala:48)
   at 
 sbt.impl.RunTask$$anonfun$runTasksExceptRoot$2.apply(RunTask.scala:48)
   at sbt.Distributor$Run$Worker$$anonfun$2.apply(ParallelRunner.scala:131)
   at sbt.Distributor$Run$Worker$$anonfun$2.apply(ParallelRunner.scala:131)
   at sbt.Control$.trapUnit(Control.scala:19)
   at sbt.Distributor$Run$Worker.run(ParallelRunner.scala:131)

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


[jira] [Commented] (KAFKA-664) Kafka server threads die due to OOME during long running test

2012-12-07 Thread Joel Koshy (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-664?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13526876#comment-13526876
 ] 

Joel Koshy commented on KAFKA-664:
--

To clarify, the map itself shouldn't grow indefinitely, right? I.e., if there
are no new partitions, the number of keys should stay the same. I think the
issue is that expired requests (for a key) are not removed from the list of
outstanding requests for that key.
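
To make the distinction concrete, a simplified model of the bookkeeping in
question (hypothetical names, not the actual RequestPurgatory code): the key
set stays bounded by the number of partitions, but the per-key list can grow
without bound if expired requests are never removed.

import scala.collection.mutable

object WatcherMapSketch {
  case class DelayedRequest(id: Long, var satisfied: Boolean = false)

  // One list of outstanding delayed requests per (topic, partition) key.
  val watchersForKey = mutable.Map.empty[(String, Int), mutable.ListBuffer[DelayedRequest]]

  def watch(key: (String, Int), request: DelayedRequest): Unit =
    watchersForKey.getOrElseUpdate(key, mutable.ListBuffer.empty[DelayedRequest]) += request

  // The number of keys is bounded by the number of partitions, but each list
  // keeps growing unless expired requests are purged from it.
  def totalOutstanding: Int = watchersForKey.values.map(_.size).sum
}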

 Kafka server threads die due to OOME during long running test
 -

 Key: KAFKA-664
 URL: https://issues.apache.org/jira/browse/KAFKA-664
 Project: Kafka
  Issue Type: Bug
Affects Versions: 0.8
Reporter: Neha Narkhede
Assignee: Jay Kreps
Priority: Blocker
  Labels: bugs
 Fix For: 0.8

 Attachments: thread-dump.log


 I set up a Kafka cluster with 5 brokers (JVM memory 512M) and set up a 
 long-running producer process that sends data to 100s of partitions continuously 
 for ~15 hours. After ~4 hours of operation, a few server threads (acceptor and 
 processor) exited due to OOME:
 [2012-12-07 08:24:44,355] ERROR OOME with size 1700161893 
 (kafka.network.BoundedByteBufferReceive)
 java.lang.OutOfMemoryError: Java heap space
 [2012-12-07 08:24:44,356] ERROR Uncaught exception in thread 
 'kafka-acceptor': (kafka.utils.Utils$)
 java.lang.OutOfMemoryError: Java heap space
 [2012-12-07 08:24:44,356] ERROR Uncaught exception in thread 
 'kafka-processor-9092-1': (kafka.utils.Utils$)
 java.lang.OutOfMemoryError: Java heap space
 [2012-12-07 08:24:46,344] INFO Unable to reconnect to ZooKeeper service, 
 session 0x13afd0753870103 has expired, closing socket connection 
 (org.apache.zookeeper.ClientCnxn)
 [2012-12-07 08:24:46,344] INFO zookeeper state changed (Expired) 
 (org.I0Itec.zkclient.ZkClient)
 [2012-12-07 08:24:46,344] INFO Initiating client connection, 
 connectString=eat1-app309.corp:12913,eat1-app310.corp:12913,eat1-app311.corp:12913,eat1-app312.corp:12913,eat1-app313.corp:12913
  sessionTimeout=15000 watcher=org.I0Itec.zkclient.ZkClient@19202d69 
 (org.apache.zookeeper.ZooKeeper)
 [2012-12-07 08:24:55,702] ERROR OOME with size 2001040997 
 (kafka.network.BoundedByteBufferReceive)
 java.lang.OutOfMemoryError: Java heap space
 [2012-12-07 08:25:01,192] ERROR Uncaught exception in thread 
 'kafka-request-handler-0': (kafka.utils.Utils$)
 java.lang.OutOfMemoryError: Java heap space
 [2012-12-07 08:25:08,739] INFO Opening socket connection to server 
 eat1-app311.corp/172.20.72.75:12913 (org.apache.zookeeper.ClientCnxn)
 [2012-12-07 08:25:14,221] INFO Socket connection established to 
 eat1-app311.corp/172.20.72.75:12913, initiating session 
 (org.apache.zookeeper.ClientCnxn)
 [2012-12-07 08:25:17,943] INFO Client session timed out, have not heard from 
 server in 3722ms for sessionid 0x0, closing socket connection and attempting 
 reconnect (org.apache.zookeeper.ClientCnxn)
 [2012-12-07 08:25:19,805] ERROR error in loggedRunnable (kafka.utils.Utils$)
 java.lang.OutOfMemoryError: Java heap space
 [2012-12-07 08:25:23,528] ERROR OOME with size 1853095936 
 (kafka.network.BoundedByteBufferReceive)
 java.lang.OutOfMemoryError: Java heap space
 It seems like it runs out of memory while trying to read the producer 
 request, but it's unclear so far. 

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


[jira] [Commented] (KAFKA-664) Kafka server threads die due to OOME during long running test

2012-12-07 Thread Joel Koshy (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-664?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13526883#comment-13526883
 ] 

Joel Koshy commented on KAFKA-664:
--

Okay, I'm slightly confused. Even on expiration the request is marked as
satisfied. So even if it is not removed from the watcher's list during
expiration, it will be removed on the next call to collectSatisfiedRequests,
which in this case will be when the next produce request arrives for that
partition. That means this should only be due to low-volume partitions that
are no longer growing, i.e., the replica fetcher would keep issuing fetch
requests that keep expiring but never get removed from the list of pending
requests in watchersFor(the-low-volume-partition).
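
In code form, the behavior described above looks roughly like this
(self-contained sketch, hypothetical names): expiration marks the request
satisfied, but the entry is only dropped when satisfied requests are next
collected for that partition, which never happens for a partition that sees no
further produce requests.

import scala.collection.mutable

object LowVolumePartitionSketch {
  case class Delayed(var satisfied: Boolean = false)

  // watchersFor(the-low-volume-partition)
  val watchers = mutable.ListBuffer.empty[Delayed]

  // Expiration marks the request satisfied but does not remove it...
  def expire(d: Delayed): Unit = d.satisfied = true

  // ...removal only happens here, i.e. when the next produce request for this
  // partition triggers a collection of satisfied requests.
  def collectSatisfiedRequests(): Seq[Delayed] = {
    val done = watchers.filter(_.satisfied).toList
    watchers --= done
    done
  }
}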

 Kafka server threads die due to OOME during long running test
 -

 Key: KAFKA-664
 URL: https://issues.apache.org/jira/browse/KAFKA-664
 Project: Kafka
  Issue Type: Bug
Affects Versions: 0.8
Reporter: Neha Narkhede
Assignee: Jay Kreps
Priority: Blocker
  Labels: bugs
 Fix For: 0.8

 Attachments: thread-dump.log


 I set up a Kafka cluster with 5 brokers (JVM memory 512M) and set up a 
 long-running producer process that sends data to 100s of partitions continuously 
 for ~15 hours. After ~4 hours of operation, a few server threads (acceptor and 
 processor) exited due to OOME:
 [2012-12-07 08:24:44,355] ERROR OOME with size 1700161893 
 (kafka.network.BoundedByteBufferReceive)
 java.lang.OutOfMemoryError: Java heap space
 [2012-12-07 08:24:44,356] ERROR Uncaught exception in thread 
 'kafka-acceptor': (kafka.utils.Utils$)
 java.lang.OutOfMemoryError: Java heap space
 [2012-12-07 08:24:44,356] ERROR Uncaught exception in thread 
 'kafka-processor-9092-1': (kafka.utils.Utils$)
 java.lang.OutOfMemoryError: Java heap space
 [2012-12-07 08:24:46,344] INFO Unable to reconnect to ZooKeeper service, 
 session 0x13afd0753870103 has expired, closing socket connection 
 (org.apache.zookeeper.ClientCnxn)
 [2012-12-07 08:24:46,344] INFO zookeeper state changed (Expired) 
 (org.I0Itec.zkclient.ZkClient)
 [2012-12-07 08:24:46,344] INFO Initiating client connection, 
 connectString=eat1-app309.corp:12913,eat1-app310.corp:12913,eat1-app311.corp:12913,eat1-app312.corp:12913,eat1-app313.corp:12913
  sessionTimeout=15000 watcher=org.I0Itec.zkclient.ZkClient@19202d69 
 (org.apache.zookeeper.ZooKeeper)
 [2012-12-07 08:24:55,702] ERROR OOME with size 2001040997 
 (kafka.network.BoundedByteBufferReceive)
 java.lang.OutOfMemoryError: Java heap space
 [2012-12-07 08:25:01,192] ERROR Uncaught exception in thread 
 'kafka-request-handler-0': (kafka.utils.Utils$)
 java.lang.OutOfMemoryError: Java heap space
 [2012-12-07 08:25:08,739] INFO Opening socket connection to server 
 eat1-app311.corp/172.20.72.75:12913 (org.apache.zookeeper.ClientCnxn)
 [2012-12-07 08:25:14,221] INFO Socket connection established to 
 eat1-app311.corp/172.20.72.75:12913, initiating session 
 (org.apache.zookeeper.ClientCnxn)
 [2012-12-07 08:25:17,943] INFO Client session timed out, have not heard from 
 server in 3722ms for sessionid 0x0, closing socket connection and attempting 
 reconnect (org.apache.zookeeper.ClientCnxn)
 [2012-12-07 08:25:19,805] ERROR error in loggedRunnable (kafka.utils.Utils$)
 java.lang.OutOfMemoryError: Java heap space
 [2012-12-07 08:25:23,528] ERROR OOME with size 1853095936 
 (kafka.network.BoundedByteBufferReceive)
 java.lang.OutOfMemoryError: Java heap space
 It seems like it runs out of memory while trying to read the producer 
 request, but it's unclear so far. 

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


[jira] [Commented] (KAFKA-664) Kafka server threads die due to OOME during long running test

2012-12-11 Thread Joel Koshy (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-664?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13529313#comment-13529313
 ] 

Joel Koshy commented on KAFKA-664:
--

Agreed that we should check in this fix plus the throttling (forgot to add that
in v2) and open a separate jira to refactor the purgatory a bit.

 Kafka server threads die due to OOME during long running test
 -

 Key: KAFKA-664
 URL: https://issues.apache.org/jira/browse/KAFKA-664
 Project: Kafka
  Issue Type: Bug
Affects Versions: 0.8
Reporter: Neha Narkhede
Assignee: Jay Kreps
Priority: Blocker
  Labels: bugs
 Fix For: 0.8

 Attachments: kafka-664-draft-2.patch, kafka-664-draft.patch, Screen 
 Shot 2012-12-09 at 11.22.50 AM.png, Screen Shot 2012-12-09 at 11.23.09 
 AM.png, Screen Shot 2012-12-09 at 11.31.29 AM.png, thread-dump.log, 
 watchersForKey.png


 I set up a Kafka cluster with 5 brokers (JVM memory 512M) and set up a 
 long-running producer process that sends data to 100s of partitions continuously 
 for ~15 hours. After ~4 hours of operation, a few server threads (acceptor and 
 processor) exited due to OOME:
 [2012-12-07 08:24:44,355] ERROR OOME with size 1700161893 
 (kafka.network.BoundedByteBufferReceive)
 java.lang.OutOfMemoryError: Java heap space
 [2012-12-07 08:24:44,356] ERROR Uncaught exception in thread 
 'kafka-acceptor': (kafka.utils.Utils$)
 java.lang.OutOfMemoryError: Java heap space
 [2012-12-07 08:24:44,356] ERROR Uncaught exception in thread 
 'kafka-processor-9092-1': (kafka.utils.Utils$)
 java.lang.OutOfMemoryError: Java heap space
 [2012-12-07 08:24:46,344] INFO Unable to reconnect to ZooKeeper service, 
 session 0x13afd0753870103 has expired, closing socket connection 
 (org.apache.zookeeper.ClientCnxn)
 [2012-12-07 08:24:46,344] INFO zookeeper state changed (Expired) 
 (org.I0Itec.zkclient.ZkClient)
 [2012-12-07 08:24:46,344] INFO Initiating client connection, 
 connectString=eat1-app309.corp:12913,eat1-app310.corp:12913,eat1-app311.corp:12913,eat1-app312.corp:12913,eat1-app313.corp:12913
  sessionTimeout=15000 watcher=org.I0Itec.zkclient.ZkClient@19202d69 
 (org.apache.zookeeper.ZooKeeper)
 [2012-12-07 08:24:55,702] ERROR OOME with size 2001040997 
 (kafka.network.BoundedByteBufferReceive)
 java.lang.OutOfMemoryError: Java heap space
 [2012-12-07 08:25:01,192] ERROR Uncaught exception in thread 
 'kafka-request-handler-0': (kafka.utils.Utils$)
 java.lang.OutOfMemoryError: Java heap space
 [2012-12-07 08:25:08,739] INFO Opening socket connection to server 
 eat1-app311.corp/172.20.72.75:12913 (org.apache.zookeeper.ClientCnxn)
 [2012-12-07 08:25:14,221] INFO Socket connection established to 
 eat1-app311.corp/172.20.72.75:12913, initiating session 
 (org.apache.zookeeper.ClientCnxn)
 [2012-12-07 08:25:17,943] INFO Client session timed out, have not heard from 
 server in 3722ms for sessionid 0x0, closing socket connection and attempting 
 reconnect (org.apache.zookeeper.ClientCnxn)
 [2012-12-07 08:25:19,805] ERROR error in loggedRunnable (kafka.utils.Utils$)
 java.lang.OutOfMemoryError: Java heap space
 [2012-12-07 08:25:23,528] ERROR OOME with size 1853095936 
 (kafka.network.BoundedByteBufferReceive)
 java.lang.OutOfMemoryError: Java heap space
 It seems like it runs out of memory while trying to read the producer 
 request, but it's unclear so far. 

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


[jira] [Commented] (KAFKA-664) Kafka server threads die due to OOME during long running test

2012-12-11 Thread Joel Koshy (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-664?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13529489#comment-13529489
 ] 

Joel Koshy commented on KAFKA-664:
--

One problem with the updated patch is the following: multi-fetch requests could
already be satisfied (since minBytes is 1), i.e., the request will be marked as
satisfied and the expiration code path will not be executed. We should do the
update on expiration even if the request has been satisfied.

An additional catch-all that may make life easier for us is to have a global
threshold on the size of the requestsFor map: if its size exceeds the threshold
(checked on both expiration and checkSatisfied), trigger a full cleanup, i.e.,
iterate over all entries in watchersFor and remove those that are satisfied.
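
A sketch of that catch-all (hypothetical names, not the actual patch): keep a
rough global count of watched requests and, once it crosses a threshold, sweep
every watcher list and drop entries that are already satisfied.

import java.util.concurrent.atomic.AtomicInteger
import scala.collection.mutable

trait Satisfiable { def satisfied: Boolean }

class GlobalThresholdPurgeSketch[K, R <: Satisfiable](purgeThreshold: Int) {
  private val watchersForKey = mutable.Map.empty[K, mutable.ListBuffer[R]]
  private val watched = new AtomicInteger(0)

  def watch(key: K, request: R): Unit = synchronized {
    watchersForKey.getOrElseUpdate(key, mutable.ListBuffer.empty[R]) += request
    // Checked on the watch path here; the suggestion above is to check it on
    // both the expiration and checkSatisfied paths as well.
    if (watched.incrementAndGet() > purgeThreshold) purgeSatisfied()
  }

  def purgeSatisfied(): Unit = synchronized {
    var removed = 0
    watchersForKey.values.foreach { watchers =>
      val done = watchers.filter(_.satisfied).toList
      watchers --= done
      removed += done.size
    }
    watched.addAndGet(-removed)
  }
}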

 Kafka server threads die due to OOME during long running test
 -

 Key: KAFKA-664
 URL: https://issues.apache.org/jira/browse/KAFKA-664
 Project: Kafka
  Issue Type: Bug
Affects Versions: 0.8
Reporter: Neha Narkhede
Assignee: Jay Kreps
Priority: Blocker
  Labels: bugs
 Fix For: 0.8

 Attachments: kafka-664-draft-2.patch, kafka-664-draft.patch, Screen 
 Shot 2012-12-09 at 11.22.50 AM.png, Screen Shot 2012-12-09 at 11.23.09 
 AM.png, Screen Shot 2012-12-09 at 11.31.29 AM.png, thread-dump.log, 
 watchersForKey.png


 I set up a Kafka cluster with 5 brokers (JVM memory 512M) and set up a 
 long-running producer process that sends data to 100s of partitions continuously 
 for ~15 hours. After ~4 hours of operation, a few server threads (acceptor and 
 processor) exited due to OOME:
 [2012-12-07 08:24:44,355] ERROR OOME with size 1700161893 
 (kafka.network.BoundedByteBufferReceive)
 java.lang.OutOfMemoryError: Java heap space
 [2012-12-07 08:24:44,356] ERROR Uncaught exception in thread 
 'kafka-acceptor': (kafka.utils.Utils$)
 java.lang.OutOfMemoryError: Java heap space
 [2012-12-07 08:24:44,356] ERROR Uncaught exception in thread 
 'kafka-processor-9092-1': (kafka.utils.Utils$)
 java.lang.OutOfMemoryError: Java heap space
 [2012-12-07 08:24:46,344] INFO Unable to reconnect to ZooKeeper service, 
 session 0x13afd0753870103 has expired, closing socket connection 
 (org.apache.zookeeper.ClientCnxn)
 [2012-12-07 08:24:46,344] INFO zookeeper state changed (Expired) 
 (org.I0Itec.zkclient.ZkClient)
 [2012-12-07 08:24:46,344] INFO Initiating client connection, 
 connectString=eat1-app309.corp:12913,eat1-app310.corp:12913,eat1-app311.corp:12913,eat1-app312.corp:12913,eat1-app313.corp:12913
  sessionTimeout=15000 watcher=org.I0Itec.zkclient.ZkClient@19202d69 
 (org.apache.zookeeper.ZooKeeper)
 [2012-12-07 08:24:55,702] ERROR OOME with size 2001040997 
 (kafka.network.BoundedByteBufferReceive)
 java.lang.OutOfMemoryError: Java heap space
 [2012-12-07 08:25:01,192] ERROR Uncaught exception in thread 
 'kafka-request-handler-0': (kafka.utils.Utils$)
 java.lang.OutOfMemoryError: Java heap space
 [2012-12-07 08:25:08,739] INFO Opening socket connection to server 
 eat1-app311.corp/172.20.72.75:12913 (org.apache.zookeeper.ClientCnxn)
 [2012-12-07 08:25:14,221] INFO Socket connection established to 
 eat1-app311.corp/172.20.72.75:12913, initiating session 
 (org.apache.zookeeper.ClientCnxn)
 [2012-12-07 08:25:17,943] INFO Client session timed out, have not heard from 
 server in 3722ms for sessionid 0x0, closing socket connection and attempting 
 reconnect (org.apache.zookeeper.ClientCnxn)
 [2012-12-07 08:25:19,805] ERROR error in loggedRunnable (kafka.utils.Utils$)
 java.lang.OutOfMemoryError: Java heap space
 [2012-12-07 08:25:23,528] ERROR OOME with size 1853095936 
 (kafka.network.BoundedByteBufferReceive)
 java.lang.OutOfMemoryError: Java heap space
 It seems like it runs out of memory while trying to read the producer 
 request, but it's unclear so far. 

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


[jira] [Commented] (KAFKA-604) Add missing metrics in 0.8

2012-12-12 Thread Joel Koshy (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-604?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13530165#comment-13530165
 ] 

Joel Koshy commented on KAFKA-604:
--

The rebased patch does not really address the concern in the original review -
i.e., it breaks a relatively clean and convenient syntax in the common case (of
updating a single timer) in order to support updates to multiple timers. It's
obviously not a particularly major issue, but it is a rung higher than, say, a
debate over whitespace or coding style. My only point is that we should try to
avoid a change that goes from a nice syntax to a rather inconvenient one. For
that reason, I'm more in favor of the change to KafkaTimer in KAFKA-646.

That said, how about the following: both these patches only need multiple timer
updates in order to update an aggregate timer as well as a specific timer -
i.e., up to this point we don't really have a use case for updating more than
two timers simultaneously. So we can accomplish this case with the following:

aggregateKafkaTimer.time {
  specificKafkaTimer.time {
    // code block
  }
}

and avoid any change to KafkaTimer. Does that seem reasonable to you guys?
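
For what it's worth, here is a minimal stand-in (hypothetical; the real
KafkaTimer wraps a metrics Timer) just to show that the nested form above
composes without any API change:

object TimerNestingSketch {
  // Minimal stand-in for a KafkaTimer-style wrapper.
  class TimerSketch(name: String) {
    def time[A](f: => A): A = {
      val start = System.nanoTime()
      try f
      finally println(name + " took " + (System.nanoTime() - start) / 1000 + " us")
    }
  }

  val aggregateKafkaTimer = new TimerSketch("all-requests")
  val specificKafkaTimer  = new TimerSketch("produce-requests")

  // Updating both an aggregate and a specific timer around the same code block:
  def handleRequest(): Unit =
    aggregateKafkaTimer.time {
      specificKafkaTimer.time {
        // ... request handling code ...
      }
    }
}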


 Add missing metrics in 0.8
 --

 Key: KAFKA-604
 URL: https://issues.apache.org/jira/browse/KAFKA-604
 Project: Kafka
  Issue Type: Bug
  Components: core
Affects Versions: 0.8
Reporter: Jun Rao
 Attachments: kafka_604_v1.patch, kafka_604_v2.patch, kafka_604_v3.diff

   Original Estimate: 24h
  Remaining Estimate: 24h

 It would be good if we add the following metrics:
 Producer: droppedMessageRate per topic
 ReplicaManager: partition count on the broker
 FileMessageSet: logFlushTimer per log (i.e., partition). Also, logFlushTime 
 should probably be moved to LogSegment since the flush now includes index 
 flush time.

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


[jira] [Updated] (KAFKA-664) Kafka server threads die due to OOME during long running test

2012-12-12 Thread Joel Koshy (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-664?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Joel Koshy updated KAFKA-664:
-

Attachment: KAFKA-664-v3.patch

Here is a simpler and hopefully safer approach that uses a global request
counter and purges both the delay queue and the watcher map. Also, I changed the
existing gauge to just count the number of requests in purgatory (i.e., across
both the delay queue and the map).
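
The shape of that approach, roughly (hypothetical names; see the attached patch
for the real code): every request entering purgatory bumps a counter, every
purgeInterval requests both structures are purged, and the gauge simply reports
the current total.

import java.util.concurrent.atomic.AtomicInteger

class CounterBasedPurgeSketch(purgeInterval: Int,
                              purgeDelayQueue: () => Unit,
                              purgeWatcherLists: () => Unit,
                              currentSize: () => Int) {
  private val requestCounter = new AtomicInteger(0)

  // Called whenever a delayed request is added to purgatory.
  def onRequestAdded(): Unit =
    if (requestCounter.incrementAndGet() % purgeInterval == 0) {
      purgeDelayQueue()     // drop expired/satisfied entries from the delay queue
      purgeWatcherLists()   // and from the per-key watcher lists
    }

  // What the revised gauge reports: requests currently in purgatory, across
  // both the delay queue and the watcher map.
  def purgatorySize: Int = currentSize()
}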


 Kafka server threads die due to OOME during long running test
 -

 Key: KAFKA-664
 URL: https://issues.apache.org/jira/browse/KAFKA-664
 Project: Kafka
  Issue Type: Bug
Affects Versions: 0.8
Reporter: Neha Narkhede
Assignee: Jay Kreps
Priority: Blocker
  Labels: bugs
 Fix For: 0.8

 Attachments: kafka-664-draft-2.patch, kafka-664-draft.patch, 
 KAFKA-664-v3.patch, Screen Shot 2012-12-09 at 11.22.50 AM.png, Screen Shot 
 2012-12-09 at 11.23.09 AM.png, Screen Shot 2012-12-09 at 11.31.29 AM.png, 
 thread-dump.log, watchersForKey.png


 I set up a Kafka cluster with 5 brokers (JVM memory 512M) and set up a 
 long-running producer process that sends data to 100s of partitions continuously 
 for ~15 hours. After ~4 hours of operation, a few server threads (acceptor and 
 processor) exited due to OOME:
 [2012-12-07 08:24:44,355] ERROR OOME with size 1700161893 
 (kafka.network.BoundedByteBufferReceive)
 java.lang.OutOfMemoryError: Java heap space
 [2012-12-07 08:24:44,356] ERROR Uncaught exception in thread 
 'kafka-acceptor': (kafka.utils.Utils$)
 java.lang.OutOfMemoryError: Java heap space
 [2012-12-07 08:24:44,356] ERROR Uncaught exception in thread 
 'kafka-processor-9092-1': (kafka.utils.Utils$)
 java.lang.OutOfMemoryError: Java heap space
 [2012-12-07 08:24:46,344] INFO Unable to reconnect to ZooKeeper service, 
 session 0x13afd0753870103 has expired, closing socket connection 
 (org.apache.zookeeper.ClientCnxn)
 [2012-12-07 08:24:46,344] INFO zookeeper state changed (Expired) 
 (org.I0Itec.zkclient.ZkClient)
 [2012-12-07 08:24:46,344] INFO Initiating client connection, 
 connectString=eat1-app309.corp:12913,eat1-app310.corp:12913,eat1-app311.corp:12913,eat1-app312.corp:12913,eat1-app313.corp:12913
  sessionTimeout=15000 watcher=org.I0Itec.zkclient.ZkClient@19202d69 
 (org.apache.zookeeper.ZooKeeper)
 [2012-12-07 08:24:55,702] ERROR OOME with size 2001040997 
 (kafka.network.BoundedByteBufferReceive)
 java.lang.OutOfMemoryError: Java heap space
 [2012-12-07 08:25:01,192] ERROR Uncaught exception in thread 
 'kafka-request-handler-0': (kafka.utils.Utils$)
 java.lang.OutOfMemoryError: Java heap space
 [2012-12-07 08:25:08,739] INFO Opening socket connection to server 
 eat1-app311.corp/172.20.72.75:12913 (org.apache.zookeeper.ClientCnxn)
 [2012-12-07 08:25:14,221] INFO Socket connection established to 
 eat1-app311.corp/172.20.72.75:12913, initiating session 
 (org.apache.zookeeper.ClientCnxn)
 [2012-12-07 08:25:17,943] INFO Client session timed out, have not heard from 
 server in 3722ms for sessionid 0x0, closing socket connection and attempting 
 reconnect (org.apache.zookeeper.ClientCnxn)
 [2012-12-07 08:25:19,805] ERROR error in loggedRunnable (kafka.utils.Utils$)
 java.lang.OutOfMemoryError: Java heap space
 [2012-12-07 08:25:23,528] ERROR OOME with size 1853095936 
 (kafka.network.BoundedByteBufferReceive)
 java.lang.OutOfMemoryError: Java heap space
 It seems like it runs out of memory while trying to read the producer 
 request, but it's unclear so far. 

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


[jira] [Created] (KAFKA-671) DelayedProduce requests should not hold full producer request data

2012-12-12 Thread Joel Koshy (JIRA)
Joel Koshy created KAFKA-671:


 Summary: DelayedProduce requests should not hold full producer 
request data
 Key: KAFKA-671
 URL: https://issues.apache.org/jira/browse/KAFKA-671
 Project: Kafka
  Issue Type: Bug
Affects Versions: 0.8
Reporter: Joel Koshy
 Fix For: 0.8.1


Per the summary, this leads to unnecessary memory usage.

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


[jira] [Commented] (KAFKA-664) Kafka server threads die due to OOME during long running test

2012-12-12 Thread Joel Koshy (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-664?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13530254#comment-13530254
 ] 

Joel Koshy commented on KAFKA-664:
--

Also, just filed KAFKA-671 to address the issue that Jay pointed out.

 Kafka server threads die due to OOME during long running test
 -

 Key: KAFKA-664
 URL: https://issues.apache.org/jira/browse/KAFKA-664
 Project: Kafka
  Issue Type: Bug
Affects Versions: 0.8
Reporter: Neha Narkhede
Assignee: Jay Kreps
Priority: Blocker
  Labels: bugs
 Fix For: 0.8

 Attachments: kafka-664-draft-2.patch, kafka-664-draft.patch, 
 KAFKA-664-v3.patch, Screen Shot 2012-12-09 at 11.22.50 AM.png, Screen Shot 
 2012-12-09 at 11.23.09 AM.png, Screen Shot 2012-12-09 at 11.31.29 AM.png, 
 thread-dump.log, watchersForKey.png


 I set up a Kafka cluster with 5 brokers (JVM memory 512M) and set up a 
 long-running producer process that sends data to 100s of partitions continuously 
 for ~15 hours. After ~4 hours of operation, a few server threads (acceptor and 
 processor) exited due to OOME:
 [2012-12-07 08:24:44,355] ERROR OOME with size 1700161893 
 (kafka.network.BoundedByteBufferReceive)
 java.lang.OutOfMemoryError: Java heap space
 [2012-12-07 08:24:44,356] ERROR Uncaught exception in thread 
 'kafka-acceptor': (kafka.utils.Utils$)
 java.lang.OutOfMemoryError: Java heap space
 [2012-12-07 08:24:44,356] ERROR Uncaught exception in thread 
 'kafka-processor-9092-1': (kafka.utils.Utils$)
 java.lang.OutOfMemoryError: Java heap space
 [2012-12-07 08:24:46,344] INFO Unable to reconnect to ZooKeeper service, 
 session 0x13afd0753870103 has expired, closing socket connection 
 (org.apache.zookeeper.ClientCnxn)
 [2012-12-07 08:24:46,344] INFO zookeeper state changed (Expired) 
 (org.I0Itec.zkclient.ZkClient)
 [2012-12-07 08:24:46,344] INFO Initiating client connection, 
 connectString=eat1-app309.corp:12913,eat1-app310.corp:12913,eat1-app311.corp:12913,eat1-app312.corp:12913,eat1-app313.corp:12913
  sessionTimeout=15000 watcher=org.I0Itec.zkclient.ZkClient@19202d69 
 (org.apache.zookeeper.ZooKeeper)
 [2012-12-07 08:24:55,702] ERROR OOME with size 2001040997 
 (kafka.network.BoundedByteBufferReceive)
 java.lang.OutOfMemoryError: Java heap space
 [2012-12-07 08:25:01,192] ERROR Uncaught exception in thread 
 'kafka-request-handler-0': (kafka.utils.Utils$)
 java.lang.OutOfMemoryError: Java heap space
 [2012-12-07 08:25:08,739] INFO Opening socket connection to server 
 eat1-app311.corp/172.20.72.75:12913 (org.apache.zookeeper.ClientCnxn)
 [2012-12-07 08:25:14,221] INFO Socket connection established to 
 eat1-app311.corp/172.20.72.75:12913, initiating session 
 (org.apache.zookeeper.ClientCnxn)
 [2012-12-07 08:25:17,943] INFO Client session timed out, have not heard from 
 server in 3722ms for sessionid 0x0, closing socket connection and attempting 
 reconnect (org.apache.zookeeper.ClientCnxn)
 [2012-12-07 08:25:19,805] ERROR error in loggedRunnable (kafka.utils.Utils$)
 java.lang.OutOfMemoryError: Java heap space
 [2012-12-07 08:25:23,528] ERROR OOME with size 1853095936 
 (kafka.network.BoundedByteBufferReceive)
 java.lang.OutOfMemoryError: Java heap space
 It seems like it runs out of memory while trying to read the producer 
 request, but it's unclear so far. 

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


[jira] [Updated] (KAFKA-664) Kafka server threads die due to OOME during long running test

2012-12-14 Thread Joel Koshy (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-664?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Joel Koshy updated KAFKA-664:
-

Attachment: KAFKA-664-v4.patch

All good points - here is v4 with those changes.

 Kafka server threads die due to OOME during long running test
 -

 Key: KAFKA-664
 URL: https://issues.apache.org/jira/browse/KAFKA-664
 Project: Kafka
  Issue Type: Bug
Affects Versions: 0.8
Reporter: Neha Narkhede
Assignee: Jay Kreps
Priority: Blocker
  Labels: bugs
 Fix For: 0.8

 Attachments: kafka-664-draft-2.patch, kafka-664-draft.patch, 
 KAFKA-664-v3.patch, KAFKA-664-v4.patch, Screen Shot 2012-12-09 at 11.22.50 
 AM.png, Screen Shot 2012-12-09 at 11.23.09 AM.png, Screen Shot 2012-12-09 at 
 11.31.29 AM.png, thread-dump.log, watchersForKey.png


 I set up a Kafka cluster with 5 brokers (JVM memory 512M) and set up a 
 long-running producer process that sends data to 100s of partitions continuously 
 for ~15 hours. After ~4 hours of operation, a few server threads (acceptor and 
 processor) exited due to OOME:
 [2012-12-07 08:24:44,355] ERROR OOME with size 1700161893 
 (kafka.network.BoundedByteBufferReceive)
 java.lang.OutOfMemoryError: Java heap space
 [2012-12-07 08:24:44,356] ERROR Uncaught exception in thread 
 'kafka-acceptor': (kafka.utils.Utils$)
 java.lang.OutOfMemoryError: Java heap space
 [2012-12-07 08:24:44,356] ERROR Uncaught exception in thread 
 'kafka-processor-9092-1': (kafka.utils.Utils$)
 java.lang.OutOfMemoryError: Java heap space
 [2012-12-07 08:24:46,344] INFO Unable to reconnect to ZooKeeper service, 
 session 0x13afd0753870103 has expired, closing socket connection 
 (org.apache.zookeeper.ClientCnxn)
 [2012-12-07 08:24:46,344] INFO zookeeper state changed (Expired) 
 (org.I0Itec.zkclient.ZkClient)
 [2012-12-07 08:24:46,344] INFO Initiating client connection, 
 connectString=eat1-app309.corp:12913,eat1-app310.corp:12913,eat1-app311.corp:12913,eat1-app312.corp:12913,eat1-app313.corp:12913
  sessionTimeout=15000 watcher=org.I0Itec.zkclient.ZkClient@19202d69 
 (org.apache.zookeeper.ZooKeeper)
 [2012-12-07 08:24:55,702] ERROR OOME with size 2001040997 
 (kafka.network.BoundedByteBufferReceive)
 java.lang.OutOfMemoryError: Java heap space
 [2012-12-07 08:25:01,192] ERROR Uncaught exception in thread 
 'kafka-request-handler-0': (kafka.utils.Utils$)
 java.lang.OutOfMemoryError: Java heap space
 [2012-12-07 08:25:08,739] INFO Opening socket connection to server 
 eat1-app311.corp/172.20.72.75:12913 (org.apache.zookeeper.ClientCnxn)
 [2012-12-07 08:25:14,221] INFO Socket connection established to 
 eat1-app311.corp/172.20.72.75:12913, initiating session 
 (org.apache.zookeeper.ClientCnxn)
 [2012-12-07 08:25:17,943] INFO Client session timed out, have not heard from 
 server in 3722ms for sessionid 0x0, closing socket connection and attempting 
 reconnect (org.apache.zookeeper.ClientCnxn)
 [2012-12-07 08:25:19,805] ERROR error in loggedRunnable (kafka.utils.Utils$)
 java.lang.OutOfMemoryError: Java heap space
 [2012-12-07 08:25:23,528] ERROR OOME with size 1853095936 
 (kafka.network.BoundedByteBufferReceive)
 java.lang.OutOfMemoryError: Java heap space
 It seems like it runs out of memory while trying to read the producer 
 request, but it's unclear so far. 

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


[jira] [Updated] (KAFKA-598) decouple fetch size from max message size

2012-12-14 Thread Joel Koshy (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-598?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Joel Koshy updated KAFKA-598:
-

Attachment: KAFKA-598-v3.patch

Quick overview of the revised patch:

1 - Addressed your comment about the previous behavior in ConsumerIterator
  (good catch on that!) and the config defaults.
2 - Changed the semantics of fetch size to max memory. Max memory is a long (an
  int would currently limit it to 2G). The actual per-partition fetch size is
  checked for overflow (in which case it is set to Int.MaxValue) - see the
  sketch below.
3 - Also introduced a DeprecatedProperties convenience class that is checked
  during config verification. I added this because I think max.memory is a more
  meaningful config than fetch.size, and we can use this to deprecate other
  configs if needed.
4 - The partition count is a volatile int - I chose that over a method only to
  avoid a traversal (on each request) to determine the count.
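
The overflow check in point 2 amounts to something like this (illustrative
only; hypothetical names, not the actual patch):

object PartitionFetchSizeSketch {
  // Derive the per-partition fetch size from a long max-memory setting,
  // clamping to Int.MaxValue if the result does not fit in an Int.
  def partitionFetchSize(maxFetchMemoryBytes: Long, numPartitions: Int): Int = {
    require(numPartitions > 0, "fetcher must own at least one partition")
    val perPartition = maxFetchMemoryBytes / numPartitions
    if (perPartition > Int.MaxValue) Int.MaxValue else perPartition.toInt
  }
}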


 decouple fetch size from max message size
 -

 Key: KAFKA-598
 URL: https://issues.apache.org/jira/browse/KAFKA-598
 Project: Kafka
  Issue Type: Bug
  Components: core
Affects Versions: 0.8
Reporter: Jun Rao
Assignee: Joel Koshy
Priority: Blocker
 Attachments: KAFKA-598-v1.patch, KAFKA-598-v2.patch, 
 KAFKA-598-v3.patch


 Currently, a consumer has to set the fetch size larger than the max message size. 
 This increases the memory footprint on the consumer, especially when a large 
 number of topics/partitions is subscribed. By decoupling the fetch size from the 
 max message size, we can use a smaller fetch size for normal consumption and, 
 when hitting a large message (hopefully rare), automatically increase the 
 fetch size to the max message size temporarily.

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


[jira] [Resolved] (KAFKA-664) Kafka server threads die due to OOME during long running test

2012-12-17 Thread Joel Koshy (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-664?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Joel Koshy resolved KAFKA-664.
--

Resolution: Fixed

Committed on 0.8

 Kafka server threads die due to OOME during long running test
 -

 Key: KAFKA-664
 URL: https://issues.apache.org/jira/browse/KAFKA-664
 Project: Kafka
  Issue Type: Bug
Affects Versions: 0.8
Reporter: Neha Narkhede
Assignee: Jay Kreps
Priority: Blocker
  Labels: bugs
 Fix For: 0.8

 Attachments: kafka-664-draft-2.patch, kafka-664-draft.patch, 
 KAFKA-664-v3.patch, KAFKA-664-v4.patch, Screen Shot 2012-12-09 at 11.22.50 
 AM.png, Screen Shot 2012-12-09 at 11.23.09 AM.png, Screen Shot 2012-12-09 at 
 11.31.29 AM.png, thread-dump.log, watchersForKey.png


 I set up a Kafka cluster with 5 brokers (JVM memory 512M) and set up a 
 long-running producer process that sends data to 100s of partitions continuously 
 for ~15 hours. After ~4 hours of operation, a few server threads (acceptor and 
 processor) exited due to OOME:
 [2012-12-07 08:24:44,355] ERROR OOME with size 1700161893 
 (kafka.network.BoundedByteBufferReceive)
 java.lang.OutOfMemoryError: Java heap space
 [2012-12-07 08:24:44,356] ERROR Uncaught exception in thread 
 'kafka-acceptor': (kafka.utils.Utils$)
 java.lang.OutOfMemoryError: Java heap space
 [2012-12-07 08:24:44,356] ERROR Uncaught exception in thread 
 'kafka-processor-9092-1': (kafka.utils.Utils$)
 java.lang.OutOfMemoryError: Java heap space
 [2012-12-07 08:24:46,344] INFO Unable to reconnect to ZooKeeper service, 
 session 0x13afd0753870103 has expired, closing socket connection 
 (org.apache.zookeeper.ClientCnxn)
 [2012-12-07 08:24:46,344] INFO zookeeper state changed (Expired) 
 (org.I0Itec.zkclient.ZkClient)
 [2012-12-07 08:24:46,344] INFO Initiating client connection, 
 connectString=eat1-app309.corp:12913,eat1-app310.corp:12913,eat1-app311.corp:12913,eat1-app312.corp:12913,eat1-app313.corp:12913
  sessionTimeout=15000 watcher=org.I0Itec.zkclient.ZkClient@19202d69 
 (org.apache.zookeeper.ZooKeeper)
 [2012-12-07 08:24:55,702] ERROR OOME with size 2001040997 
 (kafka.network.BoundedByteBufferReceive)
 java.lang.OutOfMemoryError: Java heap space
 [2012-12-07 08:25:01,192] ERROR Uncaught exception in thread 
 'kafka-request-handler-0': (kafka.utils.Utils$)
 java.lang.OutOfMemoryError: Java heap space
 [2012-12-07 08:25:08,739] INFO Opening socket connection to server 
 eat1-app311.corp/172.20.72.75:12913 (org.apache.zookeeper.ClientCnxn)
 [2012-12-07 08:25:14,221] INFO Socket connection established to 
 eat1-app311.corp/172.20.72.75:12913, initiating session 
 (org.apache.zookeeper.ClientCnxn)
 [2012-12-07 08:25:17,943] INFO Client session timed out, have not heard from 
 server in 3722ms for sessionid 0x0, closing socket connection and attempting 
 reconnect (org.apache.zookeeper.ClientCnxn)
 [2012-12-07 08:25:19,805] ERROR error in loggedRunnable (kafka.utils.Utils$)
 java.lang.OutOfMemoryError: Java heap space
 [2012-12-07 08:25:23,528] ERROR OOME with size 1853095936 
 (kafka.network.BoundedByteBufferReceive)
 java.lang.OutOfMemoryError: Java heap space
 It seems like it runs out of memory while trying to read the producer 
 request, but it's unclear so far. 

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


[jira] [Commented] (KAFKA-598) decouple fetch size from max message size

2012-12-19 Thread Joel Koshy (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-598?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13536227#comment-13536227
 ] 

Joel Koshy commented on KAFKA-598:
--

Thanks for taking the time to review.

30. I think what I have is correct, but I may be missing something - can you
check this again and confirm? The passed-in value is (supposed to be) the fetch
size per partition. Only the fetcher manager can compute the per-partition fetch
size (since it has access to the partition count for each fetcher thread), and
that function is wired in through the constructor of the abstract fetcher
thread.
31. Sure - will make that change.
32. It is true that the map size is O(1). However, here I need to take each
value of the map (which is a fetcher thread), get the number of partitions it
owns, and then sum up the counts. (This is required to compute the partition
fetch size.) Iterating over the map values seemed expensive (and it is done for
each fetch request), so I went with the volatile int.
Anyway, I think a better approach would be to maintain a Set of the partitions
that the fetcher manager is currently managing and just return the size of that
set (roughly as sketched below).
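
That alternative would look roughly like this (hypothetical names): the fetcher
manager tracks the partitions it currently owns in a set, so the count is a
size lookup rather than a sum over fetcher threads on every fetch request.

import scala.collection.mutable

class FetcherPartitionTrackerSketch {
  private val lock = new Object
  private val ownedPartitions = mutable.Set.empty[(String, Int)]  // (topic, partition)

  def addPartitions(partitions: Iterable[(String, Int)]): Unit =
    lock.synchronized { ownedPartitions ++= partitions }

  def removePartitions(partitions: Iterable[(String, Int)]): Unit =
    lock.synchronized { ownedPartitions --= partitions }

  // Used to compute the per-partition fetch size for each request.
  def partitionCount: Int = lock.synchronized { ownedPartitions.size }
}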


 decouple fetch size from max message size
 -

 Key: KAFKA-598
 URL: https://issues.apache.org/jira/browse/KAFKA-598
 Project: Kafka
  Issue Type: Bug
  Components: core
Affects Versions: 0.8
Reporter: Jun Rao
Assignee: Joel Koshy
Priority: Blocker
 Attachments: KAFKA-598-v1.patch, KAFKA-598-v2.patch, 
 KAFKA-598-v3.patch


 Currently, a consumer has to set the fetch size larger than the max message size. 
 This increases the memory footprint on the consumer, especially when a large 
 number of topics/partitions is subscribed. By decoupling the fetch size from the 
 max message size, we can use a smaller fetch size for normal consumption and, 
 when hitting a large message (hopefully rare), automatically increase the 
 fetch size to the max message size temporarily.

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


[jira] [Commented] (KAFKA-682) java.lang.OutOfMemoryError: Java heap space

2013-01-04 Thread Joel Koshy (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-682?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13544119#comment-13544119
 ] 

Joel Koshy commented on KAFKA-682:
--

You might need to increase your heap size. What do you have it set to right 
now? Would you be able to run the broker with -XX:+HeapDumpOnOutOfMemoryError 
to get a heap-dump?

In case you are overriding defaults - what's the replication factor for the 
topic, num-required-acks for the producer requests, and producer request 
timeout? Are any requests going through or are the produce requests expiring?


 java.lang.OutOfMemoryError: Java heap space
 ---

 Key: KAFKA-682
 URL: https://issues.apache.org/jira/browse/KAFKA-682
 Project: Kafka
  Issue Type: Bug
  Components: core
Affects Versions: 0.8
 Environment: $ uname -a
 Linux rngadam-think 3.5.0-17-generic #28-Ubuntu SMP Tue Oct 9 19:32:08 UTC 
 2012 i686 i686 i686 GNU/Linux
 $ java -version
 java version 1.7.0_09
 OpenJDK Runtime Environment (IcedTea7 2.3.3) (7u9-2.3.3-0ubuntu1~12.04.1)
 OpenJDK Server VM (build 23.2-b09, mixed mode)
Reporter: Ricky Ng-Adam

 git pull (commit 32dae955d5e2e2dd45bddb628cb07c874241d856)
 ...build...
 ./sbt update
 ./sbt package
 ...run...
 bin/zookeeper-server-start.sh config/zookeeper.properties
 bin/kafka-server-start.sh config/server.properties
 ...then configured fluentd with kafka plugin...
 gem install fluentd --no-ri --no-rdoc
 gem install fluent-plugin-kafka
 fluentd -c ./fluent/fluent.conf -vv
 ...then flood fluentd with messages inputted from syslog and outputted to 
 kafka.
 results in (after about 1 messages of 1K each in 3s):
 [2013-01-05 02:00:52,087] ERROR Closing socket for /127.0.0.1 because of 
 error (kafka.network.Processor)
 java.lang.OutOfMemoryError: Java heap space
 at 
 kafka.api.ProducerRequest$$anonfun$1$$anonfun$apply$1.apply(ProducerRequest.scala:45)
 at 
 kafka.api.ProducerRequest$$anonfun$1$$anonfun$apply$1.apply(ProducerRequest.scala:42)
 at 
 scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
 at 
 scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
 at scala.collection.immutable.Range$ByOne$class.foreach(Range.scala:282)
 at scala.collection.immutable.Range$$anon$1.foreach(Range.scala:274)
 at scala.collection.TraversableLike$class.map(TraversableLike.scala:206)
 at scala.collection.immutable.Range.map(Range.scala:39)
 at kafka.api.ProducerRequest$$anonfun$1.apply(ProducerRequest.scala:42)
 at kafka.api.ProducerRequest$$anonfun$1.apply(ProducerRequest.scala:38)
 at 
 scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:227)
 at 
 scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:227)
 at scala.collection.immutable.Range$ByOne$class.foreach(Range.scala:282)
 at scala.collection.immutable.Range$$anon$1.foreach(Range.scala:274)
 at 
 scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:227)
 at scala.collection.immutable.Range.flatMap(Range.scala:39)
 at kafka.api.ProducerRequest$.readFrom(ProducerRequest.scala:38)
 at kafka.api.RequestKeys$$anonfun$1.apply(RequestKeys.scala:32)
 at kafka.api.RequestKeys$$anonfun$1.apply(RequestKeys.scala:32)
 at kafka.network.RequestChannel$Request.init(RequestChannel.scala:47)
 at kafka.network.Processor.read(SocketServer.scala:298)
 at kafka.network.Processor.run(SocketServer.scala:209)
 at java.lang.Thread.run(Thread.java:722)

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


[jira] [Commented] (KAFKA-682) java.lang.OutOfMemoryError: Java heap space

2013-01-08 Thread Joel Koshy (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-682?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13547129#comment-13547129
 ] 

Joel Koshy commented on KAFKA-682:
--

That's why I asked for the configured num-required-acks for the producer 
requests. If it is the default (0), then the requests shouldn't be added to the 
request purgatory, which would rule out KAFKA-671, no?
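
To spell out that reasoning as a sketch (illustrative only, not the actual 
KafkaApis logic):

{code}
// Illustrative only - not the actual broker code. With acks = 0 (the default)
// the broker can respond right away; only requests that must wait on replication
// end up in the producer request purgatory, which is where KAFKA-671 would apply.
def goesToPurgatory(requiredAcks: Short): Boolean =
  requiredAcks < 0 || requiredAcks > 1
{code}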

 java.lang.OutOfMemoryError: Java heap space
 ---

 Key: KAFKA-682
 URL: https://issues.apache.org/jira/browse/KAFKA-682
 Project: Kafka
  Issue Type: Bug
  Components: core
Affects Versions: 0.8
 Environment: $ uname -a
 Linux rngadam-think 3.5.0-17-generic #28-Ubuntu SMP Tue Oct 9 19:32:08 UTC 
 2012 i686 i686 i686 GNU/Linux
 $ java -version
 java version 1.7.0_09
 OpenJDK Runtime Environment (IcedTea7 2.3.3) (7u9-2.3.3-0ubuntu1~12.04.1)
 OpenJDK Server VM (build 23.2-b09, mixed mode)
Reporter: Ricky Ng-Adam
 Attachments: java_pid22281.hprof.gz, java_pid22281_Leak_Suspects.zip


 git pull (commit 32dae955d5e2e2dd45bddb628cb07c874241d856)
 ...build...
 ./sbt update
 ./sbt package
 ...run...
 bin/zookeeper-server-start.sh config/zookeeper.properties
 bin/kafka-server-start.sh config/server.properties
 ...then configured fluentd with kafka plugin...
 gem install fluentd --no-ri --no-rdoc
 gem install fluent-plugin-kafka
 fluentd -c ./fluent/fluent.conf -vv
 ...then flood fluentd with messages inputted from syslog and outputted to 
 kafka.
 results in (after about 1 messages of 1K each in 3s):
 [2013-01-05 02:00:52,087] ERROR Closing socket for /127.0.0.1 because of 
 error (kafka.network.Processor)
 java.lang.OutOfMemoryError: Java heap space
 at 
 kafka.api.ProducerRequest$$anonfun$1$$anonfun$apply$1.apply(ProducerRequest.scala:45)
 at 
 kafka.api.ProducerRequest$$anonfun$1$$anonfun$apply$1.apply(ProducerRequest.scala:42)
 at 
 scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
 at 
 scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
 at scala.collection.immutable.Range$ByOne$class.foreach(Range.scala:282)
 at scala.collection.immutable.Range$$anon$1.foreach(Range.scala:274)
 at scala.collection.TraversableLike$class.map(TraversableLike.scala:206)
 at scala.collection.immutable.Range.map(Range.scala:39)
 at kafka.api.ProducerRequest$$anonfun$1.apply(ProducerRequest.scala:42)
 at kafka.api.ProducerRequest$$anonfun$1.apply(ProducerRequest.scala:38)
 at 
 scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:227)
 at 
 scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:227)
 at scala.collection.immutable.Range$ByOne$class.foreach(Range.scala:282)
 at scala.collection.immutable.Range$$anon$1.foreach(Range.scala:274)
 at 
 scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:227)
 at scala.collection.immutable.Range.flatMap(Range.scala:39)
 at kafka.api.ProducerRequest$.readFrom(ProducerRequest.scala:38)
 at kafka.api.RequestKeys$$anonfun$1.apply(RequestKeys.scala:32)
 at kafka.api.RequestKeys$$anonfun$1.apply(RequestKeys.scala:32)
 at kafka.network.RequestChannel$Request.init(RequestChannel.scala:47)
 at kafka.network.Processor.read(SocketServer.scala:298)
 at kafka.network.Processor.run(SocketServer.scala:209)
 at java.lang.Thread.run(Thread.java:722)

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


[jira] [Commented] (KAFKA-598) decouple fetch size from max message size

2013-01-11 Thread Joel Koshy (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-598?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13551383#comment-13551383
 ] 

Joel Koshy commented on KAFKA-598:
--

The full scope should probably move out of 0.8 - i.e., as described above, 
bounding the consumer's memory is basically a packing problem without knowledge 
of the message size on the broker. One possibility is for the broker to somehow 
communicate the size of the large message back to the client, but that would 
break our zero-copy property wrt fetches.

So I would suggest we don't do the full patch (i.e., bounding consumer memory 
and handling large messages). Instead, we can go with the simpler implementation 
that requires a new config (which is not ideal, but better IMO than trying to 
half-implement the above packing problem).

I haven't had time to look at this lately, but if people are okay with the 
above, then I can revisit one of the earlier revisions of the patches.
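
As a strawman for the simpler implementation, the consumer-side behavior would 
look roughly like this (fetchSize/maxFetchSize are hypothetical config names, 
and the 0.8 SimpleConsumer calls are from memory):

{code}
// Strawman only - fetchSize/maxFetchSize are hypothetical knobs, not actual 0.8
// configs, and the SimpleConsumer API below is from memory: fetch with the small
// size normally, retry once with the configured upper bound when a message
// appears too large to fit.
import kafka.api.{FetchRequest, FetchRequestBuilder}
import kafka.consumer.SimpleConsumer

def buildFetch(topic: String, partition: Int, offset: Long, size: Int): FetchRequest =
  new FetchRequestBuilder().clientId("example").addFetch(topic, partition, offset, size).build()

def fetchWithFallback(consumer: SimpleConsumer, topic: String, partition: Int,
                      offset: Long, fetchSize: Int, maxFetchSize: Int) = {
  val response = consumer.fetch(buildFetch(topic, partition, offset, fetchSize))
  // An empty, error-free message set at a valid offset suggests a message larger
  // than fetchSize; retry once with the larger bound.
  if (response.messageSet(topic, partition).isEmpty)
    consumer.fetch(buildFetch(topic, partition, offset, maxFetchSize))
  else
    response
}
{code}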


 decouple fetch size from max message size
 -

 Key: KAFKA-598
 URL: https://issues.apache.org/jira/browse/KAFKA-598
 Project: Kafka
  Issue Type: Bug
  Components: core
Affects Versions: 0.8
Reporter: Jun Rao
Assignee: Joel Koshy
Priority: Blocker
 Attachments: KAFKA-598-v1.patch, KAFKA-598-v2.patch, 
 KAFKA-598-v3.patch


 Currently, a consumer has to set fetch size larger than the max message size. 
 This increases the memory footprint on the consumer, especially when a large 
 number of topic/partition is subscribed. By decoupling the fetch size from 
 max message size, we can use a smaller fetch size for normal consumption and 
 when hitting a large message (hopefully rare), we automatically increase 
 fetch size to max message size temporarily.

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


[jira] [Commented] (KAFKA-705) Controlled shutdown doesn't seem to work on more than one broker in a cluster

2013-01-16 Thread Joel Koshy (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-705?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13555352#comment-13555352
 ] 

Joel Koshy commented on KAFKA-705:
--

I set up a local cluster of three brokers and created a bunch of topics with 
replication factor = 2. I was able to do multiple iterations of rolling bounces 
without issue. Since this was local, I did not use your py script, as it kills 
PIDs returned by ps.

Would you by any chance be able to provide a scenario to reproduce this 
locally? That said, I believe John Fung also tried to reproduce this in a
distributed environment but was unable to do so; so I'll probably need to take 
a look at logs in your environment.


 Controlled shutdown doesn't seem to work on more than one broker in a cluster
 -

 Key: KAFKA-705
 URL: https://issues.apache.org/jira/browse/KAFKA-705
 Project: Kafka
  Issue Type: Bug
  Components: core
Affects Versions: 0.8
Reporter: Neha Narkhede
Assignee: Joel Koshy
Priority: Critical
  Labels: bugs
 Attachments: shutdown_brokers_eat.py, shutdown-command


 I wrote a script (attached here) to basically round robin through the brokers 
 in a cluster doing the following 2 operations on each of them -
 1. Send the controlled shutdown admin command. If it succeeds
 2. Restart the broker
 What I've observed is that only one broker is able to finish the above 
 successfully the first time around. For the rest of the iterations, no broker 
 is able to shutdown using the admin command and every single time it fails 
 with the error message stating the same number of leaders on every broker. 

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


[jira] [Commented] (KAFKA-705) Controlled shutdown doesn't seem to work on more than one broker in a cluster

2013-01-18 Thread Joel Koshy (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-705?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13557481#comment-13557481
 ] 

Joel Koshy commented on KAFKA-705:
--

I think this is why it happens:

https://github.com/apache/kafka/blob/03eb903ce223ab55c5acbcf4243ce805aaaf4fad/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala#L150

It could occur as follows. Suppose there's a partition 'P' assigned to brokers 
x and y; leaderAndIsr = y, {x, y}
1. Controlled shutdown of broker x; leaderAndIsr = y, {y}
2. After the above completes, kill -15 and then restart broker x
3. Immediately do a controlled shutdown of broker y; so now y is in the list of 
shutting-down brokers.

Because of the above, x will not start its follower fetcher for 'P' to the 
leader y.

Adding sufficient wait time between (2) and (3) seems to address the issue (in 
your script there's no sleep), but we should handle it properly in the shutdown 
code. Will think about a fix for that.
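
To spell out the check involved, a condensed illustration with hypothetical 
names (the real logic is at the ReplicaStateMachine line linked above):

{code}
// Condensed illustration with hypothetical names - not the actual
// ReplicaStateMachine code at the line linked above.
def willStartFollowerFetcher(leaderOfP: Int,
                             liveBrokerIds: Set[Int],
                             shuttingDownBrokerIds: Set[Int]): Boolean =
  // A shutting-down broker is not considered a live leader, so with y in
  // shuttingDownBrokerIds the restarted broker x is never told to follow P.
  (liveBrokerIds -- shuttingDownBrokerIds).contains(leaderOfP)

// Scenario above: leader of P = y, live = Set(x, y), shutting down = Set(y) => false
{code}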


 Controlled shutdown doesn't seem to work on more than one broker in a cluster
 -

 Key: KAFKA-705
 URL: https://issues.apache.org/jira/browse/KAFKA-705
 Project: Kafka
  Issue Type: Bug
  Components: core
Affects Versions: 0.8
Reporter: Neha Narkhede
Assignee: Joel Koshy
Priority: Critical
  Labels: bugs
 Attachments: shutdown_brokers_eat.py, shutdown-command


 I wrote a script (attached here) to basically round robin through the brokers 
 in a cluster doing the following 2 operations on each of them -
 1. Send the controlled shutdown admin command. If it succeeds
 2. Restart the broker
 What I've observed is that only one broker is able to finish the above 
 successfully the first time around. For the rest of the iterations, no broker 
 is able to shutdown using the admin command and every single time it fails 
 with the error message stating the same number of leaders on every broker. 

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


[jira] [Created] (KAFKA-712) Controlled shutdown tool should provide a meaningful message if a controller failover occurs during the operation

2013-01-18 Thread Joel Koshy (JIRA)
Joel Koshy created KAFKA-712:


 Summary: Controlled shutdown tool should provide a meaningful 
message if a controller failover occurs during the operation
 Key: KAFKA-712
 URL: https://issues.apache.org/jira/browse/KAFKA-712
 Project: Kafka
  Issue Type: Bug
Affects Versions: 0.8, 0.8.1
Reporter: Joel Koshy
Priority: Minor
 Fix For: 0.8.1


If the controller fails over before a jmx connection can be established, the 
tool shows the following
exception:
javax.management.InstanceNotFoundException: 
kafka.controller:type=KafkaController,name=ControllerOps
at 
com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.getMBean(DefaultMBeanServerInterceptor.java:1094)
at 
com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.getClassLoaderFor(DefaultMBeanServerInterceptor.java:1438)
at 
com.sun.jmx.mbeanserver.JmxMBeanServer.getClassLoaderFor(JmxMBeanServer.java:1276)
at 
javax.management.remote.rmi.RMIConnectionImpl$5.run(RMIConnectionImpl.java:1326)
at java.security.AccessController.doPrivileged(Native Method)
at 
javax.management.remote.rmi.RMIConnectionImpl.getClassLoaderFor(RMIConnectionImpl.java:1323)
at 
javax.management.remote.rmi.RMIConnectionImpl.invoke(RMIConnectionImpl.java:771)
at sun.reflect.GeneratedMethodAccessor42.invoke(Unknown Source)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
at java.lang.reflect.Method.invoke(Method.java:597)
at sun.rmi.server.UnicastServerRef.dispatch(UnicastServerRef.java:305)
at sun.rmi.transport.Transport$1.run(Transport.java:159)
at java.security.AccessController.doPrivileged(Native Method)
at sun.rmi.transport.Transport.serviceCall(Transport.java:155)
at 
sun.rmi.transport.tcp.TCPTransport.handleMessages(TCPTransport.java:535)
at 
sun.rmi.transport.tcp.TCPTransport$ConnectionHandler.run0(TCPTransport.java:790)
at 
sun.rmi.transport.tcp.TCPTransport$ConnectionHandler.run(TCPTransport.java:649)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
at java.lang.Thread.run(Thread.java:619)
at 
sun.rmi.transport.StreamRemoteCall.exceptionReceivedFromServer(StreamRemoteCall.java:255)
at 
sun.rmi.transport.StreamRemoteCall.executeCall(StreamRemoteCall.java:233)
at sun.rmi.server.UnicastRef.invoke(UnicastRef.java:142)
at com.sun.jmx.remote.internal.PRef.invoke(Unknown Source)
at javax.management.remote.rmi.RMIConnectionImpl_Stub.invoke(Unknown 
Source)
at 
javax.management.remote.rmi.RMIConnector$RemoteMBeanServerConnection.invoke(RMIConnector.java:993)
at 
kafka.admin.ShutdownBroker$.kafka$admin$ShutdownBroker$$invokeShutdown(ShutdownBroker.scala:50)
at kafka.admin.ShutdownBroker$.main(ShutdownBroker.scala:105)
at kafka.admin.ShutdownBroker.main(ShutdownBroker.scala)

Using the retry option on the tool would work, but we should provide a more
meaningful message.
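
Something along these lines in the tool would already help - a sketch only, not 
the actual ShutdownBroker code; the MBean name is from the trace above and the 
operation name is an assumption:

{code}
// Sketch of a friendlier failure mode (not the actual ShutdownBroker code).
// The MBean name is taken from the stack trace above; the operation name is an
// assumption.
import javax.management.{InstanceNotFoundException, MBeanServerConnection, ObjectName}

def invokeControlledShutdown(mbsc: MBeanServerConnection, brokerId: Int): Unit = {
  val controllerBean = new ObjectName("kafka.controller:type=KafkaController,name=ControllerOps")
  try {
    mbsc.invoke(controllerBean, "shutdownBroker",
                Array[AnyRef](Int.box(brokerId)), Array("int"))
  } catch {
    case _: InstanceNotFoundException =>
      // The controller moved after we resolved it; say so instead of dumping JMX internals.
      System.err.println("Controller failover detected while invoking shutdown; " +
        "re-run the tool (or use its retry option) so it can find the new controller.")
  }
}
{code}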

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


[jira] [Commented] (KAFKA-705) Controlled shutdown doesn't seem to work on more than one broker in a cluster

2013-01-21 Thread Joel Koshy (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-705?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13559121#comment-13559121
 ] 

Joel Koshy commented on KAFKA-705:
--

I committed the fix to 0.8 with a small edit: used the 
liveOrShuttingDownBrokers field.

Another small issue is that we send a stop-replica request (which stops the 
replica fetchers) to the shutting-down broker even if controlled shutdown did 
not complete. This prematurely forces the broker out of the ISR of those 
partitions. I think it should be safe to avoid sending the stop-replica request 
if controlled shutdown has not completely moved leadership of partitions off 
the shutting-down broker.
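
In other words, roughly (hypothetical names, not the actual controller code):

{code}
// Sketch of the guard being suggested (hypothetical names, not the controller code):
// only send the stop-replica request once controlled shutdown has actually moved
// all leadership off the broker, so an incomplete shutdown does not force it out
// of the ISR prematurely.
def maybeStopReplicaFetchers(partitionsStillLedByBroker: Int)(sendStopReplica: () => Unit): Unit =
  if (partitionsStillLedByBroker == 0)
    sendStopReplica()  // safe: no partition is still led by the shutting-down broker
  // else: skip, and let the broker remain in the ISR until shutdown completes or is retried
{code}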


 Controlled shutdown doesn't seem to work on more than one broker in a cluster
 -

 Key: KAFKA-705
 URL: https://issues.apache.org/jira/browse/KAFKA-705
 Project: Kafka
  Issue Type: Bug
  Components: core
Affects Versions: 0.8
Reporter: Neha Narkhede
Assignee: Joel Koshy
Priority: Critical
  Labels: bugs
 Attachments: kafka-705-v1.patch, shutdown_brokers_eat.py, 
 shutdown-command


 I wrote a script (attached here) to basically round robin through the brokers 
 in a cluster doing the following 2 operations on each of them -
 1. Send the controlled shutdown admin command. If it succeeds
 2. Restart the broker
 What I've observed is that only one broker is able to finish the above 
 successfully the first time around. For the rest of the iterations, no broker 
 is able to shutdown using the admin command and every single time it fails 
 with the error message stating the same number of leaders on every broker. 

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


[jira] [Updated] (KAFKA-705) Controlled shutdown doesn't seem to work on more than one broker in a cluster

2013-01-21 Thread Joel Koshy (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-705?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Joel Koshy updated KAFKA-705:
-

Attachment: kafka-705-incremental-v2.patch

Here is what I meant in my last comment.

 Controlled shutdown doesn't seem to work on more than one broker in a cluster
 -

 Key: KAFKA-705
 URL: https://issues.apache.org/jira/browse/KAFKA-705
 Project: Kafka
  Issue Type: Bug
  Components: core
Affects Versions: 0.8
Reporter: Neha Narkhede
Assignee: Joel Koshy
Priority: Critical
  Labels: bugs
 Attachments: kafka-705-incremental-v2.patch, kafka-705-v1.patch, 
 shutdown_brokers_eat.py, shutdown-command


 I wrote a script (attached here) to basically round robin through the brokers 
 in a cluster doing the following 2 operations on each of them -
 1. Send the controlled shutdown admin command. If it succeeds
 2. Restart the broker
 What I've observed is that only one broker is able to finish the above 
 successfully the first time around. For the rest of the iterations, no broker 
 is able to shutdown using the admin command and every single time it fails 
 with the error message stating the same number of leaders on every broker. 

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


[jira] [Created] (KAFKA-817) Implement a zookeeper path-based controlled shutdown tool

2013-03-19 Thread Joel Koshy (JIRA)
Joel Koshy created KAFKA-817:


 Summary: Implement a zookeeper path-based controlled shutdown tool
 Key: KAFKA-817
 URL: https://issues.apache.org/jira/browse/KAFKA-817
 Project: Kafka
  Issue Type: Bug
  Components: controller, tools
Affects Versions: 0.8
Reporter: Joel Koshy
Assignee: Neha Narkhede


The controlled shutdown tool currently depends on jmxremote.port being exposed. 
Apparently, this is often not exposed in production environments and makes the 
script unusable. We can move to a zk-based approach in which the controller 
watches a path that lists shutting down brokers. This will also make it 
consistent with the pattern used in some of the other replication-related tools.
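
For the record, roughly what I have in mind (the path name and layout are 
hypothetical; this uses the same ZkClient library the broker already depends on):

{code}
// Rough sketch of the zk-based approach - the path name and layout are made up,
// and this assumes the parent path already exists. The tool registers the broker
// id under an admin path; the controller watches that path and runs the same
// controlled-shutdown logic it currently exposes over JMX.
import java.util.{List => JList}
import org.I0Itec.zkclient.{IZkChildListener, ZkClient}

val shutdownPath = "/admin/shutting_down_brokers"  // hypothetical

// Tool side: request controlled shutdown of a broker.
def requestShutdown(zkClient: ZkClient, brokerId: Int): Unit =
  zkClient.createEphemeral(shutdownPath + "/" + brokerId)

// Controller side: react to brokers appearing under the path.
def watchShutdownRequests(zkClient: ZkClient)(onRequest: Int => Unit): Unit =
  zkClient.subscribeChildChanges(shutdownPath, new IZkChildListener {
    override def handleChildChange(parent: String, children: JList[String]): Unit = {
      val it = children.iterator()
      while (it.hasNext) onRequest(it.next().toInt)
    }
  })
{code}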

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


[jira] [Commented] (KAFKA-827) improve list topic output format

2013-03-26 Thread Joel Koshy (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-827?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13614434#comment-13614434
 ] 

Joel Koshy commented on KAFKA-827:
--

While you are touching this, would it be reasonable to also switch from using 
the AdminUtil to a blank TopicMetadataRequest? It runs a lot quicker if there 
are a large number of topics and you run the tool from outside the ZK cluster's 
DC. Also, the topicOpt description has been misleading for a while.
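
i.e., something along these lines (0.8 APIs from memory; host, port and clientId 
are placeholders):

{code}
// Sketch: ask a broker for metadata of all topics with an empty topic list,
// instead of walking zookeeper via the admin utils (0.8 APIs from memory;
// host, port and clientId are placeholders).
import kafka.api.TopicMetadataRequest
import kafka.consumer.SimpleConsumer

val consumer = new SimpleConsumer("broker-host", 9092, 30000, 64 * 1024, "list-topics")
val request  = new TopicMetadataRequest(TopicMetadataRequest.CurrentVersion, 0, "list-topics", Seq.empty)
val response = consumer.send(request)
response.topicsMetadata.foreach { tm =>
  println(tm.topic + " -> " + tm.partitionsMetadata.size + " partitions")
}
consumer.close()
{code}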

 improve list topic output format
 

 Key: KAFKA-827
 URL: https://issues.apache.org/jira/browse/KAFKA-827
 Project: Kafka
  Issue Type: Bug
  Components: core
Affects Versions: 0.8
Reporter: Jun Rao
Assignee: Jun Rao
Priority: Blocker
 Attachments: kafka-827.patch


 We need to make the output of list topic command more readable.

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


[jira] [Commented] (KAFKA-826) Make Kafka 0.8 depend on metrics 2.2.0 instead of 3.x

2013-03-28 Thread Joel Koshy (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-826?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13616783#comment-13616783
 ] 

Joel Koshy commented on KAFKA-826:
--

Thank you for looking into this. Metrics 2.x had a few minor issues with the 
CsvReporter (which we use in the system tests) and this is why we
used 3.x.

The fixes that I'm aware of are:
- https://github.com/codahale/metrics/pull/225
- https://github.com/codahale/metrics/pull/290
- If a CSV file already exists, metrics throws an IOException and does not 
resume CSV reporting. This would be the case on a broker bounce for example. 
Someone put out a patch for this 
(https://github.com/adagios/metrics/compare/2.x-maintenance...2.x-epoch-in-csv) 
but I'd have to check if that was pulled into metrics-3.x

Unfortunately, although the above are small fixes, if we want to use the 
official 2.x metrics release we would need to copy over
the code of the metrics CsvReporter (i.e., into a new implementation of 
metrics' AbstractReporter), patch in those fixes and plug
that into KafkaMetricsCsvReporter. I don't think it is difficult, but a bit 
clunky (which is why at the time we preferred using 3.x).


 Make Kafka 0.8 depend on metrics 2.2.0 instead of 3.x
 -

 Key: KAFKA-826
 URL: https://issues.apache.org/jira/browse/KAFKA-826
 Project: Kafka
  Issue Type: Bug
  Components: core
Affects Versions: 0.8
Reporter: Neha Narkhede
Assignee: Jun Rao
Priority: Blocker
  Labels: build, kafka-0.8, metrics

 In order to mavenize Kafka 0.8, we have to depend on metrics 2.2.0 since 
 metrics 3.x is a huge change as well as not an officially supported release.

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


[jira] [Created] (KAFKA-872) Socket server does not set send/recv buffer sizes

2013-04-23 Thread Joel Koshy (JIRA)
Joel Koshy created KAFKA-872:


 Summary: Socket server does not set send/recv buffer sizes
 Key: KAFKA-872
 URL: https://issues.apache.org/jira/browse/KAFKA-872
 Project: Kafka
  Issue Type: Bug
  Components: core
Affects Versions: 0.8
Reporter: Joel Koshy
Assignee: Joel Koshy
 Fix For: 0.8


The socket server should set its send and receive socket buffer sizes - this is 
important in cross-DC mirroring setups where large buffer sizes are essential 
to enable the mirror-maker processes to do bulk consumption. 
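
A sketch of the intended change (not the actual SocketServer patch):

{code}
// Sketch of the intended change (not the actual SocketServer patch): apply the
// configured buffer sizes to each accepted channel so cross-DC mirroring links
// can use large TCP windows.
import java.nio.channels.SocketChannel

def configureAccepted(channel: SocketChannel, sendBufferSize: Int, recvBufferSize: Int): Unit = {
  channel.configureBlocking(false)
  channel.socket().setTcpNoDelay(true)
  channel.socket().setSendBufferSize(sendBufferSize)       // e.g. large values for mirror-maker consumers
  channel.socket().setReceiveBufferSize(recvBufferSize)
}
{code}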

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


[jira] [Updated] (KAFKA-872) Socket server does not set send/recv buffer sizes

2013-04-23 Thread Joel Koshy (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-872?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Joel Koshy updated KAFKA-872:
-

Attachment: KAFKA-872-v1.patch

 Socket server does not set send/recv buffer sizes
 -

 Key: KAFKA-872
 URL: https://issues.apache.org/jira/browse/KAFKA-872
 Project: Kafka
  Issue Type: Bug
  Components: core
Affects Versions: 0.8
Reporter: Joel Koshy
Assignee: Joel Koshy
 Fix For: 0.8

 Attachments: KAFKA-872-v1.patch


 The socket server should set its send and receive socket buffer sizes - this 
 is important in cross-DC mirroring setups where large buffer sizes are 
 essential to enable the mirror-maker processes to do bulk consumption. 

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


[jira] [Created] (KAFKA-880) NoLeaderPartitionSet should be cleared before leader finder thread is started up

2013-04-25 Thread Joel Koshy (JIRA)
Joel Koshy created KAFKA-880:


 Summary: NoLeaderPartitionSet should be cleared before leader 
finder thread is started up
 Key: KAFKA-880
 URL: https://issues.apache.org/jira/browse/KAFKA-880
 Project: Kafka
  Issue Type: Bug
Affects Versions: 0.8
Reporter: Joel Koshy
 Fix For: 0.8


This was a recent regression.

This could prevent the consumer from progressing because fetchers for the 
currently owned partitions may not be added (depending on the order that the 
map iterator yields).

I think the fix should be simple - just clear the set after stopping the leader 
finder thread and stopping fetchers.

{code}
[2013-04-25 17:06:38,377] WARN 
[sometopic-somehost-1366909575615-f801367d-leader-finder-thread]
, Failed to find leader for Set([sometopic,11], [sometopic,25], [sometopic,24]) 
(kafka.consumer.ConsumerFetcherManager$Lead
erFinderThread)
java.util.NoSuchElementException: key not found: [sometopic,24]
at scala.collection.MapLike$class.default(MapLike.scala:223)
at scala.collection.immutable.Map$Map2.default(Map.scala:110)
at scala.collection.MapLike$class.apply(MapLike.scala:134)
at scala.collection.immutable.Map$Map2.apply(Map.scala:110)
at 
kafka.consumer.ConsumerFetcherManager$LeaderFinderThread$$anonfun$doWork$4.apply(ConsumerFetcherManager.scala:81)
at 
kafka.consumer.ConsumerFetcherManager$LeaderFinderThread$$anonfun$doWork$4.apply(ConsumerFetcherManager.scala:79)
at 
scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:80)
at 
scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:80)
at scala.collection.Iterator$class.foreach(Iterator.scala:631)
at 
scala.collection.mutable.HashTable$$anon$1.foreach(HashTable.scala:161)
at 
scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:194)
at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
at scala.collection.mutable.HashMap.foreach(HashMap.scala:80)
at 
kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:79)
at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
{code}
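
A sketch of the ordering I have in mind (hypothetical names, not the actual 
ConsumerFetcherManager code):

{code}
// Sketch of the proposed ordering (hypothetical names): clear the no-leader set
// only after the leader-finder thread and the fetchers have been stopped.
import scala.collection.mutable
import kafka.common.TopicAndPartition
import kafka.utils.ShutdownableThread

def stopConnections(leaderFinderThread: ShutdownableThread,
                    stopAllFetchers: () => Unit,
                    noLeaderPartitionSet: mutable.Set[TopicAndPartition]): Unit = {
  leaderFinderThread.shutdown()   // 1. no new fetchers can be added
  stopAllFetchers()               // 2. existing fetchers stopped
  noLeaderPartitionSet.clear()    // 3. safe to forget the stale entries now
}
{code}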


--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


[jira] [Updated] (KAFKA-880) NoLeaderPartitionSet should be cleared before leader finder thread is started up

2013-04-25 Thread Joel Koshy (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-880?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Joel Koshy updated KAFKA-880:
-

Description: 
This was a recent regression.

This could prevent the consumer from progressing because fetchers for the 
currently owned partitions may not be added (depending on the order that the 
map iterator yields).

I think the fix should be simple - just clear the set after stopping the leader 
finder thread and stopping fetchers.

[2013-04-25 17:06:38,377] WARN 
[sometopic-somehost-1366909575615-f801367d-leader-finder-thread]
, Failed to find leader for Set([sometopic,11], [sometopic,25], [sometopic,24]) 
(kafka.consumer.ConsumerFetcherManager$Lead
erFinderThread)
java.util.NoSuchElementException: key not found: [sometopic,24]
at scala.collection.MapLike$class.default(MapLike.scala:223)
at scala.collection.immutable.Map$Map2.default(Map.scala:110)
at scala.collection.MapLike$class.apply(MapLike.scala:134)
at scala.collection.immutable.Map$Map2.apply(Map.scala:110)
at 
kafka.consumer.ConsumerFetcherManager$LeaderFinderThread$$anonfun$doWork$4.apply(ConsumerFetcherManager.scala:81)
at 
kafka.consumer.ConsumerFetcherManager$LeaderFinderThread$$anonfun$doWork$4.apply(ConsumerFetcherManager.scala:79)
at 
scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:80)
at 
scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:80)
at scala.collection.Iterator$class.foreach(Iterator.scala:631)
at 
scala.collection.mutable.HashTable$$anon$1.foreach(HashTable.scala:161)
at 
scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:194)
at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
at scala.collection.mutable.HashMap.foreach(HashMap.scala:80)
at 
kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:79)
at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)


  was:
This was a recent regression.

This could prevent the consumer from progressing because fetchers for the 
currently owned partitions may not be added (depending on the order that the 
map iterator yields).

I think the fix should be simple - just clear the set after stopping the leader 
finder thread and stopping fetchers.

{code}
[2013-04-25 17:06:38,377] WARN 
[sometopic-somehost-1366909575615-f801367d-leader-finder-thread]
, Failed to find leader for Set([sometopic,11], [sometopic,25], [sometopic,24]) 
(kafka.consumer.ConsumerFetcherManager$Lead
erFinderThread)
java.util.NoSuchElementException: key not found: [sometopic,24]
at scala.collection.MapLike$class.default(MapLike.scala:223)
at scala.collection.immutable.Map$Map2.default(Map.scala:110)
at scala.collection.MapLike$class.apply(MapLike.scala:134)
at scala.collection.immutable.Map$Map2.apply(Map.scala:110)
at 
kafka.consumer.ConsumerFetcherManager$LeaderFinderThread$$anonfun$doWork$4.apply(ConsumerFetcherManager.scala:81)
at 
kafka.consumer.ConsumerFetcherManager$LeaderFinderThread$$anonfun$doWork$4.apply(ConsumerFetcherManager.scala:79)
at 
scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:80)
at 
scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:80)
at scala.collection.Iterator$class.foreach(Iterator.scala:631)
at 
scala.collection.mutable.HashTable$$anon$1.foreach(HashTable.scala:161)
at 
scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:194)
at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
at scala.collection.mutable.HashMap.foreach(HashMap.scala:80)
at 
kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:79)
at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
{code}



 NoLeaderPartitionSet should be cleared before leader finder thread is started 
 up
 

 Key: KAFKA-880
 URL: https://issues.apache.org/jira/browse/KAFKA-880
 Project: Kafka
  Issue Type: Bug
Affects Versions: 0.8
Reporter: Joel Koshy
 Fix For: 0.8


 This was a recent regression.
 This could prevent the consumer from progressing because fetchers for the 
 currently owned partitions may not be added (depending on the order that the 
 map iterator yields).
 I think the fix should be simple - just clear the set after stopping the 
 leader finder thread and stopping fetchers.
 [2013-04-25 17:06:38,377] WARN 
 [sometopic-somehost-1366909575615-f801367d-leader-finder-thread]
 , Failed to find leader for Set([sometopic,11], [sometopic,25], 
 [sometopic,24]) 

[jira] [Commented] (KAFKA-890) The list of brokers for fetching metadata should be shuffled

2013-04-30 Thread Joel Koshy (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-890?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13645736#comment-13645736
 ] 

Joel Koshy commented on KAFKA-890:
--

+1

It is worth noting that this is useful even in the presence of a VIP, since the 
consumers don't currently use a VIP to look up metadata.
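
For completeness, the shuffle itself is a one-liner - a minimal sketch:

{code}
// Minimal sketch: shuffle the configured broker list before using it for metadata
// requests, so the first broker in everyone's config doesn't absorb all the load.
import scala.util.Random
import kafka.cluster.Broker

def brokersForMetadata(configuredBrokers: Seq[Broker]): Seq[Broker] =
  Random.shuffle(configuredBrokers)
{code}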

 The list of brokers for fetching metadata should be shuffled 
 -

 Key: KAFKA-890
 URL: https://issues.apache.org/jira/browse/KAFKA-890
 Project: Kafka
  Issue Type: Bug
  Components: core
Affects Versions: 0.8
Reporter: Neha Narkhede
Assignee: Neha Narkhede
Priority: Blocker
  Labels: kafka-0.8, p1
 Attachments: kafka-890.patch


 The list of brokers in the metadata request is never shuffled. Which means 
 that if some clients are not using a VIP for metadata requests, the first 
 broker ends up servicing most metadata requests, leaving imbalanced load on 
 the brokers. This issue is even more pronounced when there are several 
 thousand clients talking to a cluster each using a broker list to fetch 
 metadata.

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


[jira] [Commented] (KAFKA-886) Update info on Controlled shutdown and Preferred replica election tool

2013-04-30 Thread Joel Koshy (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-886?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13646092#comment-13646092
 ] 

Joel Koshy commented on KAFKA-886:
--

Thanks for the great write-up. Couple of comments:

1) We should probably add a note on the controlled shutdown tool (script) usage 
that it is currently JMX-based and depends on the jmx.remote.port property 
being set (otherwise you won't be able to use the script and will need to poke 
JMX through other means). We can reference KAFKA-817, which will remedy this 
and make it zookeeper-based instead of JMX.
2) Due to the above, if people need to use local JMX operations and essentially 
do manually what the script does automatically, then it is best to do the 
controlled shutdown and bounce of the controller last (as otherwise there would 
be unnecessary controller re-elections).
3) For the ListTopicCommand tool - maybe we should mention that if there are a 
lot of topics and we list info for all topics, it can take a while to run 
unless it is run in the same datacenter as the ZK cluster. Actually, I think 
the ListTopicCommand should really be using the SimpleConsumer or producer to 
fetch metadata instead of reading ZK directly. That way, people don't have to 
zip up Kafka and copy it over to their production environment. What do you 
think?

 Update info on Controlled shutdown and Preferred replica election tool
 --

 Key: KAFKA-886
 URL: https://issues.apache.org/jira/browse/KAFKA-886
 Project: Kafka
  Issue Type: Sub-task
Affects Versions: 0.8
Reporter: Sriram Subramanian
Assignee: Sriram Subramanian
Priority: Blocker
  Labels: p1



--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


[jira] [Commented] (KAFKA-901) Kafka server can become unavailable if clients send several metadata requests

2013-05-10 Thread Joel Koshy (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-901?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13654667#comment-13654667
 ] 

Joel Koshy commented on KAFKA-901:
--

Haven't looked at the patch yet, but went through the overview. An alternate 
approach that we may want to consider is to maintain a metadata cache at every 
broker. The cache can be kept consistent by having the controller send a (new) 
update-metadata request to all brokers whenever it sends out a leaderAndIsr 
request. A new request type would avoid needing to overload the leader and 
isr request.

This would help avoid the herd effect of multiple clients flooding the 
controller with metadata requests (although these requests should return 
quickly with your patch).
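
A rough sketch of what I mean by a broker-side cache (all names are hypothetical, 
and the cached value is just a placeholder here):

{code}
// Rough sketch of the alternative (hypothetical names, placeholder value type):
// each broker keeps a metadata cache that the controller refreshes with a
// dedicated update-metadata request whenever it sends leaderAndIsr requests, so
// client metadata requests are served locally instead of flooding the controller.
import scala.collection.mutable
import kafka.common.TopicAndPartition

class BrokerMetadataCache {
  private val lock = new Object
  private val partitionState = mutable.Map[TopicAndPartition, String]()

  // Applied when the controller pushes an update-metadata request.
  def update(entries: Map[TopicAndPartition, String]): Unit =
    lock synchronized { partitionState ++= entries }

  // Served locally for client TopicMetadataRequests.
  def get(tp: TopicAndPartition): Option[String] =
    lock synchronized { partitionState.get(tp) }
}
{code}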


 Kafka server can become unavailable if clients send several metadata requests
 -

 Key: KAFKA-901
 URL: https://issues.apache.org/jira/browse/KAFKA-901
 Project: Kafka
  Issue Type: Bug
  Components: replication
Affects Versions: 0.8
Reporter: Neha Narkhede
Assignee: Neha Narkhede
Priority: Blocker
 Attachments: metadata-request-improvement.patch


 Currently, if a broker is bounced without controlled shutdown and there are 
 several clients talking to the Kafka cluster, each of the clients realize the 
 unavailability of leaders for some partitions. This leads to several metadata 
 requests sent to the Kafka brokers. Since metadata requests are pretty slow, 
 all the I/O threads quickly become busy serving the metadata requests. This 
 leads to a full request queue, that stalls handling of finished responses 
 since the same network thread handles requests as well as responses. In this 
 situation, clients timeout on metadata requests and send more metadata 
 requests. This quickly makes the Kafka cluster unavailable. 

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


[jira] [Created] (KAFKA-914) Deadlock between initial rebalance and watcher-triggered rebalances

2013-05-21 Thread Joel Koshy (JIRA)
Joel Koshy created KAFKA-914:


 Summary: Deadlock between initial rebalance and watcher-triggered 
rebalances
 Key: KAFKA-914
 URL: https://issues.apache.org/jira/browse/KAFKA-914
 Project: Kafka
  Issue Type: Bug
Affects Versions: 0.8
Reporter: Joel Koshy
 Fix For: 0.8


Summary doesn't give the full picture and the fetcher-manager/fetcher-thread
code is very complex so it's a bit hard to articulate the following very
clearly. I will try and describe the sequence that results in a deadlock
when starting up a large number of consumers at around the same time:

- When a consumer's createMessageStream method is called, it initiates an
  initial inline rebalance.
- However, before the above initial rebalance actually begins, a ZK watch
  may trigger (due to some other consumers starting up) and initiate a
  rebalance. This happens successfully so fetchers start and start filling
  up the chunk queues.
- Another watch triggers and initiates yet another rebalance. This rebalance
  attempt tries to close the fetchers. Before the fetchers are stopped, we
  shutdown the leader-finder-thread to prevent new fetchers from being
  started.
- The shutdown is accomplished by interrupting the leader-finder-thread and
  then awaiting its shutdown latch.
- If the leader-finder-thread still has a partition without leader to
  process and tries to add a fetcher for it, it will get an exception
  (InterruptedException if acquiring the partitionMapLock or
  ClosedByInterruptException if performing an offset request). If we get an
  InterruptedException the thread's interrupted flag is cleared.
- However, the leader-finder-thread may have

[jira] [Commented] (KAFKA-914) Deadlock between initial rebalance and watcher-triggered rebalances

2013-05-21 Thread Joel Koshy (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-914?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13663146#comment-13663146
 ] 

Joel Koshy commented on KAFKA-914:
--

One more point: [td3] above does not need to originate from a watcher-triggered 
rebalance. The initial rebalance can also run into the same deadlock. i.e., as 
long as one or more watcher-triggered rebalances succeed and start fetchers 
prior to the initial rebalance, we may end up in this wedged state. E.g., on 
another instance I saw [td3] but on the main thread:

2013-05-21_17:07:14.34308 main prio=10 tid=0x7f5e34008000 nid=0x4e49 
waiting on condition [0x7f5e3b41]
2013-05-21_17:07:14.34308java.lang.Thread.State: WAITING (parking)
2013-05-21_17:07:14.34309   at sun.misc.Unsafe.park(Native Method)
2013-05-21_17:07:14.34309   - parking to wait for  0x7f5d36d99fa0 (a 
java.util.concurrent.CountDownLatch$Sync)
2013-05-21_17:07:14.34309   at 
java.util.concurrent.locks.LockSupport.park(LockSupport.java:158)
2013-05-21_17:07:14.34310   at 
java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:811)
2013-05-21_17:07:14.34310   at 
java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:969)
2013-05-21_17:07:14.34310   at 
java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1281)
2013-05-21_17:07:14.34311   at 
java.util.concurrent.CountDownLatch.await(CountDownLatch.java:207)
2013-05-21_17:07:14.34312   at 
kafka.utils.ShutdownableThread.shutdown(ShutdownableThread.scala:36)
2013-05-21_17:07:14.34313   at 
kafka.consumer.ConsumerFetcherManager.stopConnections(ConsumerFetcherManager.scala:125)
2013-05-21_17:07:14.34313   at 
kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.kafka$consumer$ZookeeperConsumerConnector$ZKRebalancerListener$$closeFetchersForQueues(ZookeeperConsumerCo
nnector.scala:486)
2013-05-21_17:07:14.34313   at 
kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.closeFetchers(ZookeeperConsumerConnector.scala:523)
2013-05-21_17:07:14.34314   at 
kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.kafka$consumer$ZookeeperConsumerConnector$ZKRebalancerListener$$rebalance(ZookeeperConsumerConnector.scala
:420)
2013-05-21_17:07:14.34314   at 
kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anonfun$syncedRebalance$1.apply$mcVI$sp(ZookeeperConsumerConnector.scala:373)
2013-05-21_17:07:14.34315   at 
scala.collection.immutable.Range$ByOne$class.foreach$mVc$sp(Range.scala:282)
2013-05-21_17:07:14.34315   at 
scala.collection.immutable.Range$$anon$2.foreach$mVc$sp(Range.scala:265)
2013-05-21_17:07:14.34316   at 
kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.syncedRebalance(ZookeeperConsumerConnector.scala:368)
2013-05-21_17:07:14.34316   - locked 0x7f5d36d4b2e0 (a 
java.lang.Object)
2013-05-21_17:07:14.34317   at 
kafka.consumer.ZookeeperConsumerConnector.kafka$consumer$ZookeeperConsumerConnector$$reinitializeConsumer(ZookeeperConsumerConnector.scala:678)
2013-05-21_17:07:14.34317   at 
kafka.consumer.ZookeeperConsumerConnector$WildcardStreamsHandler.init(ZookeeperConsumerConnector.scala:712)
2013-05-21_17:07:14.34318   at 
kafka.consumer.ZookeeperConsumerConnector.createMessageStreamsByFilter(ZookeeperConsumerConnector.scala:140)
2013-05-21_17:07:14.34318   at 
kafka.tools.MirrorMaker$$anonfun$4.apply(MirrorMaker.scala:118)
2013-05-21_17:07:14.34318   at 
kafka.tools.MirrorMaker$$anonfun$4.apply(MirrorMaker.scala:118)
2013-05-21_17:07:14.34319   at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
2013-05-21_17:07:14.34319   at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
2013-05-21_17:07:14.34319   at 
scala.collection.LinearSeqOptimized$class.foreach(LinearSeqOptimized.scala:61)
2013-05-21_17:07:14.34320   at 
scala.collection.immutable.List.foreach(List.scala:45)
2013-05-21_17:07:14.34320   at 
scala.collection.TraversableLike$class.map(TraversableLike.scala:206)
2013-05-21_17:07:14.34320   at 
scala.collection.immutable.List.map(List.scala:45)
2013-05-21_17:07:14.34321   at 
kafka.tools.MirrorMaker$.main(MirrorMaker.scala:118)
2013-05-21_17:07:14.34322   at 
kafka.tools.MirrorMaker.main(MirrorMaker.scala)


 Deadlock between initial rebalance and watcher-triggered rebalances
 ---

 Key: KAFKA-914
 URL: https://issues.apache.org/jira/browse/KAFKA-914
 Project: Kafka
  Issue Type: Bug
Affects Versions: 0.8
Reporter: Joel Koshy
 Fix For: 0.8


 Summary 

[jira] [Updated] (KAFKA-914) Deadlock between initial rebalance and watcher-triggered rebalances

2013-05-21 Thread Joel Koshy (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-914?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Joel Koshy updated KAFKA-914:
-

Attachment: KAFKA-914-v1.patch

Patch with the mentioned fix.

1 - I added comments with some detail since the manager/fetcher/connector 
interaction is very tricky.
2 - Passing through throwables while shutting down. The isRunning check is 
probably unnecessary, but safer to keep.
3 - Made the following changes to the mirrormaker - I can put that in a 
separate jira as well.
  a - Currently if no streams are created, the mirrormaker doesn't quit. 
Setting streams to empty/nil fixes that issue.
  b - If a consumer-side exception (e.g., iterator timeout) gets thrown the 
mirror-maker does not exit. Addressed this by awaiting on the consumer threads 
at the end of the main method.



 Deadlock between initial rebalance and watcher-triggered rebalances
 ---

 Key: KAFKA-914
 URL: https://issues.apache.org/jira/browse/KAFKA-914
 Project: Kafka
  Issue Type: Bug
Affects Versions: 0.8
Reporter: Joel Koshy
 Fix For: 0.8

 Attachments: KAFKA-914-v1.patch


 Summary doesn't give the full picture and the fetcher-manager/fetcher-thread
 code is very complex so it's a bit hard to articulate the following very
 clearly. I will try and describe the sequence that results in a deadlock
 when starting up a large number of consumers at around the same time:
 - When a consumer's createMessageStream method is called, it initiates an
   initial inline rebalance.
 - However, before the above initial rebalance actually begins, a ZK watch
   may trigger (due to some other consumers starting up) and initiate a
   rebalance. This happens successfully so fetchers start and start filling
   up the chunk queues.
 - Another watch triggers and initiates yet another rebalance. This rebalance
   attempt tries to close the fetchers. Before the fetchers are stopped, we
   shutdown the leader-finder-thread to prevent new fetchers from being
   started.
 - The shutdown is accomplished by interrupting the leader-finder-thread and
   then awaiting its shutdown latch.
 - If the leader-finder-thread still has a partition without leader to

[jira] [Created] (KAFKA-916) Deadlock between fetcher shutdown and handling partitions with error

2013-05-22 Thread Joel Koshy (JIRA)
Joel Koshy created KAFKA-916:


 Summary: Deadlock between fetcher shutdown and handling partitions 
with error
 Key: KAFKA-916
 URL: https://issues.apache.org/jira/browse/KAFKA-916
 Project: Kafka
  Issue Type: Bug
Affects Versions: 0.8
Reporter: Joel Koshy
 Fix For: 0.8


Here is another consumer deadlock that we encountered. All consumers are
vulnerable to this during a rebalance if there happen to be partitions in
error.

On a rebalance, the fetcher manager closes all fetchers and this holds on to
the fetcher thread map's lock. (mapLock in AbstractFetcherManager). [t1]
While the fetcher manager is iterating over fetchers to stop them, a fetcher
that is yet to be stopped hits an error on a partition and proceeds to
handle partitions with error [t2]. This handling involves looking up the
fetcher for that partition and then removing it from the fetcher's set of
partitions to consume. This requires grabbing the same map lock in [t1],
hence the deadlock.

[t1]
2013-05-22_20:23:11.95767 main prio=10 tid=0x7f1b24007800 nid=0x573b 
waiting on condition [0x7f1b2bd38000]
2013-05-22_20:23:11.95767java.lang.Thread.State: WAITING (parking)
2013-05-22_20:23:11.95767   at sun.misc.Unsafe.park(Native Method)
2013-05-22_20:23:11.95767   - parking to wait for  0x7f1a25780598 (a 
java.util.concurrent.CountDownLatch$Sync)
2013-05-22_20:23:11.95767   at 
java.util.concurrent.locks.LockSupport.park(LockSupport.java:158)
2013-05-22_20:23:11.95767   at 
java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:811)
2013-05-22_20:23:11.95768   at 
java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:969)
2013-05-22_20:23:11.95768   at 
java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1281)
2013-05-22_20:23:11.95768   at 
java.util.concurrent.CountDownLatch.await(CountDownLatch.java:207)
2013-05-22_20:23:11.95768   at 
kafka.utils.ShutdownableThread.shutdown(ShutdownableThread.scala:36)
2013-05-22_20:23:11.95769   at 
kafka.server.AbstractFetcherThread.shutdown(AbstractFetcherThread.scala:68)
2013-05-22_20:23:11.95769   at 
kafka.server.AbstractFetcherManager$$anonfun$closeAllFetchers$1.apply(AbstractFetcherManager.scala:79)
2013-05-22_20:23:11.95769   at 
kafka.server.AbstractFetcherManager$$anonfun$closeAllFetchers$1.apply(AbstractFetcherManager.scala:78)
2013-05-22_20:23:11.95769   at 
scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:80)
2013-05-22_20:23:11.95769   at 
scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:80)
2013-05-22_20:23:11.95770   at 
scala.collection.Iterator$class.foreach(Iterator.scala:631)
2013-05-22_20:23:11.95770   at 
scala.collection.mutable.HashTable$$anon$1.foreach(HashTable.scala:161)
2013-05-22_20:23:11.95770   at 
scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:194)
2013-05-22_20:23:11.95770   at 
scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
2013-05-22_20:23:11.95771   at 
scala.collection.mutable.HashMap.foreach(HashMap.scala:80)
2013-05-22_20:23:11.95771   at 
kafka.server.AbstractFetcherManager.closeAllFetchers(AbstractFetcherManager.scala:78)
--- 2013-05-22_20:23:11.95771  - locked 0x7f1a2ae92510 (a 
java.lang.Object)
2013-05-22_20:23:11.95771   at 
kafka.consumer.ConsumerFetcherManager.stopConnections(ConsumerFetcherManager.scala:156)
2013-05-22_20:23:11.95771   at 
kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.kafka$consumer$ZookeeperConsumerConnector$ZKRebalancerListener$$closeFetchersForQueues(ZookeeperConsumerConnector.scala:488)
2013-05-22_20:23:11.95772   at 
kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.closeFetchers(ZookeeperConsumerConnector.scala:525)
2013-05-22_20:23:11.95772   at 
kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.kafka$consumer$ZookeeperConsumerConnector$ZKRebalancerListener$$rebalance(ZookeeperConsumerConnector.scala:422)
2013-05-22_20:23:11.95772   at 
kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anonfun$syncedRebalance$1.apply$mcVI$sp(ZookeeperConsumerConnector.scala:374)
2013-05-22_20:23:11.95772   at 
scala.collection.immutable.Range$ByOne$class.foreach$mVc$sp(Range.scala:282)
2013-05-22_20:23:11.95773   at 
scala.collection.immutable.Range$$anon$2.foreach$mVc$sp(Range.scala:265)
2013-05-22_20:23:11.95773   at 
kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.syncedRebalance(ZookeeperConsumerConnector.scala:369)
2013-05-22_20:23:11.95773   - locked 0x7f1a2a29b450 (a 
java.lang.Object)
2013-05-22_20:23:11.95773   at 

[jira] [Resolved] (KAFKA-914) Deadlock between initial rebalance and watcher-triggered rebalances

2013-05-22 Thread Joel Koshy (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-914?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Joel Koshy resolved KAFKA-914.
--

Resolution: Fixed

Thanks for the review. Committed after removing the unnecessary assignment in 
MirrorMaker.

 Deadlock between initial rebalance and watcher-triggered rebalances
 ---

 Key: KAFKA-914
 URL: https://issues.apache.org/jira/browse/KAFKA-914
 Project: Kafka
  Issue Type: Bug
Affects Versions: 0.8
Reporter: Joel Koshy
 Fix For: 0.8

 Attachments: KAFKA-914-v1.patch


 Summary doesn't give the full picture and the fetcher-manager/fetcher-thread
 code is very complex so it's a bit hard to articulate the following very
 clearly. I will try and describe the sequence that results in a deadlock
 when starting up a large number of consumers at around the same time:
 - When a consumer's createMessageStream method is called, it initiates an
   initial inline rebalance.
 - However, before the above initial rebalance actually begins, a ZK watch
   may trigger (due to some other consumers starting up) and initiate a
   rebalance. This happens successfully so fetchers start and start filling
   up the chunk queues.
 - Another watch triggers and initiates yet another rebalance. This rebalance
   attempt tries to close the fetchers. Before the fetchers are stopped, we
   shutdown the leader-finder-thread to prevent new fetchers from being
   started.
 - The shutdown is accomplished by interrupting the leader-finder-thread and
   then awaiting its shutdown latch.
 - If the leader-finder-thread still has a partition without leader to
   process and tries to add a fetcher for it, it will get an exception
   (InterruptedException if acquiring the partitionMapLock or
   ClosedByInterruptException if performing an

[jira] [Closed] (KAFKA-914) Deadlock between initial rebalance and watcher-triggered rebalances

2013-05-22 Thread Joel Koshy (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-914?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Joel Koshy closed KAFKA-914.



 Deadlock between initial rebalance and watcher-triggered rebalances
 ---

 Key: KAFKA-914
 URL: https://issues.apache.org/jira/browse/KAFKA-914
 Project: Kafka
  Issue Type: Bug
Affects Versions: 0.8
Reporter: Joel Koshy
 Fix For: 0.8

 Attachments: KAFKA-914-v1.patch


 Summary doesn't give the full picture and the fetcher-manager/fetcher-thread
 code is very complex so it's a bit hard to articulate the following very
 clearly. I will try and describe the sequence that results in a deadlock
 when starting up a large number of consumers at around the same time:
 - When a consumer's createMessageStream method is called, it initiates an
   initial inline rebalance.
 - However, before the above initial rebalance actually begins, a ZK watch
   may trigger (due to some other consumers starting up) and initiate a
   rebalance. This happens successfully so fetchers start and start filling
   up the chunk queues.
 - Another watch triggers and initiates yet another rebalance. This rebalance
   attempt tries to close the fetchers. Before the fetchers are stopped, we
   shutdown the leader-finder-thread to prevent new fetchers from being
   started.
 - The shutdown is accomplished by interrupting the leader-finder-thread and
   then awaiting its shutdown latch.
 - If the leader-finder-thread still has a partition without leader to
   process and tries to add a fetcher for it, it will get an exception
   (InterruptedException if acquiring the partitionMapLock or
   ClosedByInterruptException if performing an offset request). If we get an

[jira] [Commented] (KAFKA-911) Bug in controlled shutdown logic in controller leads to controller not sending out some state change request

2013-05-24 Thread Joel Koshy (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-911?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13666532#comment-13666532
 ] 

Joel Koshy commented on KAFKA-911:
--

I had to revisit the notes from KAFKA-340. I think this was touched upon. i.e., 
the fact that the current implementation's attempt to shrink ISR may be 
ineffective for partitions whose leadership has been moved from the current 
broker - 
https://issues.apache.org/jira/browse/KAFKA-340?focusedCommentId=13483478&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-13483478

quote
 3.4 What is the point of sending leader and isr request at the end of 
 shutdownBroker, since the OfflineReplica state 
 change would've taken care of that anyway. It seems like you just need to 
 send the stop replica request with the delete 
 partitions flag turned off, no ? 

I still need (as an optimization) to send the leader and isr request to the 
leaders of all partitions that are present 
on the shutting down broker so it can remove the shutting down broker from its 
inSyncReplicas cache 
(in Partition.scala) so it no longer waits for acks from the shutting down 
broker if a producer request's num-acks is 
set to -1. Otherwise, we have to wait for the leader to organically shrink 
the ISR. 

This also applies to partitions which are moved (i.e., partitions for which the 
shutting down broker was the leader): 
the ControlledShutdownLeaderSelector needs to send the updated leaderAndIsr 
request to the shutting down broker as well 
(to tell it that it is no longer the leader) at which point it will start up a 
replica fetcher and re-enter the ISR. 
So in fact, there is actually not much point in removing the current leader 
from the ISR in the 
ControlledShutdownLeaderSelector.selectLeader. 
</quote>

and 

https://issues.apache.org/jira/browse/KAFKA-340?focusedCommentId=13484727page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-13484727
(I don't think I actually filed that jira though.)


 Bug in controlled shutdown logic in controller leads to controller not 
 sending out some state change request 
 -

 Key: KAFKA-911
 URL: https://issues.apache.org/jira/browse/KAFKA-911
 Project: Kafka
  Issue Type: Bug
  Components: controller
Affects Versions: 0.8
Reporter: Neha Narkhede
Assignee: Neha Narkhede
Priority: Blocker
  Labels: kafka-0.8, p1
 Attachments: kafka-911-v1.patch


 The controlled shutdown logic in the controller first tries to move the 
 leaders from the broker being shutdown. Then it tries to remove the broker 
 from the isr list. During that operation, it does not synchronize on the 
 controllerLock. This causes a race condition while dispatching data using the 
 controller's channel manager.

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


[jira] [Updated] (KAFKA-916) Deadlock between fetcher shutdown and handling partitions with error

2013-05-24 Thread Joel Koshy (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-916?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Joel Koshy updated KAFKA-916:
-

Attachment: KAFKA-916-v1.patch

Agreed - I think that should fix the issue.

 Deadlock between fetcher shutdown and handling partitions with error
 

 Key: KAFKA-916
 URL: https://issues.apache.org/jira/browse/KAFKA-916
 Project: Kafka
  Issue Type: Bug
Affects Versions: 0.8
Reporter: Joel Koshy
 Fix For: 0.8

 Attachments: KAFKA-916-v1.patch


 Here is another consumer deadlock that we encountered. All consumers are
 vulnerable to this during a rebalance if there happen to be partitions in
 error.
 On a rebalance, the fetcher manager closes all fetchers and this holds on to
 the fetcher thread map's lock. (mapLock in AbstractFetcherManager). [t1]
 While the fetcher manager is iterating over fetchers to stop them, a fetcher
 that is yet to be stopped hits an error on a partition and proceeds to
 handle partitions with error [t2]. This handling involves looking up the
 fetcher for that partition and then removing it from the fetcher's set of
 partitions to consume. This requires grabbing the same map lock in [t1],
 hence the deadlock.
 [t1]
 2013-05-22_20:23:11.95767 main prio=10 tid=0x7f1b24007800 nid=0x573b 
 waiting on condition [0x7f1b2bd38000]
 2013-05-22_20:23:11.95767java.lang.Thread.State: WAITING (parking)
 2013-05-22_20:23:11.95767 at sun.misc.Unsafe.park(Native Method)
 2013-05-22_20:23:11.95767 - parking to wait for  0x7f1a25780598 (a 
 java.util.concurrent.CountDownLatch$Sync)
 2013-05-22_20:23:11.95767 at 
 java.util.concurrent.locks.LockSupport.park(LockSupport.java:158)
 2013-05-22_20:23:11.95767 at 
 java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:811)
 2013-05-22_20:23:11.95768 at 
 java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:969)
 2013-05-22_20:23:11.95768 at 
 java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1281)
 2013-05-22_20:23:11.95768 at 
 java.util.concurrent.CountDownLatch.await(CountDownLatch.java:207)
 2013-05-22_20:23:11.95768 at 
 kafka.utils.ShutdownableThread.shutdown(ShutdownableThread.scala:36)
 2013-05-22_20:23:11.95769 at 
 kafka.server.AbstractFetcherThread.shutdown(AbstractFetcherThread.scala:68)
 2013-05-22_20:23:11.95769 at 
 kafka.server.AbstractFetcherManager$$anonfun$closeAllFetchers$1.apply(AbstractFetcherManager.scala:79)
 2013-05-22_20:23:11.95769 at 
 kafka.server.AbstractFetcherManager$$anonfun$closeAllFetchers$1.apply(AbstractFetcherManager.scala:78)
 2013-05-22_20:23:11.95769 at 
 scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:80)
 2013-05-22_20:23:11.95769 at 
 scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:80)
 2013-05-22_20:23:11.95770 at 
 scala.collection.Iterator$class.foreach(Iterator.scala:631)
 2013-05-22_20:23:11.95770 at 
 scala.collection.mutable.HashTable$$anon$1.foreach(HashTable.scala:161)
 2013-05-22_20:23:11.95770 at 
 scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:194)
 2013-05-22_20:23:11.95770 at 
 scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
 2013-05-22_20:23:11.95771 at 
 scala.collection.mutable.HashMap.foreach(HashMap.scala:80)
 2013-05-22_20:23:11.95771 at 
 kafka.server.AbstractFetcherManager.closeAllFetchers(AbstractFetcherManager.scala:78)
 --- 2013-05-22_20:23:11.95771- locked 0x7f1a2ae92510 (a 
 java.lang.Object)
 2013-05-22_20:23:11.95771 at 
 kafka.consumer.ConsumerFetcherManager.stopConnections(ConsumerFetcherManager.scala:156)
 2013-05-22_20:23:11.95771 at 
 kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.kafka$consumer$ZookeeperConsumerConnector$ZKRebalancerListener$$closeFetchersForQueues(ZookeeperConsumerConnector.scala:488)
 2013-05-22_20:23:11.95772 at 
 kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.closeFetchers(ZookeeperConsumerConnector.scala:525)
 2013-05-22_20:23:11.95772 at 
 kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.kafka$consumer$ZookeeperConsumerConnector$ZKRebalancerListener$$rebalance(ZookeeperConsumerConnector.scala:422)
 2013-05-22_20:23:11.95772 at 
 kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anonfun$syncedRebalance$1.apply$mcVI$sp(ZookeeperConsumerConnector.scala:374)
 2013-05-22_20:23:11.95772 at 
 scala.collection.immutable.Range$ByOne$class.foreach$mVc$sp(Range.scala:282)
 2013-05-22_20:23:11.95773 at 
 

[jira] [Closed] (KAFKA-916) Deadlock between fetcher shutdown and handling partitions with error

2013-05-28 Thread Joel Koshy (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-916?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Joel Koshy closed KAFKA-916.



Thanks for the review. Committed to 0.8

 Deadlock between fetcher shutdown and handling partitions with error
 

 Key: KAFKA-916
 URL: https://issues.apache.org/jira/browse/KAFKA-916
 Project: Kafka
  Issue Type: Bug
Affects Versions: 0.8
Reporter: Joel Koshy
 Fix For: 0.8

 Attachments: KAFKA-916-v1.patch


 Here is another consumer deadlock that we encountered. All consumers are
 vulnerable to this during a rebalance if there happen to be partitions in
 error.
 On a rebalance, the fetcher manager closes all fetchers and this holds on to
 the fetcher thread map's lock. (mapLock in AbstractFetcherManager). [t1]
 While the fetcher manager is iterating over fetchers to stop them, a fetcher
 that is yet to be stopped hits an error on a partition and proceeds to
 handle partitions with error [t2]. This handling involves looking up the
 fetcher for that partition and then removing it from the fetcher's set of
 partitions to consume. This requires grabbing the same map lock in [t1],
 hence the deadlock.
 [t1]
 2013-05-22_20:23:11.95767 main prio=10 tid=0x7f1b24007800 nid=0x573b 
 waiting on condition [0x7f1b2bd38000]
 2013-05-22_20:23:11.95767java.lang.Thread.State: WAITING (parking)
 2013-05-22_20:23:11.95767 at sun.misc.Unsafe.park(Native Method)
 2013-05-22_20:23:11.95767 - parking to wait for  0x7f1a25780598 (a 
 java.util.concurrent.CountDownLatch$Sync)
 2013-05-22_20:23:11.95767 at 
 java.util.concurrent.locks.LockSupport.park(LockSupport.java:158)
 2013-05-22_20:23:11.95767 at 
 java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:811)
 2013-05-22_20:23:11.95768 at 
 java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:969)
 2013-05-22_20:23:11.95768 at 
 java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1281)
 2013-05-22_20:23:11.95768 at 
 java.util.concurrent.CountDownLatch.await(CountDownLatch.java:207)
 2013-05-22_20:23:11.95768 at 
 kafka.utils.ShutdownableThread.shutdown(ShutdownableThread.scala:36)
 2013-05-22_20:23:11.95769 at 
 kafka.server.AbstractFetcherThread.shutdown(AbstractFetcherThread.scala:68)
 2013-05-22_20:23:11.95769 at 
 kafka.server.AbstractFetcherManager$$anonfun$closeAllFetchers$1.apply(AbstractFetcherManager.scala:79)
 2013-05-22_20:23:11.95769 at 
 kafka.server.AbstractFetcherManager$$anonfun$closeAllFetchers$1.apply(AbstractFetcherManager.scala:78)
 2013-05-22_20:23:11.95769 at 
 scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:80)
 2013-05-22_20:23:11.95769 at 
 scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:80)
 2013-05-22_20:23:11.95770 at 
 scala.collection.Iterator$class.foreach(Iterator.scala:631)
 2013-05-22_20:23:11.95770 at 
 scala.collection.mutable.HashTable$$anon$1.foreach(HashTable.scala:161)
 2013-05-22_20:23:11.95770 at 
 scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:194)
 2013-05-22_20:23:11.95770 at 
 scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
 2013-05-22_20:23:11.95771 at 
 scala.collection.mutable.HashMap.foreach(HashMap.scala:80)
 2013-05-22_20:23:11.95771 at 
 kafka.server.AbstractFetcherManager.closeAllFetchers(AbstractFetcherManager.scala:78)
 --- 2013-05-22_20:23:11.95771- locked 0x7f1a2ae92510 (a 
 java.lang.Object)
 2013-05-22_20:23:11.95771 at 
 kafka.consumer.ConsumerFetcherManager.stopConnections(ConsumerFetcherManager.scala:156)
 2013-05-22_20:23:11.95771 at 
 kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.kafka$consumer$ZookeeperConsumerConnector$ZKRebalancerListener$$closeFetchersForQueues(ZookeeperConsumerConnector.scala:488)
 2013-05-22_20:23:11.95772 at 
 kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.closeFetchers(ZookeeperConsumerConnector.scala:525)
 2013-05-22_20:23:11.95772 at 
 kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.kafka$consumer$ZookeeperConsumerConnector$ZKRebalancerListener$$rebalance(ZookeeperConsumerConnector.scala:422)
 2013-05-22_20:23:11.95772 at 
 kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anonfun$syncedRebalance$1.apply$mcVI$sp(ZookeeperConsumerConnector.scala:374)
 2013-05-22_20:23:11.95772 at 
 scala.collection.immutable.Range$ByOne$class.foreach$mVc$sp(Range.scala:282)
 2013-05-22_20:23:11.95773 at 
 scala.collection.immutable.Range$$anon$2.foreach$mVc$sp(Range.scala:265)
 

[jira] [Updated] (KAFKA-921) Expose max lag mbean for consumers and replica fetchers

2013-05-28 Thread Joel Koshy (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-921?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Joel Koshy updated KAFKA-921:
-

Attachment: KAFKA-921-v1.patch

This provides a max lag mbean for both consumer fetcher manager and replica 
fetcher manager; although I think it is more useful for monitoring consumers. 
For replica fetchers we need to closely monitor all replica fetchers anyway. 
i.e., the set of mbeans is static. I can reduce the scope to just consumers if 
others agree.

 Expose max lag mbean for consumers and replica fetchers
 ---

 Key: KAFKA-921
 URL: https://issues.apache.org/jira/browse/KAFKA-921
 Project: Kafka
  Issue Type: Bug
Affects Versions: 0.8
Reporter: Joel Koshy
 Fix For: 0.8

 Attachments: KAFKA-921-v1.patch


 We have a ton of consumer mbeans with names that are derived from the 
 consumer id, broker being fetched from, fetcher id, etc. This makes it 
 difficult to do basic monitoring of consumer/replica fetcher lag - since the 
 mbean to monitor can change. A more useful metric for monitoring purposes is 
 the maximum lag across all fetchers.

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


[jira] [Updated] (KAFKA-921) Expose max lag mbean for consumers and replica fetchers

2013-05-29 Thread Joel Koshy (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-921?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Joel Koshy updated KAFKA-921:
-

Attachment: KAFKA-921-v2.patch

Yes - I think that would be better. Moved it to AbstractFetcherManager. So 
depending on whether you are looking at replica fetchers or consumer fetchers, 
the MaxLag mbean will show up in ReplicaFetcherManager or 
ConsumerFetcherManager respectively.

 Expose max lag mbean for consumers and replica fetchers
 ---

 Key: KAFKA-921
 URL: https://issues.apache.org/jira/browse/KAFKA-921
 Project: Kafka
  Issue Type: Bug
Affects Versions: 0.8
Reporter: Joel Koshy
 Fix For: 0.8

 Attachments: KAFKA-921-v1.patch, KAFKA-921-v2.patch


 We have a ton of consumer mbeans with names that are derived from the 
 consumer id, broker being fetched from, fetcher id, etc. This makes it 
 difficult to do basic monitoring of consumer/replica fetcher lag - since the 
 mbean to monitor can change. A more useful metric for monitoring purposes is 
 the maximum lag across all fetchers.

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


[jira] [Updated] (KAFKA-921) Expose max lag mbean for consumers and replica fetchers

2013-05-30 Thread Joel Koshy (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-921?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Joel Koshy updated KAFKA-921:
-

Attachment: KAFKA-921-v3.patch

One caveat in this approach is that if a fetcher is wedged for any reason, then 
the reported lag is inaccurate since it depends on getting the high watermark 
from fetch responses. i.e., to check on the health of a consumer you would need 
to look at both the max lag and min fetch rate across all fetchers.
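
For reference, the gauge can be read over JMX like any other mbean. The sketch below is illustrative only: the JMX endpoint and the mbean object name are placeholders (the actual name registered by the fetcher manager, e.g. under kafka.server or kafka.consumer with a MaxLag suffix, should be checked on a running process).

    import javax.management.ObjectName
    import javax.management.remote.{JMXConnectorFactory, JMXServiceURL}

    object MaxLagProbe {
      def main(args: Array[String]): Unit = {
        // Assumed JMX endpoint and mbean name -- both are placeholders and must
        // match the names actually exposed by the broker/consumer being probed.
        val url = new JMXServiceURL("service:jmx:rmi:///jndi/rmi://localhost:9999/jmxrmi")
        val connector = JMXConnectorFactory.connect(url)
        try {
          val mbsc = connector.getMBeanServerConnection
          val maxLagBean = new ObjectName("kafka.server:type=ReplicaFetcherManager,name=MaxLag")
          // Yammer gauges expose their current value via the "Value" attribute.
          println("Max fetcher lag: " + mbsc.getAttribute(maxLagBean, "Value"))
        } finally {
          connector.close()
        }
      }
    }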


 Expose max lag mbean for consumers and replica fetchers
 ---

 Key: KAFKA-921
 URL: https://issues.apache.org/jira/browse/KAFKA-921
 Project: Kafka
  Issue Type: Bug
Affects Versions: 0.8
Reporter: Joel Koshy
 Fix For: 0.8

 Attachments: KAFKA-921-v1.patch, KAFKA-921-v2.patch, 
 KAFKA-921-v3.patch


 We have a ton of consumer mbeans with names that are derived from the 
 consumer id, broker being fetched from, fetcher id, etc. This makes it 
 difficult to do basic monitoring of consumer/replica fetcher lag - since the 
 mbean to monitor can change. A more useful metric for monitoring purposes is 
 the maximum lag across all fetchers.

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


[jira] [Updated] (KAFKA-921) Expose max lag mbean for consumers and replica fetchers

2013-05-31 Thread Joel Koshy (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-921?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Joel Koshy updated KAFKA-921:
-

Resolution: Fixed
Status: Resolved  (was: Patch Available)

Thanks for the reviews. Committed with the minor change - i.e., Replica instead 
of Replica-id

 Expose max lag mbean for consumers and replica fetchers
 ---

 Key: KAFKA-921
 URL: https://issues.apache.org/jira/browse/KAFKA-921
 Project: Kafka
  Issue Type: Bug
Affects Versions: 0.8
Reporter: Joel Koshy
 Fix For: 0.8

 Attachments: KAFKA-921-v1.patch, KAFKA-921-v2.patch, 
 KAFKA-921-v3.patch


 We have a ton of consumer mbeans with names that are derived from the 
 consumer id, broker being fetched from, fetcher id, etc. This makes it 
 difficult to do basic monitoring of consumer/replica fetcher lag - since the 
 mbean to monitor can change. A more useful metric for monitoring purposes is 
 the maximum lag across all fetchers.

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


[jira] [Commented] (KAFKA-927) Integrate controlled shutdown into kafka shutdown hook

2013-06-04 Thread Joel Koshy (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-927?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13674598#comment-13674598
 ] 

Joel Koshy commented on KAFKA-927:
--

+1 - sorry I got to this late.

Small nit: the scaladoc for shutdown broker needs an edit which we will clean 
up later.
We probably don't need the adminTest's testShutdownBroker given that the 
rolling bounce test exercises the same logic.

Also, I think we can close KAFKA-817 - another approach with similar goals.



 Integrate controlled shutdown into kafka shutdown hook
 --

 Key: KAFKA-927
 URL: https://issues.apache.org/jira/browse/KAFKA-927
 Project: Kafka
  Issue Type: Bug
Reporter: Sriram Subramanian
Assignee: Sriram Subramanian
 Fix For: 0.8

 Attachments: KAFKA-927.patch, KAFKA-927-v2.patch, 
 KAFKA-927-v2-revised.patch, KAFKA-927-v3.patch, 
 KAFKA-927-v3-removeimports.patch, KAFKA-927-v4.patch


 The controlled shutdown mechanism should be integrated into the software for 
 better operational benefits. Also, a few optimizations can be done to reduce 
 unnecessary rpc and zk calls. This patch has been tested on a prod like 
 environment by doing rolling bounces continuously for a day. The average time 
 of doing a rolling bounce with controlled shutdown for a cluster with 7 nodes 
 without this patch is 340 seconds. With this patch it reduces to 220 seconds. 
 Also it ensures correctness in scenarios where the controller shrinks the isr 
 and the new leader could place the broker to be shutdown back into the isr.

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


[jira] [Commented] (KAFKA-938) High CPU usage when more or less idle

2013-06-12 Thread Joel Koshy (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-938?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13681479#comment-13681479
 ] 

Joel Koshy commented on KAFKA-938:
--

Excellent catch Sam!

One comment: I think the DelayedItem class was intended to support arbitrary 
(non-millisecond) time units, but that was buggy in two ways:
(i)  The getDelay's 'unit' parameter shadowed the DelayedItem's 'unit' member
(ii) The delayMs val assumes that the delay is always in ms (which prevents 
DelayedItem from supporting arbitrary time units).
Also, I think we must have missed the bit of the DelayQueue documentation that 
says getDelay is called with TimeUnit.NANOSECONDS

I think we can tweak this a bit to make it support arbitrary time units - 
otherwise, the unit parameter of DelayedItem is of no use. I can attach a 
patch to make this clearer.
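
For illustration, a minimal sketch (not the actual patch) of a delayed item that keeps its configured unit and converts the remaining delay into whatever unit the caller requests; since DelayQueue calls getDelay with TimeUnit.NANOSECONDS, the conversion has to go from the stored resolution to the requested one:

    import java.util.concurrent.{Delayed, TimeUnit}

    // Sketch only: normalize the delay to nanoseconds up front, then convert the
    // *remaining* delay into whatever unit the caller asks for (DelayQueue asks
    // for NANOSECONDS), so arbitrary input units work.
    class SketchDelayedItem[T](val item: T, delay: Long, unit: TimeUnit) extends Delayed {
      private val dueNs = System.nanoTime() + TimeUnit.NANOSECONDS.convert(delay, unit)

      override def getDelay(requestedUnit: TimeUnit): Long =
        requestedUnit.convert(dueNs - System.nanoTime(), TimeUnit.NANOSECONDS)

      override def compareTo(other: Delayed): Int =
        java.lang.Long.compare(getDelay(TimeUnit.NANOSECONDS), other.getDelay(TimeUnit.NANOSECONDS))
    }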

 High CPU usage when more or less idle
 -

 Key: KAFKA-938
 URL: https://issues.apache.org/jira/browse/KAFKA-938
 Project: Kafka
  Issue Type: Bug
  Components: core
Affects Versions: 0.8
Reporter: Sam Meder
Priority: Critical
 Fix For: 0.8

 Attachments: timeunit.patch


 We've noticed Kafka using a lot of CPU in a pretty much idle environment and 
 tracked it down to its DelayedItem implementation. In particular, the time 
 conversion for how much longer to wait:
   def getDelay(unit: TimeUnit): Long = {
     val elapsedMs = (SystemTime.milliseconds - createdMs)
     unit.convert(max(delayMs - elapsedMs, 0), unit)
   }
 does not actually convert, so Kafka ends up treating a ms value like 
 nanoseconds, e.g. waking up every 100 ns or so. The above code should really 
 be:
   def getDelay(unit: TimeUnit): Long = {
     val elapsedMs = (SystemTime.milliseconds - createdMs)
     unit.convert(max(delayMs - elapsedMs, 0), TimeUnit.MILLISECONDS)
   }
 I'll attach a patch.

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


[jira] [Commented] (KAFKA-937) ConsumerFetcherThread can deadlock

2013-06-12 Thread Joel Koshy (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-937?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13681787#comment-13681787
 ] 

Joel Koshy commented on KAFKA-937:
--

+1 on the patch.

Additionally, can you make this small (unrelated) change - make the console 
consumer's autoCommitIntervalOpt default to ConsumerConfig.AutoCommitInterval?

I think it is worth documenting the typical path of getting into the above 
deadlock:
- Assume at least two fetchers F1, F2
- One or more partitions on F1 go into error and leader finder thread L is 
notified
- L unblocks and proceeds to handle partitions without leader. It holds the 
ConsumerFetcherManager's lock at this point.
- All partitions on F2 go into error.
- F2's handlePartitionsWithError removes partitions from its fetcher's 
partitionMap. (At this point, F2 is by definition an idle fetcher thread.)
- L tries to shutdown idle fetcher threads - i.e., tries to shutdown F2.
- However, F2 at this point is trying to addPartitionsWithError which needs to 
acquire the ConsumerFetcherManager's lock (which is currently held by L).

It is relatively rare in the sense that it can happen only if all partitions on 
the fetcher are in error. This could happen for example if all the leaders for 
those partitions move or become unavailable. Another instance where this may be 
seen in practice is mirroring: we ran into it when running the mirror maker 
with a very large number of producers and ran out of file handles. Running out 
of file handles could easily lead to exceptions on most/all fetches and result 
in an error state for all partitions.
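
A stripped-down sketch of that lock interaction (illustrative names only, not Kafka's actual classes): the leader-finder thread L calls shutdown() on an idle fetcher while holding the manager lock, and the fetcher F2 cannot reach its shutdown latch because reporting its errored partitions needs that same lock.

    import java.util.concurrent.CountDownLatch

    class SketchManager {
      private val lock = new Object
      private var fetchers = List.empty[SketchFetcher]

      def addFetcher(f: SketchFetcher): Unit = lock.synchronized { fetchers ::= f }

      // Called by a fetcher thread (F2) when its partitions hit errors.
      def addPartitionsWithError(partitions: Seq[String]): Unit = lock.synchronized {
        // record the partitions so new leaders can be looked up
      }

      // Called by the leader-finder thread (L) while it holds the manager lock.
      def handlePartitionsWithoutLeader(): Unit = lock.synchronized {
        // ... look up leaders, then prune idle fetchers:
        fetchers.foreach(_.shutdown())   // deadlocks if a fetcher is blocked above
      }
    }

    class SketchFetcher(manager: SketchManager) extends Thread {
      private val shutdownLatch = new CountDownLatch(1)
      @volatile private var running = true

      override def run(): Unit = {
        try {
          while (running) {
            // ... fetch; if every partition errors out, report them:
            manager.addPartitionsWithError(Seq("topic-0"))  // blocks on the manager lock
          }
        } finally {
          shutdownLatch.countDown()
        }
      }

      def shutdown(): Unit = {   // invoked by L with the manager lock held
        running = false
        interrupt()
        shutdownLatch.await()    // never returns while run() is stuck on the lock
      }
    }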


 ConsumerFetcherThread can deadlock
 --

 Key: KAFKA-937
 URL: https://issues.apache.org/jira/browse/KAFKA-937
 Project: Kafka
  Issue Type: Bug
  Components: core
Affects Versions: 0.8
Reporter: Jun Rao
Assignee: Jun Rao
 Attachments: kafka-937.patch


 We have the following access pattern that can introduce a deadlock.
 AbstractFetcherThread.processPartitionsWithError() -
 ConsumerFetcherThread.processPartitionsWithError() - 
 ConsumerFetcherManager.addPartitionsWithError() wait for lock -
 LeaderFinderThread holding lock while calling 
 AbstractFetcherManager.shutdownIdleFetcherThreads() -
 AbstractFetcherManager calling fetcher.shutdown, which needs to wait until 
 AbstractFetcherThread.processPartitionsWithError() completes.

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


[jira] [Created] (KAFKA-940) Scala match error in javaapi.Implicits

2013-06-12 Thread Joel Koshy (JIRA)
Joel Koshy created KAFKA-940:


 Summary: Scala match error in javaapi.Implicits
 Key: KAFKA-940
 URL: https://issues.apache.org/jira/browse/KAFKA-940
 Project: Kafka
  Issue Type: Bug
Affects Versions: 0.8
Reporter: Joel Koshy
 Fix For: 0.8


This would affect javaapi users who (correctly) test for null on API calls 
(e.g., if (partitionMetadata.leader == null))

Right now, we actually get a match error:
scala.MatchError: null
at kafka.javaapi.Implicits$.optionToJavaRef(Implicits.scala:38)
at kafka.javaapi.Implicits$.optionToJavaRef(Implicits.scala:40)
at kafka.javaapi.PartitionMetadata.leader(TopicMetadata.scala:51)
truncated


--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


[jira] [Commented] (KAFKA-947) isr-expiration-thread may block LeaderAndIsr request for a relatively long period

2013-06-19 Thread Joel Koshy (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-947?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13688719#comment-13688719
 ] 

Joel Koshy commented on KAFKA-947:
--

+1 thanks for the patch!

 isr-expiration-thread may block LeaderAndIsr request for a relatively long 
 period 
 --

 Key: KAFKA-947
 URL: https://issues.apache.org/jira/browse/KAFKA-947
 Project: Kafka
  Issue Type: Bug
  Components: core
Affects Versions: 0.8.1
Reporter: Jun Rao
Assignee: Jun Rao
 Attachments: kafka-947.patch


 If there are lots of partitions whose isr needs to be shrank, 
 isr-expiration-thread will hold a long lock on leaderPartitionsLock, which 
 will delay LeaderAndIsr requests.

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


[jira] [Assigned] (KAFKA-559) Garbage collect old consumer metadata entries

2013-07-02 Thread Joel Koshy (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-559?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Joel Koshy reassigned KAFKA-559:


Assignee: Tejas Patil

Assigning to Tejas, since he has done some work on this recently.



 Garbage collect old consumer metadata entries
 -

 Key: KAFKA-559
 URL: https://issues.apache.org/jira/browse/KAFKA-559
 Project: Kafka
  Issue Type: New Feature
Reporter: Jay Kreps
Assignee: Tejas Patil
  Labels: project

 Many use cases involve transient consumers. These consumers create entries 
 under their consumer group in zk and maintain offsets there as well. There is 
 currently no way to delete these entries. It would be good to have a tool 
 that did something like
   bin/delete-obsolete-consumer-groups.sh [--topic t1] --since [date] 
 --zookeeper [zk_connect]
 This would scan through consumer group entries and delete any that had no 
 offset update since the given date.

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


[jira] [Commented] (KAFKA-559) Garbage collect old consumer metadata entries

2013-07-02 Thread Joel Koshy (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-559?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13698307#comment-13698307
 ] 

Joel Koshy commented on KAFKA-559:
--

One additional recommendation: support a --dry-run option.



 Garbage collect old consumer metadata entries
 -

 Key: KAFKA-559
 URL: https://issues.apache.org/jira/browse/KAFKA-559
 Project: Kafka
  Issue Type: New Feature
Reporter: Jay Kreps
Assignee: Tejas Patil
  Labels: project

 Many use cases involve transient consumers. These consumers create entries 
 under their consumer group in zk and maintain offsets there as well. There is 
 currently no way to delete these entries. It would be good to have a tool 
 that did something like
   bin/delete-obsolete-consumer-groups.sh [--topic t1] --since [date] 
 --zookeeper [zk_connect]
 This would scan through consumer group entries and delete any that had no 
 offset update since the given date.

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


[jira] [Commented] (KAFKA-960) Upgrade Metrics to 3.x

2013-07-03 Thread Joel Koshy (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-960?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13699211#comment-13699211
 ] 

Joel Koshy commented on KAFKA-960:
--

Given that there are API changes and mbean name changes between 2.x and 3.x my 
preference would be to defer this to a few months later (after the official 3.x 
release has proven to be stable).

 Upgrade Metrics to 3.x
 --

 Key: KAFKA-960
 URL: https://issues.apache.org/jira/browse/KAFKA-960
 Project: Kafka
  Issue Type: Improvement
Affects Versions: 0.8
Reporter: Cosmin Lehene
 Fix For: 0.8


 Now that metrics 3.0 has been released 
 (http://metrics.codahale.com/about/release-notes/) we can upgrade back

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


[jira] [Commented] (KAFKA-960) Upgrade Metrics to 3.x

2013-07-03 Thread Joel Koshy (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-960?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13699545#comment-13699545
 ] 

Joel Koshy commented on KAFKA-960:
--

Not really. However, my point is that given that going in either direction 
(upgrade or downgrade) is a bit painful due to the API and mbean changes, we 
should let 3.x prove itself to be stable in other contexts for a period of 
time before we switch to it.


 Upgrade Metrics to 3.x
 --

 Key: KAFKA-960
 URL: https://issues.apache.org/jira/browse/KAFKA-960
 Project: Kafka
  Issue Type: Improvement
Affects Versions: 0.8
Reporter: Cosmin Lehene
 Fix For: 0.8


 Now that metrics 3.0 has been released 
 (http://metrics.codahale.com/about/release-notes/) we can upgrade back

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


[jira] [Commented] (KAFKA-961) state.change.logger: Error on broker 1 while processing LeaderAndIsr request correlationId 6 received from controller 1 epoch 1 for partition (page_visits,0)

2013-07-03 Thread Joel Koshy (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-961?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13699567#comment-13699567
 ] 

Joel Koshy commented on KAFKA-961:
--

Passing in null for time would definitely lead to that NPE as you found. I 
think we only needed a time interface to support a mocktime for tests. Also, we 
probably didn't anticipate that KafkaServers would need to be embedded in Java 
code. If you are okay with your work-around, then great. Another (ugly) way to 
do it would be to pass in a dynamically instantiated SystemTime - so something 
like (Time) Class.forName(SystemTime.class.getName()).newInstance() - not sure 
if it will work though. We can also provide an explicit constructor without the 
time argument and get rid of the scala default arg.


 state.change.logger: Error on broker 1 while processing LeaderAndIsr request 
 correlationId 6 received from controller 1 epoch 1 for partition 
 (page_visits,0)
 -

 Key: KAFKA-961
 URL: https://issues.apache.org/jira/browse/KAFKA-961
 Project: Kafka
  Issue Type: Bug
Affects Versions: 0.8
 Environment: Linux gman-minty 3.8.0-19-generic #29-Ubuntu SMP Wed Apr 
 17 18:16:28 UTC 2013 x86_64 x86_64 x86_64 GNU/Linux
Reporter: Garrett Barton

 Been having issues embedding 0.8 servers into some Yarn stuff I'm doing. I 
 just pulled the latest from git, did a ./sbt +package, followed by ./sbt 
 assembly-package-dependency. And pushed 
 core/target/scala-2.8.0/kafka_2.8.0-0.8.0-beta1.jar into my local mvn repo.  
 Here is sample code ripped out to little classes to show my error:
 Starting up a broker embedded in java, with the following code:
 ...
    Properties props = new Properties();
    // dont set so it binds to all interfaces
    // props.setProperty("hostname", hostName);
    props.setProperty("port", );
    props.setProperty("broker.id", "1");
    props.setProperty("log.dir", "/tmp/embeddedkafka/" + randId);
    // TODO: hardcode bad
    props.setProperty("zookeeper.connect", "localhost:2181/" + randId);
    KafkaConfig kconf = new KafkaConfig(props);

    server = new KafkaServer(kconf, null);
    server.startup();
    LOG.info("Broker online");
 Sample Producer has the following code:
 ...
    Properties props = new Properties();
    props.put("metadata.broker.list", "gman-minty:");
    props.put("serializer.class", "kafka.serializer.StringEncoder");
    props.put("partitioner.class", "com.gman.broker.SimplePartitioner");
    props.put("request.required.acks", "1");
    ProducerConfig config = new ProducerConfig(props);

    Producer<String, String> producer = new Producer<String, String>(config);
    LOG.info("producer created");
    KeyedMessage<String, String> data = new KeyedMessage<String, String>("page_visits", "key1", "value1");
    producer.send(data);
    LOG.info("wrote message: " + data);
 And here is the server log:
 INFO  2013-07-03 13:47:30,538 [Thread-0] kafka.utils.VerifiableProperties: 
 Verifying properties
 INFO  2013-07-03 13:47:30,568 [Thread-0] kafka.utils.VerifiableProperties: 
 Property port is overridden to 
 INFO  2013-07-03 13:47:30,568 [Thread-0] kafka.utils.VerifiableProperties: 
 Property broker.id is overridden to 1
 INFO  2013-07-03 13:47:30,568 [Thread-0] kafka.utils.VerifiableProperties: 
 Property zookeeper.connect is overridden to localhost:2181/kafkatest
 INFO  2013-07-03 13:47:30,569 [Thread-0] kafka.utils.VerifiableProperties: 
 Property log.dir is overridden to \tmp\embeddedkafka\1372873650268
 INFO  2013-07-03 13:47:30,574 [Thread-0] kafka.server.KafkaServer: [Kafka 
 Server 1], Starting
 INFO  2013-07-03 13:47:30,609 [Thread-0] kafka.log.LogManager: [Log Manager 
 on Broker 1] Log directory 
 '/home/gman/workspace/distributed_parser/\tmp\embeddedkafka\1372873650268' 
 not found, creating it.
 INFO  2013-07-03 13:47:30,619 [Thread-0] kafka.log.LogManager: [Log Manager 
 on Broker 1] Starting log cleaner every 60 ms
 INFO  2013-07-03 13:47:30,630 [Thread-0] kafka.log.LogManager: [Log Manager 
 on Broker 1] Starting log flusher every 3000 ms with the following overrides 
 Map()
 INFO  2013-07-03 13:47:30,687 [Thread-0] kafka.network.Acceptor: Awaiting 
 socket connections on 0.0.0.0:.
 INFO  2013-07-03 13:47:30,688 [Thread-0] kafka.network.SocketServer: [Socket 
 Server on Broker 1], Started
 INFO  2013-07-03 13:47:30,696 [Thread-0] 

[jira] [Commented] (KAFKA-915) System Test - Mirror Maker testcase_5001 failed

2013-07-03 Thread Joel Koshy (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-915?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13699641#comment-13699641
 ] 

Joel Koshy commented on KAFKA-915:
--

This failure is due to the fact that the leaderAndIsr request has not made it 
to the brokers by the time the mirror maker's rebalance completes. This is 
related to the issue reported in KAFKA-956. Previously (before we started 
caching metadata at the brokers) the partition information was retrieved 
directly from zk.

The fix for now would be to use the create topic admin before starting the 
mirror maker (or move the producer performance start up to well before the 
mirror maker startup).
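
Something along the following lines before bringing up the mirror maker would do it; the ZK connect string and topic name are placeholders, and the CreateTopicCommand helper and its exact signature are from memory and should be checked against the 0.8 branch:

    import kafka.admin.CreateTopicCommand
    import kafka.utils.ZKStringSerializer
    import org.I0Itec.zkclient.ZkClient

    object PreCreateTopic {
      def main(args: Array[String]): Unit = {
        // Placeholder ZK quorum and topic; match the system test's source cluster.
        val zkClient = new ZkClient("source-zk:2181", 30000, 30000, ZKStringSerializer)
        try {
          // 5 partitions, replication factor 3 -- the failing configuration.
          CreateTopicCommand.createTopic(zkClient, "test_topic", 5, 3)
        } finally {
          zkClient.close()
        }
      }
    }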


 System Test - Mirror Maker testcase_5001 failed
 ---

 Key: KAFKA-915
 URL: https://issues.apache.org/jira/browse/KAFKA-915
 Project: Kafka
  Issue Type: Bug
Reporter: John Fung
Assignee: Joel Koshy
Priority: Critical
  Labels: kafka-0.8, replication-testing
 Attachments: testcase_5001_debug_logs.tar.gz


 This case passes if brokers are set to partition = 1, replicas = 1
 It fails if brokers are set to partition = 5, replicas = 3 (consistently 
 reproducible)
 This test case is set up as shown below.
 1. Start 2 ZK as a cluster in Source
 2. Start 2 ZK as a cluster in Target
 3. Start 3 brokers as a cluster in Source (partition = 1, replicas = 1)
 4. Start 3 brokers as a cluster in Target (partition = 1, replicas = 1)
 5. Start 1 MM
 6. Start ProducerPerformance to send some data
 7. After Producer is done, start ConsoleConsumer to consume data
 8. Stop all processes and validate if there is any data loss.
 9. No failure is introduced to any process in this test
 Attached a tar file which contains the logs and system test output for both 
 cases.

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


[jira] [Commented] (KAFKA-966) Allow high level consumer to 'nak' a message and force Kafka to close the KafkaStream without losing that message

2013-07-09 Thread Joel Koshy (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-966?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13703755#comment-13703755
 ] 

Joel Koshy commented on KAFKA-966:
--

One way to accomplish this is to turn off autocommit and checkpoint offsets 
only after a message (or batch of messages) have been written to the DB.

One caveat though is that rebalances (e.g., if a new consumer instance shows 
up) will result in offsets being committed so there would be an issue if the DB 
is unavailable and a rebalance occurs simultaneously and there are unprocessed 
messages that have already been pulled out of the iterator.
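
Roughly, with the 0.8 high-level consumer the first part looks like the sketch below; the endpoints and the writeToDb call are placeholders, and the property/API names are the 0.8-era ones, so verify them against the version in use:

    import java.util.Properties
    import kafka.consumer.{Consumer, ConsumerConfig}

    object CommitAfterWrite {
      def main(args: Array[String]): Unit = {
        val props = new Properties()
        props.put("zookeeper.connect", "localhost:2181")  // placeholder
        props.put("group.id", "db-writer")                // placeholder
        props.put("auto.commit.enable", "false")          // checkpoint manually

        val connector = Consumer.create(new ConsumerConfig(props))
        val stream = connector.createMessageStreams(Map("page_visits" -> 1))("page_visits").head

        for (msg <- stream) {
          writeToDb(msg.message)       // placeholder for the real DB write
          connector.commitOffsets      // checkpoint only after the write succeeded
        }
      }

      def writeToDb(payload: Array[Byte]): Unit = ()  // stub
    }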


 Allow high level consumer to 'nak' a message and force Kafka to close the 
 KafkaStream without losing that message
 -

 Key: KAFKA-966
 URL: https://issues.apache.org/jira/browse/KAFKA-966
 Project: Kafka
  Issue Type: Improvement
  Components: consumer
Affects Versions: 0.8
Reporter: Chris Curtin
Assignee: Neha Narkhede
Priority: Minor

 Enhancement request.
 The high level consumer is very close to handling a lot of situations a 
 'typical' client would need. Except for when the message received from Kafka 
 is valid, but the business logic that wants to consume it has a problem.
 For example if I want to write the value to a MongoDB or Cassandra database 
 and the database is not available. I won't know until I go to do the write 
 that the database isn't available, but by then it is too late to NOT read the 
 message from Kafka. Thus if I call shutdown() to stop reading, that message 
 is lost since the offset Kafka writes to ZooKeeper is the next offset.
 Ideally I'd like to be able to tell Kafka: close the KafkaStream but set the 
 next offset to read for this partition to this message when I start up again. 
 And if there are any messages in the BlockingQueue for other partitions, find 
 the lowest # and use it for that partition's offset since I haven't consumed 
 them yet.
 Thus I can cleanly shutdown my processing, resolve whatever the issue is and 
 restart the process.
 Another idea might be to allow a 'peek' into the next message and if I 
 succeed in writing to the database call 'next' to remove it from the queue. 
 I understand this won't deal with a 'kill -9' or hard failure of the JVM 
 leading to the latest offsets not being written to ZooKeeper but it addresses 
 a likely common scenario for consumers. Nor will it add true transactional 
 support since the ZK update could fail.

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


[jira] [Commented] (KAFKA-969) Need to prevent failure of rebalance when there are no brokers available when consumer comes up

2013-07-10 Thread Joel Koshy (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-969?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13705340#comment-13705340
 ] 

Joel Koshy commented on KAFKA-969:
--

This seems reasonable, but I'm not fully convinced about it. E.g., a test
framework should ensure external dependencies are up before attempting to
make service calls to those dependencies. That said, it is perhaps also
reasonable from a consumer's perspective to expect that returned streams be
empty at first, and whenever brokers and topics show up, then events should
just show up.

I'm +1 on this patch except for the if-else formatting issue.  Also, I think
this patch alone would be insufficient to meet the above.  There are two
other issues:

- We should register a watcher under the topics path (currently done only if
  a wildcard is specified)
- KAFKA-956 is also related. I need to give that one some thought.




 Need to prevent failure of rebalance when there are no brokers available when 
 consumer comes up
 ---

 Key: KAFKA-969
 URL: https://issues.apache.org/jira/browse/KAFKA-969
 Project: Kafka
  Issue Type: Bug
Reporter: Sriram Subramanian
Assignee: Sriram Subramanian
 Attachments: emptybrokeronrebalance-1.patch


 There are some rare instances when a consumer would be up before bringing up 
 the Kafka brokers. This would usually happen in a test scenario. In such 
 conditions, during rebalance instead of failing the rebalance we just log the 
 error and subscribe to broker changes. When the broker comes back up, we 
 trigger the rebalance.

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


[jira] [Commented] (KAFKA-969) Need to prevent failure of rebalance when there are no brokers available when consumer comes up

2013-07-10 Thread Joel Koshy (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-969?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13705490#comment-13705490
 ] 

Joel Koshy commented on KAFKA-969:
--

As I already said, I'm +1 on this patch for what it intends to address. Those 
two issues I mentioned are orthogonal. By "above" in my comment I was referring 
to the possible expectation from consumers: "... from a consumer's perspective 
to expect that returned streams be empty at first, and whenever brokers and 
topics show up, then events should just show up" - not the "failed to 
rebalance" issue.


 Need to prevent failure of rebalance when there are no brokers available when 
 consumer comes up
 ---

 Key: KAFKA-969
 URL: https://issues.apache.org/jira/browse/KAFKA-969
 Project: Kafka
  Issue Type: Bug
Reporter: Sriram Subramanian
Assignee: Sriram Subramanian
 Fix For: 0.8

 Attachments: emptybrokeronrebalance-1.patch


 There are some rare instances when a consumer would be up before bringing up 
 the Kafka brokers. This would usually happen in a test scenario. In such 
 conditions, during rebalance instead of failing the rebalance we just log the 
 error and subscribe to broker changes. When the broker comes back up, we 
 trigger the rebalance.

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


[jira] [Resolved] (KAFKA-705) Controlled shutdown doesn't seem to work on more than one broker in a cluster

2013-07-16 Thread Joel Koshy (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-705?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Joel Koshy resolved KAFKA-705.
--

Resolution: Fixed

 Controlled shutdown doesn't seem to work on more than one broker in a cluster
 -

 Key: KAFKA-705
 URL: https://issues.apache.org/jira/browse/KAFKA-705
 Project: Kafka
  Issue Type: Bug
  Components: core
Affects Versions: 0.8
Reporter: Neha Narkhede
Assignee: Joel Koshy
Priority: Critical
  Labels: bugs
 Attachments: kafka-705-incremental-v2.patch, kafka-705-v1.patch, 
 shutdown_brokers_eat.py, shutdown-command


 I wrote a script (attached here) to basically round robin through the brokers 
 in a cluster doing the following 2 operations on each of them -
 1. Send the controlled shutdown admin command. If it succeeds
 2. Restart the broker
 What I've observed is that only one broker is able to finish the above 
 successfully the first time around. For the rest of the iterations, no broker 
 is able to shutdown using the admin command and every single time it fails 
 with the error message stating the same number of leaders on every broker. 

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


[jira] [Commented] (KAFKA-705) Controlled shutdown doesn't seem to work on more than one broker in a cluster

2013-07-16 Thread Joel Koshy (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-705?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13709525#comment-13709525
 ] 

Joel Koshy commented on KAFKA-705:
--

Yes we can close this.

 Controlled shutdown doesn't seem to work on more than one broker in a cluster
 -

 Key: KAFKA-705
 URL: https://issues.apache.org/jira/browse/KAFKA-705
 Project: Kafka
  Issue Type: Bug
  Components: core
Affects Versions: 0.8
Reporter: Neha Narkhede
Assignee: Joel Koshy
Priority: Critical
  Labels: bugs
 Attachments: kafka-705-incremental-v2.patch, kafka-705-v1.patch, 
 shutdown_brokers_eat.py, shutdown-command


 I wrote a script (attached here) to basically round robin through the brokers 
 in a cluster doing the following 2 operations on each of them -
 1. Send the controlled shutdown admin command. If it succeeds
 2. Restart the broker
 What I've observed is that only one broker is able to finish the above 
 successfully the first time around. For the rest of the iterations, no broker 
 is able to shutdown using the admin command and every single time it fails 
 with the error message stating the same number of leaders on every broker. 

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


[jira] [Closed] (KAFKA-705) Controlled shutdown doesn't seem to work on more than one broker in a cluster

2013-07-16 Thread Joel Koshy (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-705?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Joel Koshy closed KAFKA-705.



 Controlled shutdown doesn't seem to work on more than one broker in a cluster
 -

 Key: KAFKA-705
 URL: https://issues.apache.org/jira/browse/KAFKA-705
 Project: Kafka
  Issue Type: Bug
  Components: core
Affects Versions: 0.8
Reporter: Neha Narkhede
Assignee: Joel Koshy
Priority: Critical
  Labels: bugs
 Attachments: kafka-705-incremental-v2.patch, kafka-705-v1.patch, 
 shutdown_brokers_eat.py, shutdown-command


 I wrote a script (attached here) to basically round robin through the brokers 
 in a cluster doing the following 2 operations on each of them -
 1. Send the controlled shutdown admin command. If it succeeds
 2. Restart the broker
 What I've observed is that only one broker is able to finish the above 
 successfully the first time around. For the rest of the iterations, no broker 
 is able to shutdown using the admin command and every single time it fails 
 with the error message stating the same number of leaders on every broker. 

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


[jira] [Commented] (KAFKA-973) Messages From Producer Not being Partitioned

2013-07-16 Thread Joel Koshy (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-973?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13709531#comment-13709531
 ] 

Joel Koshy commented on KAFKA-973:
--

Can you try sending more messages? The default partitioner is random so all the 
partitions should get messages (as long as you send enough messages - three 
messages ending up on one partition can happen).
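
For instance, reusing a single producer and sending a larger batch (or attaching explicit keys so the partition is derived from the key hash) should show both consumer threads receiving data; the broker list and topic below are placeholders taken from the report:

    import java.util.Properties
    import kafka.producer.{KeyedMessage, Producer, ProducerConfig}

    object SpreadCheck {
      def main(args: Array[String]): Unit = {
        val props = new Properties()
        props.put("metadata.broker.list", "kafka2:9092,kafka1:9092")
        props.put("serializer.class", "kafka.serializer.StringEncoder")
        props.put("request.required.acks", "1")

        // One producer instance, many messages; with an explicit key the
        // partition comes from the key hash rather than a random pick.
        val producer = new Producer[String, String](new ProducerConfig(props))
        for (i <- 0 until 1000)
          producer.send(new KeyedMessage[String, String]("dnslog", i.toString, "value-" + i))
        producer.close()
      }
    }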

 Messages From Producer Not being Partitioned 
 -

 Key: KAFKA-973
 URL: https://issues.apache.org/jira/browse/KAFKA-973
 Project: Kafka
  Issue Type: Bug
  Components: consumer
Affects Versions: 0.8
 Environment: Linux
Reporter: Subbu Srinivasan
Assignee: Neha Narkhede
  Labels: newbie

 I created a two node cluster.
 2 zoo keepers
 2 brokers
 1 topic with replication factor (2) and no of partition 2.
 my consumer group has two threads
 1) From my Java client - I send few  messages to the topic. I have set 
 multiple brokers
 kafka2:9092,kafka1:9092.
 Only one thread in my consumer always gets the messages. It looks like 
 producer is not
 partitioning the requests properly.
 2) However if I send some sample using the simple console producer, I see 
 multiple threads getting
 requests and is load balanced.
 What am I doing wrong in my client?
  public class KafkaProducer {

    private final Properties props = new Properties();
    private static AtomicLong counter = new AtomicLong(0);
    kafka.javaapi.producer.Producer<Integer, String> producer = null;

    public KafkaProducer()
    {
      props.put("serializer.class", "kafka.serializer.StringEncoder");
      props.put("metadata.broker.list", ConfigurationUtility.getKafkaHost());
      producer = new kafka.javaapi.producer.Producer<Integer, String>(new ProducerConfig(props));
    }

    public void sendMessage(String msg) throws Exception
    {
      producer.send(new KeyedMessage<Integer, String>(ConfigurationUtility.getTopicName(), msg));
    }

    public static void main(String arg[]) throws Exception
    {
      ConfigurationUtility.setKafkaHost("kafka2:9092,kafka1:9092");
      ConfigurationUtility.setTopicName("dnslog");
      ConfigurationUtility.setZooKeeperHost("kafka1:2181,kafka2:2181");
      ConfigurationUtility.setConsumerGroupId("dnslog");

      for (int i = 0; i < 2; ++i)
      {
        (new KafkaProducer()).sendMessage(UUID.randomUUID().toString());
      }
    }
  }

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


[jira] [Commented] (KAFKA-957) MirrorMaker needs to preserve the key in the source cluster

2013-07-16 Thread Joel Koshy (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-957?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13710571#comment-13710571
 ] 

Joel Koshy commented on KAFKA-957:
--

Thanks for incorporating 5 and 6. Couple additional comments:
- For the two match statements you have it is probably sufficient and
  clearer to just use if (key == null)  and if (props.contains(..))
- I'm not so sure if the trace is required but it could be useful. Would
  prefer the following format: "Sending message with key <key>" - no need to
  show the payload. Also, may want to use java.util.Arrays.toString on the
  byte array.
- Per Jay's offline comments, hashCode in general is a bit unsafe to rely on.
  For example, it could have a non-uniform distribution or the underlying
  function could change. That said, your usage is safe. Still, it should be
  straightforward to do a custom hash function that we can rely on for
  consistency.
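
A sketch of what such a partitioner could look like against the 0.8 Partitioner[T] trait; the FNV-1a constants and the VerifiableProperties constructor argument (what the 0.8 producer is expected to pass when instantiating a partitioner) are the assumptions here. The point is that the byte-level hash is fully under our control, unlike Object.hashCode:

    import kafka.producer.Partitioner
    import kafka.utils.VerifiableProperties

    // Illustrative only: hash the raw key bytes with FNV-1a so the
    // key -> partition mapping stays stable across JVMs and releases.
    class StableHashPartitioner(props: VerifiableProperties = null) extends Partitioner[Array[Byte]] {
      override def partition(key: Array[Byte], numPartitions: Int): Int = {
        var hash = 0x811c9dc5L.toInt          // FNV-1a 32-bit offset basis
        for (b <- key) {
          hash ^= (b & 0xff)
          hash *= 0x01000193                  // FNV-1a 32-bit prime
        }
        math.abs(hash % numPartitions)        // guard against a negative hash
      }
    }

It would then be wired in via the partitioner.class producer property.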


 MirrorMaker needs to preserve the key in the source cluster
 ---

 Key: KAFKA-957
 URL: https://issues.apache.org/jira/browse/KAFKA-957
 Project: Kafka
  Issue Type: Bug
  Components: core
Affects Versions: 0.8
Reporter: Jun Rao
Assignee: Guozhang Wang
 Attachments: KAFKA-957.v1.patch, KAFKA-957.v2.patch, 
 KAFKA-957.v2.patch


 Currently, MirrorMaker only propagates the message to the target cluster, but 
 not the associated key.

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


[jira] [Commented] (KAFKA-967) Use key range in ProducerPerformance

2013-07-16 Thread Joel Koshy (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-967?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13710589#comment-13710589
 ] 

Joel Koshy commented on KAFKA-967:
--

+1 - thanks for the patch.

 Use key range in ProducerPerformance
 

 Key: KAFKA-967
 URL: https://issues.apache.org/jira/browse/KAFKA-967
 Project: Kafka
  Issue Type: Improvement
Reporter: Guozhang Wang
Assignee: Guozhang Wang
 Attachments: KAFKA-967.v1.patch, KAFKA-967.v2.patch


 Currently in ProducerPerformance, the key of the message is set to MessageID. 
 It would be better to set it to a specific key within a key range (Integer type) 
 so that we can test the semantic partitioning case. This is related to 
 KAFKA-957.

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


[jira] [Commented] (KAFKA-347) change number of partitions of a topic online

2013-07-18 Thread Joel Koshy (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-347?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13713045#comment-13713045
 ] 

Joel Koshy commented on KAFKA-347:
--

Thank you for the patch. Couple of comments, all very minor:

AddPartitionsCommand:
- IMO it is more intuitive for the option to be: total partitions desired
  as opposed to num partitions to add
- It is a bit odd that we can allow some partitions with a different
  replication factor from what's already there. I don't see any issues with
  it though. I just think it's a bit odd. One potential issue is if
  producers explicitly want to set acks to say 3 when there are some
  partitions with a replication factor of 2 and some with 3 (However,
  producers really should be using -1 in which case it would be fine).
- I think the command can currently allow an unintentional reassignment of
  replicas since the persistent path is always updated. (or no?) I think
  this can be easily checked for and avoided.
- Apart from start partition id I think getManualReplicaAssignment is
  identical to CreateTopicCommand's - maybe that code can be moved into
  AdminUtils?

KafkaController:
- nitpick: ZkUtils.getAllTopics(zkClient).foreach(p =>
  partitionStateMachine.registerPartitionChangeListener(p)) (can you change
  p to t :) - p really looks like a partition but it is a topic)

AdminUtils:
- the //for testing only comment is now misplaced.
- This code is pre-existing, but would prefer changing secondReplicaShift to
  nextReplicaShift.

- Any reason why AddPartitionsTest should not be within AdminTest?
- Finally, can you rebase? Sorry for not getting to this patch sooner :(



 change number of partitions of a topic online
 -

 Key: KAFKA-347
 URL: https://issues.apache.org/jira/browse/KAFKA-347
 Project: Kafka
  Issue Type: Improvement
  Components: core
Affects Versions: 0.8
Reporter: Jun Rao
Assignee: Sriram Subramanian
  Labels: features
 Fix For: 0.8.1

 Attachments: kafka-347.patch


 We will need an admin tool to change the number of partitions of a topic 
 online.

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


[jira] [Commented] (KAFKA-347) change number of partitions of a topic online

2013-07-19 Thread Joel Koshy (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-347?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13714259#comment-13714259
 ] 

Joel Koshy commented on KAFKA-347:
--

Thanks for patch v2. I'm +1 on this as is, but if you can address some of these
minor comments that would be great.

v2.1 - For num partitions to add vs. partitions desired - all I meant was that
most of the time users would think of the desired number of partitions rather
than how many more to add. E.g., I have eight partitions for a topic and now
want 20 instead. It is more convenient to just say I want 20 partitions instead
of thinking of how many to add. OTOH, since we don't support reducing
partitions, treating it as a number of partitions to add is safer. So I don't
feel very strongly about it either way.

v2.2 - Re: unintentional reassignment of partitions. Yes you are right.

v2.3 - Your patch still has ZookeeperConsumerConnector changes in it, so it did 
not apply cleanly.

v2.4 - On checking the replication factor: if we don't allow having a different 
replication factor for the new partitions
we should not even expose it as an option.

v2.5 - AddPartitionsListener: no need to change it now, but just a comment: we 
can directly parse the replica assignment
from the data object (instead of reading from zookeeper again) right?

v2.6 - On moving getManualReplicaAssignment to AdminUtils - I think it would be 
good to do that here, but either way is
fine.


 change number of partitions of a topic online
 -

 Key: KAFKA-347
 URL: https://issues.apache.org/jira/browse/KAFKA-347
 Project: Kafka
  Issue Type: Improvement
  Components: core
Affects Versions: 0.8
Reporter: Jun Rao
Assignee: Sriram Subramanian
  Labels: features
 Fix For: 0.8.1

 Attachments: kafka-347.patch, kafka-347-v2.patch


 We will need an admin tool to change the number of partitions of a topic 
 online.

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


[jira] [Commented] (KAFKA-982) Logo for Kafka

2013-07-23 Thread Joel Koshy (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-982?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13716285#comment-13716285
 ] 

Joel Koshy commented on KAFKA-982:
--

+1 for 298, and I like Jakob's recursive suggestion as well (can you add 
feedback on that on the 99designs contest?).

294 seems interesting/deep (pen, two k's, I kind of see a person with hands 
raised, etc.), but I just prefer 298 wrt overall appearance. 296 is also good, 
but between 296 and 298 I prefer 298.


 Logo for Kafka
 --

 Key: KAFKA-982
 URL: https://issues.apache.org/jira/browse/KAFKA-982
 Project: Kafka
  Issue Type: Improvement
Reporter: Jay Kreps
 Attachments: 289.jpeg, 294.jpeg, 296.png, 298.jpeg


 We should have a logo for kafka.

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


[jira] [Commented] (KAFKA-925) Add optional partition key override in producer

2013-07-23 Thread Joel Koshy (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-925?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13716593#comment-13716593
 ] 

Joel Koshy commented on KAFKA-925:
--

+1 , looks good to me.

DefaultPartitioner: Do we need the type parameter anymore?

Guozhang has a good point about tools such as mirror maker not having access to 
the original partitioning key.
However, I can see that it would be clunky as we would then need a partition 
key serializer as well. Also,
for something like offset-preserving mirrors we would anyway have the source 
cluster's partition available,
so I don't see it as a major issue.

ConsoleProducer: the enqueue timeout change seems reasonable - I'm assuming it 
was done to avoid dropping
messages when piping into ConsoleProducer. Correct?
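
For reference, a minimal, self-contained sketch of the idea being reviewed -
an optional partition key on the message plus an untyped partitioner - rather
than the actual patch (the names and the null-fallback rule here are
assumptions):

{code}
// Sketch only: optional partition key, with the partitioner taking Any so it
// works with either key.
case class KeyedMessage[K, V](topic: String, key: K, partKey: Any, message: V) {
  // Fall back to the message key when no explicit partition key is given.
  def partitionKey: Any = if (partKey != null) partKey else key
}

trait Partitioner {
  def partition(key: Any, numPartitions: Int): Int
}

class DefaultPartitioner extends Partitioner {
  def partition(key: Any, numPartitions: Int): Int =
    math.abs(key.hashCode) % numPartitions
}

object PartitionKeySketch extends App {
  val partitioner = new DefaultPartitioner
  // Partitioned by "user-42", while "user-42:event-7" is stored as the message key.
  val m = KeyedMessage("events", "user-42:event-7", "user-42", "payload")
  println(partitioner.partition(m.partitionKey, numPartitions = 8))
}
{code}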


 Add optional partition key override in producer
 ---

 Key: KAFKA-925
 URL: https://issues.apache.org/jira/browse/KAFKA-925
 Project: Kafka
  Issue Type: New Feature
  Components: producer 
Affects Versions: 0.8.1
Reporter: Jay Kreps
Assignee: Jay Kreps
 Attachments: KAFKA-925-v1.patch, KAFKA-925-v2.patch


 We have a key that is used for partitioning in the producer and stored with 
 the message. Actually these uses, though often the same, could be different. 
 The two meanings are effectively:
 1. Assignment to a partition
 2. Deduplication within a partition
 In cases where we want to allow the client to take advantage of both of these 
 and they aren't the same it would be nice to allow them to be specified 
 separately.
 To implement this I added an optional partition key to KeyedMessage. When 
 specified this key is used for partitioning rather than the message key. This 
 key is of type Any and the parametric typing is removed from the partitioner 
 to allow it to work with either key.
 An alternative would be to allow the partition id to specified in the 
 KeyedMessage. This would be slightly more convenient in the case where there 
 is no partition key but instead you know a priori the partition number--this 
 case must be handled by giving the partition id as the partition key and 
 using an identity partitioner which is slightly more roundabout. However this 
 is inconsistent with the normal partitioning which requires a key in the case 
 where the partition is determined by a key--in that case you would be 
 manually calling your partitioner in user code. It seems best to me to either 
 use a key or always a partition and since we currently take a key I stuck 
 with that.

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


[jira] [Updated] (KAFKA-957) MirrorMaker needs to preserve the key in the source cluster

2013-07-24 Thread Joel Koshy (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-957?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Joel Koshy updated KAFKA-957:
-

Resolution: Fixed
Status: Resolved  (was: Patch Available)

Committed to 0.8

 MirrorMaker needs to preserve the key in the source cluster
 ---

 Key: KAFKA-957
 URL: https://issues.apache.org/jira/browse/KAFKA-957
 Project: Kafka
  Issue Type: Bug
  Components: core
Affects Versions: 0.8
Reporter: Jun Rao
Assignee: Guozhang Wang
 Attachments: KAFKA-957.v1.patch, KAFKA-957.v2.patch, 
 KAFKA-957.v2.patch, KAFKA-957.v3.patch, KAFKA-957.v4.patch


 Currently, MirrorMaker only propagates the message to the target cluster, but 
 not the associated key.

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


[jira] [Closed] (KAFKA-957) MirrorMaker needs to preserve the key in the source cluster

2013-07-24 Thread Joel Koshy (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-957?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Joel Koshy closed KAFKA-957.



 MirrorMaker needs to preserve the key in the source cluster
 ---

 Key: KAFKA-957
 URL: https://issues.apache.org/jira/browse/KAFKA-957
 Project: Kafka
  Issue Type: Bug
  Components: core
Affects Versions: 0.8
Reporter: Jun Rao
Assignee: Guozhang Wang
 Attachments: KAFKA-957.v1.patch, KAFKA-957.v2.patch, 
 KAFKA-957.v2.patch, KAFKA-957.v3.patch, KAFKA-957.v4.patch


 Currently, MirrorMaker only propagates the message to the target cluster, but 
 not the associated key.

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


[jira] [Commented] (KAFKA-957) MirrorMaker needs to preserve the key in the source cluster

2013-07-24 Thread Joel Koshy (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-957?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13718690#comment-13718690
 ] 

Joel Koshy commented on KAFKA-957:
--

+1

 MirrorMaker needs to preserve the key in the source cluster
 ---

 Key: KAFKA-957
 URL: https://issues.apache.org/jira/browse/KAFKA-957
 Project: Kafka
  Issue Type: Bug
  Components: core
Affects Versions: 0.8
Reporter: Jun Rao
Assignee: Guozhang Wang
 Attachments: KAFKA-957.v1.patch, KAFKA-957.v2.patch, 
 KAFKA-957.v2.patch, KAFKA-957.v3.patch, KAFKA-957.v4.patch


 Currently, MirrorMaker only propagates the message to the target cluster, but 
 not the associated key.

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


[jira] [Commented] (KAFKA-991) Reduce the queue size in hadoop producer

2013-08-01 Thread Joel Koshy (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-991?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13726958#comment-13726958
 ] 

Joel Koshy commented on KAFKA-991:
--

+1

Committed to 0.8

Minor comments:
- queue size is unintuitive: it sounds like a number of messages, but it is bytes
- The totalSize > queueSize check should ideally be done before adding it to
msgList (see the sketch below).
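
A short Scala sketch of that ordering (the real KafkaRecordWriter is Java, and
queueSize/totalBytes/msgList and the send callback here are stand-ins from the
discussion, not the actual fields):

{code}
import scala.collection.mutable.ArrayBuffer

class RecordWriterSketch(queueSize: Int, send: Seq[Array[Byte]] => Unit) {
  private val msgList = ArrayBuffer[Array[Byte]]()
  private var totalBytes = 0

  def write(value: Array[Byte]): Unit = {
    val valBytes = value.length
    // Check the byte budget *before* appending, so a batch never exceeds queueSize.
    if (totalBytes + valBytes > queueSize && msgList.nonEmpty)
      sendMsgList()
    msgList += value
    totalBytes += valBytes
  }

  private def sendMsgList(): Unit = {
    send(msgList.toList)
    msgList.clear()
    totalBytes = 0  // reset only what was actually sent; the current message is counted in write()
  }
}
{code}

This ordering also sidesteps the follow-up issue noted further down, since the
current message's valBytes is added to totalBytes after any flush rather than
being lost in the reset.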

 Reduce the queue size in hadoop producer
 

 Key: KAFKA-991
 URL: https://issues.apache.org/jira/browse/KAFKA-991
 Project: Kafka
  Issue Type: Bug
Affects Versions: 0.8
Reporter: Swapnil Ghike
Assignee: Swapnil Ghike
  Labels: bugs
 Fix For: 0.8

 Attachments: kafka-991-v1.patch


 Currently the queue.size in hadoop producer is 10MB. This means that the 
 KafkaRecordWriter will hit the send button on kafka producer after the size 
 of uncompressed queued messages becomes greater than 10MB. (The other 
 condition on which the messages are sent is if their number exceeds 
 SHORT.MAX_VALUE).
 Considering that the server accepts a (compressed) batch of messages of 
 size up to 1 million bytes minus the log overhead, we should probably reduce 
 the queue size in hadoop producer. We should do two things:
 1. change max message size on the broker to 1 million + log overhead, because 
 that will make the client message size easy to remember. Right now the 
 maximum number of bytes that can be accepted from a client in a batch of 
 messages is an awkward 88. (I don't have a stronger reason). We have set 
 fetch size on the consumer to 1MB, this gives us a lot of room even if the 
 log overhead increased with further versions.
 2. Set the default number of bytes on hadoop producer to 1 million bytes. 
 Anyone who wants higher throughput can override this config using 
 kafka.output.queue.size

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


[jira] [Commented] (KAFKA-991) Reduce the queue size in hadoop producer

2013-08-01 Thread Joel Koshy (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-991?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13727092#comment-13727092
 ] 

Joel Koshy commented on KAFKA-991:
--

Thanks for the follow-up patch. totalBytes is set to zero in sendMsgList, so the
next batch's totalBytes will be less (i.e., incorrect) by valBytes.

 Reduce the queue size in hadoop producer
 

 Key: KAFKA-991
 URL: https://issues.apache.org/jira/browse/KAFKA-991
 Project: Kafka
  Issue Type: Bug
Affects Versions: 0.8
Reporter: Swapnil Ghike
Assignee: Swapnil Ghike
  Labels: bugs
 Fix For: 0.8

 Attachments: kafka-991-followup.patch, kafka-991-followup-v2.patch, 
 kafka-991-v1.patch


 Currently the queue.size in hadoop producer is 10MB. This means that the 
 KafkaRecordWriter will hit the send button on kafka producer after the size 
 of uncompressed queued messages becomes greater than 10MB. (The other 
 condition on which the messages are sent is if their number exceeds 
 SHORT.MAX_VALUE).
 Considering that the server accepts a (compressed) batch of messages of 
 size up to 1 million bytes minus the log overhead, we should probably reduce 
 the queue size in hadoop producer. We should do two things:
 1. change max message size on the broker to 1 million + log overhead, because 
 that will make the client message size easy to remember. Right now the 
 maximum number of bytes that can be accepted from a client in a batch of 
 messages is an awkward 88. (I don't have a stronger reason). We have set 
 fetch size on the consumer to 1MB, this gives us a lot of room even if the 
 log overhead increased with further versions.
 2. Set the default number of bytes on hadoop producer to 1 million bytes. 
 Anyone who wants higher throughput can override this config using 
 kafka.output.queue.size

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


[jira] [Updated] (KAFKA-991) Reduce the queue size in hadoop producer

2013-08-01 Thread Joel Koshy (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-991?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Joel Koshy updated KAFKA-991:
-

Resolution: Fixed
Status: Resolved  (was: Patch Available)

 Reduce the queue size in hadoop producer
 

 Key: KAFKA-991
 URL: https://issues.apache.org/jira/browse/KAFKA-991
 Project: Kafka
  Issue Type: Bug
Affects Versions: 0.8
Reporter: Swapnil Ghike
Assignee: Swapnil Ghike
  Labels: bugs
 Fix For: 0.8

 Attachments: kafka-991-followup-3.patch, kafka-991-followup.patch, 
 kafka-991-followup-v2.patch, kafka-991-v1.patch


 Currently the queue.size in hadoop producer is 10MB. This means that the 
 KafkaRecordWriter will hit the send button on kafka producer after the size 
 of uncompressed queued messages becomes greater than 10MB. (The other 
 condition on which the messages are sent is if their number exceeds 
 SHORT.MAX_VALUE).
 Considering that the server accepts a (compressed) batch of messages of 
 size up to 1 million bytes minus the log overhead, we should probably reduce 
 the queue size in hadoop producer. We should do two things:
 1. change max message size on the broker to 1 million + log overhead, because 
 that will make the client message size easy to remember. Right now the 
 maximum number of bytes that can be accepted from a client in a batch of 
 messages is an awkward 88. (I don't have a stronger reason). We have set 
 fetch size on the consumer to 1MB, this gives us a lot of room even if the 
 log overhead increased with further versions.
 2. Set the default number of bytes on hadoop producer to 1 million bytes. 
 Anyone who wants higher throughput can override this config using 
 kafka.output.queue.size

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


[jira] [Closed] (KAFKA-991) Reduce the queue size in hadoop producer

2013-08-01 Thread Joel Koshy (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-991?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Joel Koshy closed KAFKA-991.



+1

Committed to 0.8

 Reduce the queue size in hadoop producer
 

 Key: KAFKA-991
 URL: https://issues.apache.org/jira/browse/KAFKA-991
 Project: Kafka
  Issue Type: Bug
Affects Versions: 0.8
Reporter: Swapnil Ghike
Assignee: Swapnil Ghike
  Labels: bugs
 Fix For: 0.8

 Attachments: kafka-991-followup-3.patch, kafka-991-followup.patch, 
 kafka-991-followup-v2.patch, kafka-991-v1.patch


 Currently the queue.size in hadoop producer is 10MB. This means that the 
 KafkaRecordWriter will hit the send button on kafka producer after the size 
 of uncompressed queued messages becomes greater than 10MB. (The other 
 condition on which the messages are sent is if their number exceeds 
 SHORT.MAX_VALUE).
 Considering that the server accepts a (compressed) batch of messages of 
 size up to 1 million bytes minus the log overhead, we should probably reduce 
 the queue size in hadoop producer. We should do two things:
 1. change max message size on the broker to 1 million + log overhead, because 
 that will make the client message size easy to remember. Right now the 
 maximum number of bytes that can be accepted from a client in a batch of 
 messages is an awkward 88. (I don't have a stronger reason). We have set 
 fetch size on the consumer to 1MB, this gives us a lot of room even if the 
 log overhead increased with further versions.
 2. Set the default number of bytes on hadoop producer to 1 million bytes. 
 Anyone who wants higher throughput can override this config using 
 kafka.output.queue.size

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


[jira] [Commented] (KAFKA-915) System Test - Mirror Maker testcase_5001 failed

2013-08-02 Thread Joel Koshy (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-915?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13728247#comment-13728247
 ] 

Joel Koshy commented on KAFKA-915:
--

+1 on the patch. I actually could not reproduce the other failures, so I'll 
check this in.



_test_case_name  :  testcase_5001
_test_class_name  :  MirrorMakerTest
arg : bounce_leader  :  false
arg : bounce_mirror_maker  :  false
arg : message_producing_free_time_sec  :  15
arg : num_iteration  :  1
arg : num_messages_to_produce_per_producer_call  :  50
arg : num_partition  :  1
arg : replica_factor  :  3
arg : sleep_seconds_between_producer_calls  :  1
validation_status  :
 Unique messages from consumer on [test_1]  :  500
 Unique messages from producer on [test_1]  :  500
 Validate for data matched on topic [test_1]  :  PASSED
 Validate for merged log segment checksum in cluster [source]  :  PASSED
 Validate for merged log segment checksum in cluster [target]  :  PASSED



_test_case_name  :  testcase_5002
_test_class_name  :  MirrorMakerTest
validation_status  :



_test_case_name  :  testcase_5003
_test_class_name  :  MirrorMakerTest
arg : bounce_leader  :  false
arg : bounce_mirror_maker  :  true
arg : bounced_entity_downtime_sec  :  30
arg : message_producing_free_time_sec  :  15
arg : num_iteration  :  1
arg : num_messages_to_produce_per_producer_call  :  50
arg : num_partition  :  1
arg : replica_factor  :  3
arg : sleep_seconds_between_producer_calls  :  1
validation_status  :
 Unique messages from consumer on [test_1]  :  2200
 Unique messages from producer on [test_1]  :  2200
 Validate for data matched on topic [test_1]  :  PASSED
 Validate for merged log segment checksum in cluster [source]  :  PASSED
 Validate for merged log segment checksum in cluster [target]  :  PASSED



_test_case_name  :  testcase_5004
_test_class_name  :  MirrorMakerTest
validation_status  :



_test_case_name  :  testcase_5005
_test_class_name  :  MirrorMakerTest
arg : bounce_leader  :  false
arg : bounce_mirror_maker  :  true
arg : bounced_entity_downtime_sec  :  30
arg : message_producing_free_time_sec  :  15
arg : num_iteration  :  1
arg : num_messages_to_produce_per_producer_call  :  50
arg : num_partition  :  2
arg : replica_factor  :  3
arg : sleep_seconds_between_producer_calls  :  1
validation_status  :
 Unique messages from consumer on [test_1]  :  1400
 Unique messages from consumer on [test_2]  :  1400
 Unique messages from producer on [test_1]  :  1400
 Unique messages from producer on [test_2]  :  1400
 Validate for data matched on topic [test_1]  :  PASSED
 Validate for data matched on topic [test_2]  :  PASSED
 Validate for merged log segment checksum in cluster [source]  :  PASSED
 Validate for merged log segment checksum in cluster [target]  :  PASSED




 System Test - Mirror Maker testcase_5001 failed
 ---

 Key: KAFKA-915
 URL: https://issues.apache.org/jira/browse/KAFKA-915
 Project: Kafka
  Issue Type: Bug
Reporter: John Fung
Assignee: Joel Koshy
Priority: Critical
  Labels: kafka-0.8, replication-testing
 Attachments: kafka-915-v1.patch, testcase_5001_debug_logs.tar.gz


 This case passes if brokers are set to partition = 1, replicas = 1
 It fails if brokers are set to partition = 5, replicas = 3 (consistently 
 reproducible)
 This test case is set up as shown below.
 1. Start 2 ZK as a cluster in Source
 2. Start 2 ZK as a cluster in Target
 3. Start 3 brokers as a cluster in Source (partition = 1, replicas = 1)
 4. Start 3 brokers as a cluster in Target (partition = 1, replicas = 1)
 5. Start 1 MM
 6. Start ProducerPerformance to send some data
 7. After Producer is done, start ConsoleConsumer to consume data
 8. Stop all processes and validate if there is any data loss.
 9. No failure is introduced to any process in this test
 Attached a tar file which contains the logs and system test output for both 
 cases.

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


[jira] [Closed] (KAFKA-915) System Test - Mirror Maker testcase_5001 failed

2013-08-02 Thread Joel Koshy (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-915?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Joel Koshy closed KAFKA-915.



 System Test - Mirror Maker testcase_5001 failed
 ---

 Key: KAFKA-915
 URL: https://issues.apache.org/jira/browse/KAFKA-915
 Project: Kafka
  Issue Type: Bug
Reporter: John Fung
Assignee: Joel Koshy
Priority: Critical
  Labels: kafka-0.8, replication-testing
 Attachments: kafka-915-v1.patch, testcase_5001_debug_logs.tar.gz


 This case passes if brokers are set to partition = 1, replicas = 1
 It fails if brokers are set to partition = 5, replicas = 3 (consistently 
 reproducible)
 This test case is set up as shown below.
 1. Start 2 ZK as a cluster in Source
 2. Start 2 ZK as a cluster in Target
 3. Start 3 brokers as a cluster in Source (partition = 1, replicas = 1)
 4. Start 3 brokers as a cluster in Target (partition = 1, replicas = 1)
 5. Start 1 MM
 6. Start ProducerPerformance to send some data
 7. After Producer is done, start ConsoleConsumer to consume data
 8. Stop all processes and validate if there is any data loss.
 9. No failure is introduced to any process in this test
 Attached a tar file which contains the logs and system test output for both 
 cases.

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


[jira] [Resolved] (KAFKA-915) System Test - Mirror Maker testcase_5001 failed

2013-08-02 Thread Joel Koshy (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-915?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Joel Koshy resolved KAFKA-915.
--

Resolution: Fixed

 System Test - Mirror Maker testcase_5001 failed
 ---

 Key: KAFKA-915
 URL: https://issues.apache.org/jira/browse/KAFKA-915
 Project: Kafka
  Issue Type: Bug
Reporter: John Fung
Assignee: Joel Koshy
Priority: Critical
  Labels: kafka-0.8, replication-testing
 Attachments: kafka-915-v1.patch, testcase_5001_debug_logs.tar.gz


 This case passes if brokers are set to partition = 1, replicas = 1
 It fails if brokers are set to partition = 5, replicas = 3 (consistently 
 reproducible)
 This test case is set up as shown below.
 1. Start 2 ZK as a cluster in Source
 2. Start 2 ZK as a cluster in Target
 3. Start 3 brokers as a cluster in Source (partition = 1, replicas = 1)
 4. Start 3 brokers as a cluster in Target (partition = 1, replicas = 1)
 5. Start 1 MM
 6. Start ProducerPerformance to send some data
 7. After Producer is done, start ConsoleConsumer to consume data
 8. Stop all processes and validate if there is any data loss.
 9. No failure is introduced to any process in this test
 Attached a tar file which contains the logs and system test output for both 
 cases.

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


[jira] [Commented] (KAFKA-992) Double Check on Broker Registration to Avoid False NodeExist Exception

2013-08-06 Thread Joel Koshy (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-992?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13731187#comment-13731187
 ] 

Joel Koshy commented on KAFKA-992:
--

Delayed review - looks good to me, although I still don't see a benefit in
storing the timestamp. i.e., the approach to retry on nodeexists if the host
and port are the same would remain the same. i.e., it seems more for
informative purposes. Let me know if I'm missing something.

@Jun, you have a point about the controller. It seems it may not be a
problem there since controller re-election will happen only after the data
is actually deleted. For consumers it may not be an issue either given that
the consumer id string includes a random uuid.
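
For context, the retry-on-NodeExists behaviour being referred to, as a
self-contained sketch (the Zk trait and method names below are placeholders,
not the real ZkClient/ZkUtils API):

{code}
class NodeExistsException extends RuntimeException

trait Zk {
  def createEphemeral(path: String, data: String): Unit  // may throw NodeExistsException
  def readData(path: String): Option[String]
}

object BrokerRegistrationSketch {
  // Keep retrying while the conflicting node carries our own host:port payload
  // (i.e., it is a stale node from our previous session that zookeeper has not
  // yet deleted); give up if the path is owned by someone else.
  def register(zk: Zk, path: String, hostPort: String, maxRetries: Int = 10): Unit = {
    var attempts = 0
    var done = false
    while (!done && attempts < maxRetries) {
      attempts += 1
      try {
        zk.createEphemeral(path, hostPort)
        done = true
      } catch {
        case _: NodeExistsException =>
          zk.readData(path) match {
            case Some(existing) if existing == hostPort =>
              Thread.sleep(1000)  // wait for the stale node to expire, then retry
            case Some(_) =>
              throw new RuntimeException("Path owned by a different broker: " + path)
            case None => // node vanished between create and read; just retry
          }
      }
    }
    if (!done)
      throw new RuntimeException("Could not register " + path + " after " + maxRetries + " attempts")
  }
}
{code}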



 Double Check on Broker Registration to Avoid False NodeExist Exception
 --

 Key: KAFKA-992
 URL: https://issues.apache.org/jira/browse/KAFKA-992
 Project: Kafka
  Issue Type: Bug
Reporter: Neha Narkhede
Assignee: Guozhang Wang
 Attachments: KAFKA-992.v1.patch, KAFKA-992.v2.patch, 
 KAFKA-992.v3.patch, KAFKA-992.v4.patch


 The current behavior of zookeeper for ephemeral nodes is that session 
 expiration and ephemeral node deletion is not an atomic operation. 
 The side-effect of the above zookeeper behavior in Kafka, for certain corner 
 cases, is that ephemeral nodes can be lost even if the session is not 
 expired. The sequence of events that can lead to lossy ephemeral nodes is as 
 follows -
 1. The session expires on the client, it assumes the ephemeral nodes are 
 deleted, so it establishes a new session with zookeeper and tries to 
 re-create the ephemeral nodes. 
 2. However, when it tries to re-create the ephemeral node, zookeeper throws 
 back a NodeExists error code. Now this is legitimate during a session 
 disconnect event (since zkclient automatically retries the
 operation and raises a NodeExists error). Also by design, Kafka server 
 doesn't have multiple zookeeper clients create the same ephemeral node, so 
 Kafka server assumes the NodeExists is normal. 
 3. However, after a few seconds zookeeper deletes that ephemeral node. So 
 from the client's perspective, even though the client has a new valid 
 session, its ephemeral node is gone.
 This behavior is triggered due to very long fsync operations on the zookeeper 
 leader. When the leader wakes up from such a long fsync operation, it has 
 several sessions to expire. And the time between the session expiration and 
 the ephemeral node deletion is magnified. Between these 2 operations, a 
 zookeeper client can issue a ephemeral node creation operation, that could've 
 appeared to have succeeded, but the leader later deletes the ephemeral node 
 leading to permanent ephemeral node loss from the client's perspective. 
 Thread from zookeeper mailing list: 
 http://zookeeper.markmail.org/search/?q=Zookeeper+3.3.4#query:Zookeeper%203.3.4%20date%3A201307%20+page:1+mid:zma242a2qgp6gxvx+state:results

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


[jira] [Commented] (KAFKA-992) Double Check on Broker Registration to Avoid False NodeExist Exception

2013-08-06 Thread Joel Koshy (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-992?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13731282#comment-13731282
 ] 

Joel Koshy commented on KAFKA-992:
--

ok nm the comment about timestamp. I had forgotten that nodeexists wouldn't be 
thrown if the data is the same.

 Double Check on Broker Registration to Avoid False NodeExist Exception
 --

 Key: KAFKA-992
 URL: https://issues.apache.org/jira/browse/KAFKA-992
 Project: Kafka
  Issue Type: Bug
Reporter: Neha Narkhede
Assignee: Guozhang Wang
 Attachments: KAFKA-992.v1.patch, KAFKA-992.v2.patch, 
 KAFKA-992.v3.patch, KAFKA-992.v4.patch


 The current behavior of zookeeper for ephemeral nodes is that session 
 expiration and ephemeral node deletion is not an atomic operation. 
 The side-effect of the above zookeeper behavior in Kafka, for certain corner 
 cases, is that ephemeral nodes can be lost even if the session is not 
 expired. The sequence of events that can lead to lossy ephemeral nodes is as 
 follows -
 1. The session expires on the client, it assumes the ephemeral nodes are 
 deleted, so it establishes a new session with zookeeper and tries to 
 re-create the ephemeral nodes. 
 2. However, when it tries to re-create the ephemeral node, zookeeper throws 
 back a NodeExists error code. Now this is legitimate during a session 
 disconnect event (since zkclient automatically retries the
 operation and raises a NodeExists error). Also by design, Kafka server 
 doesn't have multiple zookeeper clients create the same ephemeral node, so 
 Kafka server assumes the NodeExists is normal. 
 3. However, after a few seconds zookeeper deletes that ephemeral node. So 
 from the client's perspective, even though the client has a new valid 
 session, its ephemeral node is gone.
 This behavior is triggered due to very long fsync operations on the zookeeper 
 leader. When the leader wakes up from such a long fsync operation, it has 
 several sessions to expire. And the time between the session expiration and 
 the ephemeral node deletion is magnified. Between these 2 operations, a 
 zookeeper client can issue a ephemeral node creation operation, that could've 
 appeared to have succeeded, but the leader later deletes the ephemeral node 
 leading to permanent ephemeral node loss from the client's perspective. 
 Thread from zookeeper mailing list: 
 http://zookeeper.markmail.org/search/?q=Zookeeper+3.3.4#query:Zookeeper%203.3.4%20date%3A201307%20+page:1+mid:zma242a2qgp6gxvx+state:results

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


[jira] [Commented] (KAFKA-992) Double Check on Broker Registration to Avoid False NodeExist Exception

2013-08-06 Thread Joel Koshy (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-992?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13731337#comment-13731337
 ] 

Joel Koshy commented on KAFKA-992:
--

and nm for my comments about controller/consumers as well. For consumers, we
don't regenerate the consumer id string.

For controller, what can end up happening is:
- controller session expires and becomes the controller again (with the
  stale ephemeral node)
- another broker (whose session may not have expired) receives a watch when the
  stale ephemeral node is actually deleted
- so we can end up with two controllers in this scenario.



 Double Check on Broker Registration to Avoid False NodeExist Exception
 --

 Key: KAFKA-992
 URL: https://issues.apache.org/jira/browse/KAFKA-992
 Project: Kafka
  Issue Type: Bug
Reporter: Neha Narkhede
Assignee: Guozhang Wang
 Attachments: KAFKA-992.v1.patch, KAFKA-992.v2.patch, 
 KAFKA-992.v3.patch, KAFKA-992.v4.patch


 The current behavior of zookeeper for ephemeral nodes is that session 
 expiration and ephemeral node deletion is not an atomic operation. 
 The side-effect of the above zookeeper behavior in Kafka, for certain corner 
 cases, is that ephemeral nodes can be lost even if the session is not 
 expired. The sequence of events that can lead to lossy ephemeral nodes is as 
 follows -
 1. The session expires on the client, it assumes the ephemeral nodes are 
 deleted, so it establishes a new session with zookeeper and tries to 
 re-create the ephemeral nodes. 
 2. However, when it tries to re-create the ephemeral node, zookeeper throws 
 back a NodeExists error code. Now this is legitimate during a session 
 disconnect event (since zkclient automatically retries the
 operation and raises a NodeExists error). Also by design, Kafka server 
 doesn't have multiple zookeeper clients create the same ephemeral node, so 
 Kafka server assumes the NodeExists is normal. 
 3. However, after a few seconds zookeeper deletes that ephemeral node. So 
 from the client's perspective, even though the client has a new valid 
 session, its ephemeral node is gone.
 This behavior is triggered due to very long fsync operations on the zookeeper 
 leader. When the leader wakes up from such a long fsync operation, it has 
 several sessions to expire. And the time between the session expiration and 
 the ephemeral node deletion is magnified. Between these 2 operations, a 
 zookeeper client can issue a ephemeral node creation operation, that could've 
 appeared to have succeeded, but the leader later deletes the ephemeral node 
 leading to permanent ephemeral node loss from the client's perspective. 
 Thread from zookeeper mailing list: 
 http://zookeeper.markmail.org/search/?q=Zookeeper+3.3.4#query:Zookeeper%203.3.4%20date%3A201307%20+page:1+mid:zma242a2qgp6gxvx+state:results

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


[jira] [Commented] (KAFKA-990) Fix ReassignPartitionCommand and improve usability

2013-08-08 Thread Joel Koshy (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-990?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13733837#comment-13733837
 ] 

Joel Koshy commented on KAFKA-990:
--

- The topics-to-move json file format seems unnecessarily complicated. Why not
just a JSON array?
- Use CommandLineUtils.checkRequiredArgs
- May be helpful to also print out the existing partition assignment and the
final assignment.
- "dryrun" to "dry-run", which I think is the spelling unix tools like patch
tend to use.
- line 88: use head instead of assuming 0 exists (start partition id could be
!= 0)

I did not finish going through all the changes in controller, but thought I 
would put in my comments so far :)


 Fix ReassignPartitionCommand and improve usability
 --

 Key: KAFKA-990
 URL: https://issues.apache.org/jira/browse/KAFKA-990
 Project: Kafka
  Issue Type: Bug
Reporter: Sriram Subramanian
Assignee: Sriram Subramanian
 Attachments: KAFKA-990-v1.patch, KAFKA-990-v1-rebased.patch


 1. The tool does not register for IsrChangeListener on controller failover.
 2. There is a race condition where the previous listener can fire on 
 controller failover and the replicas can be in ISR. Even after re-registering 
 the ISR listener after failover, it will never be triggered.
 3. The input to the tool is a static list which is very hard to use. To improve 
 this, as a first step the tool needs to take a list of topics and list of 
 brokers to do the assignment to and then generate the reassignment plan.

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


[jira] [Commented] (KAFKA-990) Fix ReassignPartitionCommand and improve usability

2013-08-09 Thread Joel Koshy (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-990?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13735120#comment-13735120
 ] 

Joel Koshy commented on KAFKA-990:
--

Can you elaborate on the change to shutdownBroker in KafkaController? I
think we need to include shutting down brokers because the previous shutdown
attempt may have been incomplete due to no other brokers in ISR for some
partition which would have prevented leader movement. Subsequent attempts
would now be rejected.

Good catches on the controller failover. Agree with Neha that #2 is not a
problem for replicas that are in ISR, however, we do need to re-register the
ISR change listener for those replicas that are in ISR.

Finally, we should probably open a separate jira to implement a feature to
cancel an ongoing reassignment given that it is a long-running operation.
The dry-run option reduces the need for this but nevertheless I think it's a
good feature to support in the future.



 Fix ReassignPartitionCommand and improve usability
 --

 Key: KAFKA-990
 URL: https://issues.apache.org/jira/browse/KAFKA-990
 Project: Kafka
  Issue Type: Bug
Reporter: Sriram Subramanian
Assignee: Sriram Subramanian
 Attachments: KAFKA-990-v1.patch, KAFKA-990-v1-rebased.patch


 1. The tool does not register for IsrChangeListener on controller failover.
 2. There is a race condition where the previous listener can fire on 
 controller failover and the replicas can be in ISR. Even after re-registering 
 the ISR listener after failover, it will never be triggered.
 3. The input to the tool is a static list which is very hard to use. To improve 
 this, as a first step the tool needs to take a list of topics and list of 
 brokers to do the assignment to and then generate the reassignment plan.

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


[jira] [Commented] (KAFKA-990) Fix ReassignPartitionCommand and improve usability

2013-08-09 Thread Joel Koshy (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-990?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13735653#comment-13735653
 ] 

Joel Koshy commented on KAFKA-990:
--

Looks like I might have looked at the wrong patch. I'll review this again this 
weekend.

 Fix ReassignPartitionCommand and improve usability
 --

 Key: KAFKA-990
 URL: https://issues.apache.org/jira/browse/KAFKA-990
 Project: Kafka
  Issue Type: Bug
Reporter: Sriram Subramanian
Assignee: Sriram Subramanian
 Attachments: KAFKA-990-v1.patch, KAFKA-990-v1-rebased.patch


 1. The tool does not register for IsrChangeListener on controller failover.
 2. There is a race condition where the previous listener can fire on 
 controller failover and the replicas can be in ISR. Even after re-registering 
 the ISR listener after failover, it will never be triggered.
 3. The input to the tool is a static list which is very hard to use. To improve 
 this, as a first step the tool needs to take a list of topics and list of 
 brokers to do the assignment to and then generate the reassignment plan.

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


[jira] [Commented] (KAFKA-990) Fix ReassignPartitionCommand and improve usability

2013-08-12 Thread Joel Koshy (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-990?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13737010#comment-13737010
 ] 

Joel Koshy commented on KAFKA-990:
--

The rebased patch looks good - the shutdown changes I was referring to were in 
v1.

+1 on the rebased patch - we can fix the minor comments either on check-in or 
in a separate jira.

 Fix ReassignPartitionCommand and improve usability
 --

 Key: KAFKA-990
 URL: https://issues.apache.org/jira/browse/KAFKA-990
 Project: Kafka
  Issue Type: Bug
Reporter: Sriram Subramanian
Assignee: Sriram Subramanian
 Attachments: KAFKA-990-v1.patch, KAFKA-990-v1-rebased.patch


 1. The tool does not register for IsrChangeListener on controller failover.
 2. There is a race condition where the previous listener can fire on 
 controller failover and the replicas can be in ISR. Even after re-registering 
 the ISR listener after failover, it will never be triggered.
 3. The input to the tool is a static list which is very hard to use. To improve 
 this, as a first step the tool needs to take a list of topics and list of 
 brokers to do the assignment to and then generate the reassignment plan.

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


[jira] [Commented] (KAFKA-990) Fix ReassignPartitionCommand and improve usability

2013-08-12 Thread Joel Koshy (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-990?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13737024#comment-13737024
 ] 

Joel Koshy commented on KAFKA-990:
--

It should be liveOrShuttingDownBrokerIds. This is required because a controlled
shutdown attempt may fail if there are no other brokers in ISR for a partition
led by the broker being shut down. In this case we would want to proceed with a
retry (if there are retries left).

 Fix ReassignPartitionCommand and improve usability
 --

 Key: KAFKA-990
 URL: https://issues.apache.org/jira/browse/KAFKA-990
 Project: Kafka
  Issue Type: Bug
Reporter: Sriram Subramanian
Assignee: Sriram Subramanian
 Attachments: KAFKA-990-v1.patch, KAFKA-990-v1-rebased.patch


 1. The tool does not register for IsrChangeListener on controller failover.
 2. There is a race condition where the previous listener can fire on 
 controller failover and the replicas can be in ISR. Even after re-registering 
 the ISR listener after failover, it will never be triggered.
 3. The input to the tool is a static list which is very hard to use. To improve 
 this, as a first step the tool needs to take a list of topics and list of 
 brokers to do the assignment to and then generate the reassignment plan.

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


[jira] [Commented] (KAFKA-937) ConsumerFetcherThread can deadlock

2013-09-03 Thread Joel Koshy (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-937?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13756807#comment-13756807
 ] 

Joel Koshy commented on KAFKA-937:
--

The delta patch slipped through the cracks. We hit that issue recently - a 
network glitch led to the leader-finder-thread hitting an exception while 
adding fetchers and the thread quit:

{code}
leader-finder-thread], Error due to 
java.net.ConnectException: Connection timed out
at sun.nio.ch.Net.connect(Native Method)
at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:507)
at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57)
at kafka.consumer.SimpleConsumer.connect(SimpleConsumer.scala:44)
at 
kafka.consumer.SimpleConsumer.getOrMakeConnection(SimpleConsumer.scala:129)
at 
kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:69)
at 
kafka.consumer.SimpleConsumer.getOffsetsBefore(SimpleConsumer.scala:125)
at 
kafka.consumer.SimpleConsumer.earliestOrLatestOffset(SimpleConsumer.scala:144)
at 
kafka.consumer.ConsumerFetcherThread.handleOffsetOutOfRange(ConsumerFetcherThread.scala:60)
at 
kafka.server.AbstractFetcherThread.addPartition(AbstractFetcherThread.scala:180)
at 
kafka.server.AbstractFetcherManager.addFetcher(AbstractFetcherManager.scala:80)
at 
kafka.consumer.ConsumerFetcherManager$LeaderFinderThread$$anonfun$doWork$7.apply(ConsumerFetcherManager.scala:95)
at 
kafka.consumer.ConsumerFetcherManager$LeaderFinderThread$$anonfun$doWork$7.apply(ConsumerFetcherManager.scala:92)
at 
scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:80)
at 
scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:80)
at scala.collection.Iterator$class.foreach(Iterator.scala:631)
at 
scala.collection.mutable.HashTable$$anon$1.foreach(HashTable.scala:161)
at 
scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:194)
at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
at scala.collection.mutable.HashMap.foreach(HashMap.scala:80)
at 
kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:92)
at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
{code}



+1 on kafka-937_delta with one minor comment: change the log to indicate that it
will attempt to look up the leader again and add fetchers - right now it just
says "failed to add".
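
A sketch (not the kafka-937_delta patch itself) of the kind of guard being
discussed, with the suggested log wording; addFetcher and noLeaderPartitions
are stand-ins for the real consumer internals:

{code}
import scala.collection.mutable

object LeaderFinderSketch {
  def doWork(leaderForPartition: Map[Int, String],
             addFetcher: (Int, String) => Unit,
             noLeaderPartitions: mutable.Set[Int]): Unit = {
    for ((partition, leader) <- leaderForPartition) {
      try {
        addFetcher(partition, leader)
        noLeaderPartitions -= partition
      } catch {
        case e: Exception =>
          // Per the comment above: say what happens next, not just that it failed.
          println(s"Failed to add fetcher for partition $partition to leader $leader; " +
            s"will retry the leader lookup and add the fetcher again (${e.getMessage})")
      }
    }
  }
}
{code}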

 ConsumerFetcherThread can deadlock
 --

 Key: KAFKA-937
 URL: https://issues.apache.org/jira/browse/KAFKA-937
 Project: Kafka
  Issue Type: Bug
  Components: core
Affects Versions: 0.8
Reporter: Jun Rao
Assignee: Jun Rao
 Fix For: 0.8

 Attachments: kafka-937_ConsumerOffsetChecker.patch, 
 kafka-937_delta.patch, kafka-937.patch


 We have the following access pattern that can introduce a deadlock.
 AbstractFetcherThread.processPartitionsWithError() -
 ConsumerFetcherThread.processPartitionsWithError() - 
 ConsumerFetcherManager.addPartitionsWithError() wait for lock -
 LeaderFinderThread holding lock while calling 
 AbstractFetcherManager.shutdownIdleFetcherThreads() -
 AbstractFetcherManager calling fetcher.shutdown, which needs to wait until 
 AbstractFetcherThread.processPartitionsWithError() completes.

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


[jira] [Commented] (KAFKA-998) Producer should not retry on non-recoverable error codes

2013-09-04 Thread Joel Koshy (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-998?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13758450#comment-13758450
 ] 

Joel Koshy commented on KAFKA-998:
--

Apologies for the late review. Couple of comments:
* I think this could end up resetting needRetry back to false even when some
partitions in the iteration do need a retry: needRetry = needRetry &&
!fatalException(topicPartitionAndError._2). The logic is actually a bit
confusing. Instead, it might be clearer to just do:
failedTopicPartitions.exists(some entry for which we need to retry) - see the
sketch below.
* Can you enhance the logging a bit to indicate that there were fatal sends
that will not be retried? E.g., "Dropping messages to topic x due to message
size limit" or something like that.
* Can you rebase?
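
A tiny, self-contained sketch of the exists form suggested in the first comment
(the error codes and the fatalException check are stand-ins, not the patch's
actual code):

{code}
object NeedRetrySketch {
  case class TopicPartition(topic: String, partition: Int)

  // Treat one code as non-recoverable (e.g. a message-size error); everything
  // else is considered retriable.
  def fatalException(error: Short): Boolean = error == 10

  def main(args: Array[String]): Unit = {
    val failedTopicPartitions: Seq[(TopicPartition, Short)] =
      Seq(TopicPartition("t", 0) -> 1.toShort,   // retriable error
          TopicPartition("t", 1) -> 10.toShort)  // fatal error

    // Retry iff at least one failed partition has a recoverable error code; no
    // flag is threaded through the loop, so a later entry cannot clobber an
    // earlier one.
    val needRetry = failedTopicPartitions.exists { case (_, error) => !fatalException(error) }
    println(s"needRetry = $needRetry")  // true: partition 0 is still retriable
  }
}
{code}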


 Producer should not retry on non-recoverable error codes
 

 Key: KAFKA-998
 URL: https://issues.apache.org/jira/browse/KAFKA-998
 Project: Kafka
  Issue Type: Bug
Affects Versions: 0.8, 0.8.1
Reporter: Joel Koshy
Assignee: Guozhang Wang
 Attachments: KAFKA-998.v1.patch


 Based on a discussion with Guozhang. The producer currently retries on all 
 error codes (including messagesizetoolarge which is pointless to retry on). 
 This can slow down the producer unnecessarily.
 If at all we want to retry on that error code we would need to retry with a 
 smaller batch size, but that's a separate discussion.

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


[jira] [Commented] (KAFKA-998) Producer should not retry on non-recoverable error codes

2013-09-05 Thread Joel Koshy (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-998?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13759257#comment-13759257
 ] 

Joel Koshy commented on KAFKA-998:
--

Oh I thought this was for 0.8 - it does apply on trunk.  Do people think this 
is small and important enough to apply to 0.8?


Another comment after thinking about the patch: in dispatchSerializedData -
would it be better to just drop data that has hit the message size limit? That
way, there is no need to return needRetry, so the dispatchSerializedData
signature remains the same. The disadvantage is that we won't propagate a
FailedToSendMessageException for such messages to the caller - for the producer
in async mode that is probably fine (since right now the caller can't really do
much with that exception) - in sync mode the caller could perhaps decide to
send fewer messages at once. Even in that case we don't really say which
topics/messages hit the message size limit, so I think it is fine there as
well. Furthermore, this would be covered by KAFKA-1026 to a large degree.
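
A sketch of the drop-the-fatal-ones alternative described above (the message
type and the size check are illustrative stand-ins, not dispatchSerializedData
itself):

{code}
object DropOversizedSketch {
  case class Message(topic: String, payload: Array[Byte])

  // Split the failed messages into those that can never succeed (too large)
  // and those worth handing back for a retry.
  def splitFailed(failed: Seq[Message], maxBytes: Int): (Seq[Message], Seq[Message]) =
    failed.partition(_.payload.length > maxBytes)

  def main(args: Array[String]): Unit = {
    val failed = Seq(Message("t", new Array[Byte](2000000)),
                     Message("t", new Array[Byte](100)))
    val (tooLarge, retriable) = splitFailed(failed, maxBytes = 1000000)
    // Log-and-drop the non-recoverable ones; only the rest go back for retry,
    // so the caller's signature does not need an extra needRetry flag.
    tooLarge.foreach(m =>
      println(s"Dropping ${m.payload.length}-byte message to topic ${m.topic}: exceeds message size limit"))
    println(s"retrying ${retriable.size} message(s)")
  }
}
{code}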

 Producer should not retry on non-recoverable error codes
 

 Key: KAFKA-998
 URL: https://issues.apache.org/jira/browse/KAFKA-998
 Project: Kafka
  Issue Type: Bug
Affects Versions: 0.8, 0.8.1
Reporter: Joel Koshy
Assignee: Guozhang Wang
 Attachments: KAFKA-998.v1.patch


 Based on a discussion with Guozhang. The producer currently retries on all 
 error codes (including messagesizetoolarge which is pointless to retry on). 
 This can slow down the producer unnecessarily.
 If at all we want to retry on that error code we would need to retry with a 
 smaller batch size, but that's a separate discussion.

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


[jira] [Commented] (KAFKA-1053) Kafka patch review tool

2013-09-13 Thread Joel Koshy (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1053?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13765763#comment-13765763
 ] 

Joel Koshy commented on KAFKA-1053:
---

I'll take a look today - would like to try it out as well

 Kafka patch review tool
 ---

 Key: KAFKA-1053
 URL: https://issues.apache.org/jira/browse/KAFKA-1053
 Project: Kafka
  Issue Type: New Feature
  Components: tools
Reporter: Neha Narkhede
Assignee: Neha Narkhede
 Attachments: KAFKA-1053-v1.patch, KAFKA-1053-v1.patch, 
 KAFKA-1053-v1.patch


 Created a new patch review tool that will integrate JIRA and reviewboard - 
 https://cwiki.apache.org/confluence/display/KAFKA/Kafka+patch+review+tool

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


[jira] [Commented] (KAFKA-1053) Kafka patch review tool

2013-09-13 Thread Joel Koshy (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1053?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13767216#comment-13767216
 ] 

Joel Koshy commented on KAFKA-1053:
---

Ran into this while following the instructions. May be some python version
conflict but throwing it out there in case someone encountered this and
worked around it:

[1709][jkoshy@jkoshy-ld:~]$ sudo easy_install -U setuptools
Traceback (most recent call last):
  File "/usr/bin/easy_install", line 9, in <module>
    load_entry_point('distribute', 'console_scripts', 'easy_install')()
  File "/usr/lib/python2.6/site-packages/setuptools-1.1.5-py2.6.egg/pkg_resources.py",
line 357, in load_entry_point
    return get_distribution(dist).load_entry_point(group, name)
  File "/usr/lib/python2.6/site-packages/setuptools-1.1.5-py2.6.egg/pkg_resources.py",
line 2393, in load_entry_point
    raise ImportError("Entry point %r not found" % ((group,name),))
ImportError: Entry point ('console_scripts', 'easy_install') not found



 Kafka patch review tool
 ---

 Key: KAFKA-1053
 URL: https://issues.apache.org/jira/browse/KAFKA-1053
 Project: Kafka
  Issue Type: New Feature
  Components: tools
Reporter: Neha Narkhede
Assignee: Neha Narkhede
 Attachments: KAFKA-1053-followup2.patch, KAFKA-1053-followup.patch, 
 KAFKA-1053-v1.patch, KAFKA-1053-v1.patch, KAFKA-1053-v1.patch, 
 KAFKA-1053-v2.patch, KAFKA-1053-v3.patch


 Created a new patch review tool that will integrate JIRA and reviewboard - 
 https://cwiki.apache.org/confluence/display/KAFKA/Kafka+patch+review+tool

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


[jira] [Commented] (KAFKA-1053) Kafka patch review tool

2013-09-13 Thread Joel Koshy (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1053?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13767225#comment-13767225
 ] 

Joel Koshy commented on KAFKA-1053:
---

Probably because I did both "yum install python-setuptools" and "easy_install -U
setuptools".

 Kafka patch review tool
 ---

 Key: KAFKA-1053
 URL: https://issues.apache.org/jira/browse/KAFKA-1053
 Project: Kafka
  Issue Type: New Feature
  Components: tools
Reporter: Neha Narkhede
Assignee: Neha Narkhede
 Attachments: KAFKA-1053-followup2.patch, KAFKA-1053-followup.patch, 
 KAFKA-1053-v1.patch, KAFKA-1053-v1.patch, KAFKA-1053-v1.patch, 
 KAFKA-1053-v2.patch, KAFKA-1053-v3.patch


 Created a new patch review tool that will integrate JIRA and reviewboard - 
 https://cwiki.apache.org/confluence/display/KAFKA/Kafka+patch+review+tool

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


[jira] [Updated] (KAFKA-1049) Encoder implementations are required to provide an undocumented constructor.

2013-09-17 Thread Joel Koshy (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1049?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Joel Koshy updated KAFKA-1049:
--

Attachment: KAFKA-1049.patch

 Encoder implementations are required to provide an undocumented constructor.
 

 Key: KAFKA-1049
 URL: https://issues.apache.org/jira/browse/KAFKA-1049
 Project: Kafka
  Issue Type: Improvement
Reporter: Jason Rosenberg
Priority: Minor
 Attachments: KAFKA-1049.patch


 So, it seems that if I want to set a custom serializer class on the producer 
 (in 0.8), I have to use a class that includes a special constructor like:
 public class MyKafkaEncoder<MyType> implements Encoder<MyType> {
   // This constructor is expected by the kafka producer, used by reflection
   public MyKafkaEncoder(VerifiableProperties props) {
     // what can I do with this?
   }
   @Override
   public byte[] toBytes(MyType message) {
     return message.toByteArray();
   }
 }
 It seems odd that this would be a requirement when implementing an interface. 
  This seems not to have been the case in 0.7.
 What could my encoder class do with the VerifiableProperties?
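
One answer to that last question: the producer instantiates the encoder
reflectively and hands it the producer config, so an encoder can pull its own
settings out of the props - much like the built-in StringEncoder does with its
encoding setting. A hedged Scala sketch follows; the property name
myencoder.charset is made up for illustration:

{code}
import kafka.serializer.Encoder
import kafka.utils.VerifiableProperties

class ConfigurableStringEncoder(props: VerifiableProperties = null) extends Encoder[String] {
  // Read an encoder-specific setting from the producer config, with a default.
  private val charset =
    if (props == null) "UTF-8" else props.getString("myencoder.charset", "UTF-8")

  override def toBytes(s: String): Array[Byte] = s.getBytes(charset)
}
{code}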

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


[jira] [Commented] (KAFKA-1049) Encoder implementations are required to provide an undocumented constructor.

2013-09-17 Thread Joel Koshy (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1049?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13770182#comment-13770182
 ] 

Joel Koshy commented on KAFKA-1049:
---

Created reviewboard https://reviews.apache.org/r/14188/


 Encoder implementations are required to provide an undocumented constructor.
 

 Key: KAFKA-1049
 URL: https://issues.apache.org/jira/browse/KAFKA-1049
 Project: Kafka
  Issue Type: Improvement
Reporter: Jason Rosenberg
Priority: Minor
 Attachments: KAFKA-1049.patch


 So, it seems that if I want to set a custom serializer class on the producer 
 (in 0.8), I have to use a class that includes a special constructor like:
 public class MyKafkaEncoder<MyType> implements Encoder<MyType> {
   // This constructor is expected by the kafka producer, used by reflection
   public MyKafkaEncoder(VerifiableProperties props) {
     // what can I do with this?
   }
   @Override
   public byte[] toBytes(MyType message) {
     return message.toByteArray();
   }
 }
 It seems odd that this would be a requirement when implementing an interface. 
  This seems not to have been the case in 0.7.
 What could my encoder class do with the VerifiableProperties?

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


[jira] [Commented] (KAFKA-1053) Kafka patch review tool

2013-09-17 Thread Joel Koshy (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1053?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13770270#comment-13770270
 ] 

Joel Koshy commented on KAFKA-1053:
---

Nice - I tried this on KAFKA-1049 (as a test - that patch does not work) and it 
worked great!

+1

I did not get time to dig into the issue I ran into on Linux but the steps 
worked on my laptop. I can look into that and update the wiki with a 
work-around if I find one.

Minor comment: the direct Python API is interesting -
http://www.reviewboard.org/docs/rbtools/dev/api/overview (I'm in general wary of
popen/subprocess); but it is probably more work than it's worth to interface
with that, and post-review likely wraps it anyway and is a well-maintained tool.
Also, I would prefer to have the tool create a temp file (e.g., os.tmpfile) as
opposed to leaving around a patch file, but that's not a big deal.


 Kafka patch review tool
 ---

 Key: KAFKA-1053
 URL: https://issues.apache.org/jira/browse/KAFKA-1053
 Project: Kafka
  Issue Type: New Feature
  Components: tools
Reporter: Neha Narkhede
Assignee: Neha Narkhede
 Attachments: KAFKA-1053-2013-09-15_09:40:04.patch, 
 KAFKA-1053_2013-09-15_20:28:01.patch, KAFKA-1053_2013-09-16_14:40:15.patch, 
 KAFKA-1053-followup2.patch, KAFKA-1053-followup.patch, KAFKA-1053-v1.patch, 
 KAFKA-1053-v1.patch, KAFKA-1053-v1.patch, KAFKA-1053-v2.patch, 
 KAFKA-1053-v3.patch


 Created a new patch review tool that will integrate JIRA and reviewboard - 
 https://cwiki.apache.org/confluence/display/KAFKA/Kafka+patch+review+tool

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


[jira] [Created] (KAFKA-1089) Migration tool system tests soft-fail on 0.8 and trunk

2013-10-16 Thread Joel Koshy (JIRA)
Joel Koshy created KAFKA-1089:
-

 Summary: Migration tool system tests soft-fail on 0.8 and trunk
 Key: KAFKA-1089
 URL: https://issues.apache.org/jira/browse/KAFKA-1089
 Project: Kafka
  Issue Type: Bug
Reporter: Joel Koshy
Assignee: Joel Koshy


Due to a logging issue (similar to KAFKA-1076)



--
This message was sent by Atlassian JIRA
(v6.1#6144)

