[jira] [Commented] (KAFKA-345) Add a listener to ZookeeperConsumerConnector to get notified on rebalance events
[ https://issues.apache.org/jira/browse/KAFKA-345?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14211996#comment-14211996 ] Jiangjie Qin commented on KAFKA-345: There are some other considerations regarding adding the callback to the old consumer as well. First, it's a backward-compatible patch: if the user does not wire in the callback, there is no impact, so current users will not be affected. Secondly, it is not too complicated to add the callback, and it might take some time for the new consumer to be ready for production, hence it seems worth making this available for the transitional period. I think it could also provide a reference for how the callback could be used in the new consumer. Add a listener to ZookeeperConsumerConnector to get notified on rebalance events Key: KAFKA-345 URL: https://issues.apache.org/jira/browse/KAFKA-345 Project: Kafka Issue Type: Improvement Components: core Affects Versions: 0.7, 0.8.0 Reporter: Peter Romianowski Attachments: KAFKA-345.patch, KAFKA-345.patch A sample use-case: in our scenario we partition events by userid and then apply these to some kind of state machine that modifies the actual state of a user. So events trigger state transitions. To avoid loading a user's state for each event processed, we cache it. But if a user's partition is moved to another consumer and then back to the previous consumer, we have stale caches and hell breaks loose. I guess the same kind of problem occurs in other scenarios, like maintaining per-user counts. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
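The cache-invalidation use-case above lends itself to a small illustration. The sketch below assumes a hypothetical listener trait with a beforeReleasingPartitions hook; the ticket proposes adding such a hook, but the exact signature here is illustrative, not the committed API:
{code}
import scala.collection.concurrent.TrieMap

// Hypothetical shape of the rebalance hook proposed in this ticket.
trait RebalanceListener {
  def beforeReleasingPartitions(owned: Map[String, Set[Int]]): Unit
}

// Per-user state cache that drops everything when a rebalance starts, so a
// partition that leaves and later comes back never sees stale entries.
class UserStateCache extends RebalanceListener {
  private val stateByUser = TrieMap.empty[String, String]

  def get(userId: String): Option[String] = stateByUser.get(userId)
  def put(userId: String, state: String): Unit = stateByUser.put(userId, state)

  override def beforeReleasingPartitions(owned: Map[String, Set[Int]]): Unit =
    stateByUser.clear() // another consumer may mutate state while it owns the partition
}
{code}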
[jira] [Commented] (KAFKA-1764) ZookeeperConsumerConnector could put multiple shutdownCommand to the same data chunk queue.
[ https://issues.apache.org/jira/browse/KAFKA-1764?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14212011#comment-14212011 ] Stevo Slavic commented on KAFKA-1764: - Is this issue a duplicate of KAFKA-1716? ZookeeperConsumerConnector could put multiple shutdownCommand to the same data chunk queue. --- Key: KAFKA-1764 URL: https://issues.apache.org/jira/browse/KAFKA-1764 Project: Kafka Issue Type: Bug Reporter: Jiangjie Qin Assignee: Jiangjie Qin Attachments: KAFKA-1764.patch, KAFKA-1764_2014-11-12_14:05:35.patch, KAFKA-1764_2014-11-13_23:57:51.patch In ZookeeperConsumerConnector shutdown(), we could potentially put multiple shutdownCommands into the same data chunk queue, provided the topics are sharing the same data chunk queue in topicThreadIdAndQueues. From email thread to document: In ZookeeperConsumerConnector shutdown(), we could potentially put multiple shutdownCommands into the same data chunk queue, provided the topics are sharing the same data chunk queue in topicThreadIdAndQueues. In our case, we only have 1 consumer stream for all the topics, and the data chunk queue capacity is set to 1. The execution sequence causing the problem is as below: 1. ZookeeperConsumerConnector shutdown() is called; it tries to put a shutdownCommand into each queue in topicThreadIdAndQueues. Since we only have 1 queue, multiple shutdownCommands will be put into the queue. 2. In sendShutdownToAllQueues(), between queue.clean() and queue.put(shutdownCommand), the consumer iterator receives the shutdownCommand and puts it back into the data chunk queue. After that, ZookeeperConsumerConnector tries to put another shutdownCommand into the data chunk queue but will block forever. The thread stack trace is as below:
{code}
Thread-23 #58 prio=5 os_prio=0 tid=0x7ff440004800 nid=0x40a waiting on condition [0x7ff4f0124000]
   java.lang.Thread.State: WAITING (parking)
    at sun.misc.Unsafe.park(Native Method)
    - parking to wait for 0x000680b96bf0 (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
    at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
    at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
    at java.util.concurrent.LinkedBlockingQueue.put(LinkedBlockingQueue.java:350)
    at kafka.consumer.ZookeeperConsumerConnector$$anonfun$sendShutdownToAllQueues$1.apply(ZookeeperConsumerConnector.scala:262)
    at kafka.consumer.ZookeeperConsumerConnector$$anonfun$sendShutdownToAllQueues$1.apply(ZookeeperConsumerConnector.scala:259)
    at scala.collection.Iterator$class.foreach(Iterator.scala:727)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
    at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
    at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
    at kafka.consumer.ZookeeperConsumerConnector.sendShutdownToAllQueues(ZookeeperConsumerConnector.scala:259)
    at kafka.consumer.ZookeeperConsumerConnector.liftedTree1$1(ZookeeperConsumerConnector.scala:199)
    at kafka.consumer.ZookeeperConsumerConnector.shutdown(ZookeeperConsumerConnector.scala:192)
    - locked 0x000680dd5848 (a java.lang.Object)
    at kafka.tools.MirrorMaker$$anonfun$cleanShutdown$1.apply(MirrorMaker.scala:185)
    at kafka.tools.MirrorMaker$$anonfun$cleanShutdown$1.apply(MirrorMaker.scala:185)
    at scala.collection.immutable.List.foreach(List.scala:318)
    at kafka.tools.MirrorMaker$.cleanShutdown(MirrorMaker.scala:185)
{code}
-- This message was sent by Atlassian JIRA (v6.3.4#6332)
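A minimal sketch of the hazard described above, with hypothetical names and simplified to the multiple-put case: iterating topicThreadIdAndQueues puts one shutdown command per mapping rather than per distinct queue, so on a capacity-1 queue the second put blocks. Deduplicating the queues first is one illustrative guard, not necessarily the committed fix:
{code}
import java.util.concurrent.LinkedBlockingQueue

object ShutdownCommandDemo extends App {
  case class Chunk(payload: String)
  val shutdownCommand = Chunk("SHUTDOWN")

  // Two topic/thread ids sharing one capacity-1 queue, as in the report.
  val shared = new LinkedBlockingQueue[Chunk](1)
  val topicThreadIdAndQueues = Map("topicA-0" -> shared, "topicB-0" -> shared)

  // Buggy shape: one put per (topic, thread) mapping, not per distinct queue;
  // the second put would block forever once the queue holds the first command:
  //   topicThreadIdAndQueues.values.foreach(_.put(shutdownCommand))

  // Illustrative guard: deduplicate so each underlying queue gets exactly one
  // shutdown command (LinkedBlockingQueue uses reference equality, so toSet
  // collapses the shared queue to a single element).
  topicThreadIdAndQueues.values.toSet.foreach { q =>
    q.clear()
    q.put(shutdownCommand)
  }
  println(s"queue contents: $shared")
}
{code}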
[jira] [Commented] (KAFKA-1716) hang during shutdown of ZookeeperConsumerConnector
[ https://issues.apache.org/jira/browse/KAFKA-1716?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14212082#comment-14212082 ] Stevo Slavic commented on KAFKA-1716: - Is this issue related to KAFKA-1764? That one has a patch. hang during shutdown of ZookeeperConsumerConnector -- Key: KAFKA-1716 URL: https://issues.apache.org/jira/browse/KAFKA-1716 Project: Kafka Issue Type: Bug Components: consumer Affects Versions: 0.8.1.1 Reporter: Sean Fay Assignee: Neha Narkhede It appears to be possible for {{ZookeeperConsumerConnector.shutdown()}} to wedge in the case that some consumer fetcher threads receive messages during the shutdown process. Shutdown thread:
{code}
-- Parking to wait for: java/util/concurrent/CountDownLatch$Sync@0x2aaaf3ef06d0
    at jrockit/vm/Locks.park0(J)V(Native Method)
    at jrockit/vm/Locks.park(Locks.java:2230)
    at sun/misc/Unsafe.park(ZJ)V(Native Method)
    at java/util/concurrent/locks/LockSupport.park(LockSupport.java:156)
    at java/util/concurrent/locks/AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:811)
    at java/util/concurrent/locks/AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:969)
    at java/util/concurrent/locks/AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1281)
    at java/util/concurrent/CountDownLatch.await(CountDownLatch.java:207)
    at kafka/utils/ShutdownableThread.shutdown(ShutdownableThread.scala:36)
    at kafka/server/AbstractFetcherThread.shutdown(AbstractFetcherThread.scala:71)
    at kafka/server/AbstractFetcherManager$$anonfun$closeAllFetchers$2.apply(AbstractFetcherManager.scala:121)
    at kafka/server/AbstractFetcherManager$$anonfun$closeAllFetchers$2.apply(AbstractFetcherManager.scala:120)
    at scala/collection/TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
    at scala/collection/mutable/HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
    at scala/collection/mutable/HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
    at scala/collection/mutable/HashTable$class.foreachEntry(HashTable.scala:226)
    at scala/collection/mutable/HashMap.foreachEntry(HashMap.scala:39)
    at scala/collection/mutable/HashMap.foreach(HashMap.scala:98)
    at scala/collection/TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
    at kafka/server/AbstractFetcherManager.closeAllFetchers(AbstractFetcherManager.scala:120)
    ^-- Holding lock: java/lang/Object@0x2aaaebcc7318[thin lock]
    at kafka/consumer/ConsumerFetcherManager.stopConnections(ConsumerFetcherManager.scala:148)
    at kafka/consumer/ZookeeperConsumerConnector.liftedTree1$1(ZookeeperConsumerConnector.scala:171)
    at kafka/consumer/ZookeeperConsumerConnector.shutdown(ZookeeperConsumerConnector.scala:167)
{code}
ConsumerFetcherThread:
{code}
-- Parking to wait for: java/util/concurrent/locks/AbstractQueuedSynchronizer$ConditionObject@0x2aaaebcc7568
    at jrockit/vm/Locks.park0(J)V(Native Method)
    at jrockit/vm/Locks.park(Locks.java:2230)
    at sun/misc/Unsafe.park(ZJ)V(Native Method)
    at java/util/concurrent/locks/LockSupport.park(LockSupport.java:156)
    at java/util/concurrent/locks/AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:1987)
    at java/util/concurrent/LinkedBlockingQueue.put(LinkedBlockingQueue.java:306)
    at kafka/consumer/PartitionTopicInfo.enqueue(PartitionTopicInfo.scala:60)
    at kafka/consumer/ConsumerFetcherThread.processPartitionData(ConsumerFetcherThread.scala:49)
    at kafka/server/AbstractFetcherThread$$anonfun$processFetchRequest$1$$anonfun$apply$mcV$sp$2.apply(AbstractFetcherThread.scala:130)
    at kafka/server/AbstractFetcherThread$$anonfun$processFetchRequest$1$$anonfun$apply$mcV$sp$2.apply(AbstractFetcherThread.scala:111)
    at scala/collection/immutable/HashMap$HashMap1.foreach(HashMap.scala:224)
    at scala/collection/immutable/HashMap$HashTrieMap.foreach(HashMap.scala:403)
    at kafka/server/AbstractFetcherThread$$anonfun$processFetchRequest$1.apply$mcV$sp(AbstractFetcherThread.scala:111)
    at kafka/server/AbstractFetcherThread$$anonfun$processFetchRequest$1.apply(AbstractFetcherThread.scala:111)
    at kafka/server/AbstractFetcherThread$$anonfun$processFetchRequest$1.apply(AbstractFetcherThread.scala:111)
    at kafka/utils/Utils$.inLock(Utils.scala:538)
    at kafka/server/AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:110)
    at kafka/server/AbstractFetcherThread.doWork(AbstractFetcherThread.scala:88)
    at kafka/utils/ShutdownableThread.run(ShutdownableThread.scala:51)
    at
{code}
[jira] [Commented] (KAFKA-1194) The kafka broker cannot delete the old log files after the configured time
[ https://issues.apache.org/jira/browse/KAFKA-1194?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14212171#comment-14212171 ] Jing Dong commented on KAFKA-1194: -- Is there any way to fix the exception in the current 0.8.1.1 without applying the patch? The kafka broker cannot delete the old log files after the configured time -- Key: KAFKA-1194 URL: https://issues.apache.org/jira/browse/KAFKA-1194 Project: Kafka Issue Type: Bug Components: log Affects Versions: 0.8.1 Environment: Windows Reporter: Tao Qin Assignee: Jay Kreps Labels: features, patch Fix For: 0.9.0 Attachments: KAFKA-1194.patch, kafka-1194-v1.patch Original Estimate: 72h Remaining Estimate: 72h We tested it in a Windows environment, and set log.retention.hours to 24 hours.
{code}
# The minimum age of a log file to be eligible for deletion
log.retention.hours=24
{code}
After several days, the Kafka broker still cannot delete the old log files, and we get the following exceptions:
{code}
[2013-12-19 01:57:38,528] ERROR Uncaught exception in scheduled task 'kafka-log-retention' (kafka.utils.KafkaScheduler)
kafka.common.KafkaStorageException: Failed to change the log file suffix from  to .deleted for log segment 1516723
    at kafka.log.LogSegment.changeFileSuffixes(LogSegment.scala:249)
    at kafka.log.Log.kafka$log$Log$$asyncDeleteSegment(Log.scala:638)
    at kafka.log.Log.kafka$log$Log$$deleteSegment(Log.scala:629)
    at kafka.log.Log$$anonfun$deleteOldSegments$1.apply(Log.scala:418)
    at kafka.log.Log$$anonfun$deleteOldSegments$1.apply(Log.scala:418)
    at scala.collection.LinearSeqOptimized$class.foreach(LinearSeqOptimized.scala:59)
    at scala.collection.immutable.List.foreach(List.scala:76)
    at kafka.log.Log.deleteOldSegments(Log.scala:418)
    at kafka.log.LogManager.kafka$log$LogManager$$cleanupExpiredSegments(LogManager.scala:284)
    at kafka.log.LogManager$$anonfun$cleanupLogs$3.apply(LogManager.scala:316)
    at kafka.log.LogManager$$anonfun$cleanupLogs$3.apply(LogManager.scala:314)
    at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:743)
    at scala.collection.Iterator$class.foreach(Iterator.scala:772)
    at scala.collection.JavaConversions$JIteratorWrapper.foreach(JavaConversions.scala:573)
    at scala.collection.IterableLike$class.foreach(IterableLike.scala:73)
    at scala.collection.JavaConversions$JListWrapper.foreach(JavaConversions.scala:615)
    at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:742)
    at kafka.log.LogManager.cleanupLogs(LogManager.scala:314)
    at kafka.log.LogManager$$anonfun$startup$1.apply$mcV$sp(LogManager.scala:143)
    at kafka.utils.KafkaScheduler$$anon$1.run(KafkaScheduler.scala:100)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
    at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:304)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:178)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:724)
{code}
I think this error happens because Kafka tries to rename the log file while it is still open. So we should close the file before renaming it. The index file uses a special data structure, the MappedByteBuffer, whose Javadoc describes it as: "A mapped byte buffer and the file mapping that it represents remain valid until the buffer itself is garbage-collected." Fortunately, I found a forceUnmap function in the Kafka code, and perhaps it can be used to free the MappedByteBuffer. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
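A hedged sketch of the idea in the description (not the exact Kafka Utils.forceUnmap): free the index file's mapping before renaming, so Windows will allow the rename. It relies on the JVM-internal sun.nio.ch.DirectBuffer / sun.misc.Cleaner API, which only works on pre-Java-9 JVMs:
{code}
import java.io.File
import java.nio.MappedByteBuffer

// JVM-internal API: explicitly release the file mapping instead of waiting
// for the buffer to be garbage-collected.
def forceUnmap(buffer: MappedByteBuffer): Unit = buffer match {
  case db: sun.nio.ch.DirectBuffer => Option(db.cleaner()).foreach(_.clean())
  case _ => // heap buffer: nothing to unmap
}

// Hypothetical helper, illustrating the order of operations only.
def changeFileSuffix(indexFile: File, mmap: MappedByteBuffer, newSuffix: String): Unit = {
  forceUnmap(mmap) // release the mapping before touching the file name
  val target = new File(indexFile.getPath + newSuffix)
  if (!indexFile.renameTo(target))
    throw new RuntimeException(s"Failed to rename ${indexFile.getPath} to ${target.getPath}")
}
{code}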
[jira] [Updated] (KAFKA-1481) Stop using dashes AND underscores as separators in MBean names
[ https://issues.apache.org/jira/browse/KAFKA-1481?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Vladimir Tretyakov updated KAFKA-1481: -- Attachment: KAFKA-1481_2014-11-14_16-39-41_doc.patch KAFKA-1481_2014-11-14_16-33-03.patch Stop using dashes AND underscores as separators in MBean names -- Key: KAFKA-1481 URL: https://issues.apache.org/jira/browse/KAFKA-1481 Project: Kafka Issue Type: Bug Components: core Affects Versions: 0.8.1.1 Reporter: Otis Gospodnetic Priority: Critical Labels: patch Fix For: 0.8.3 Attachments: KAFKA-1481_2014-06-06_13-06-35.patch, KAFKA-1481_2014-10-13_18-23-35.patch, KAFKA-1481_2014-10-14_21-53-35.patch, KAFKA-1481_2014-10-15_10-23-35.patch, KAFKA-1481_2014-10-20_23-14-35.patch, KAFKA-1481_2014-10-21_09-14-35.patch, KAFKA-1481_2014-10-30_21-35-43.patch, KAFKA-1481_2014-10-31_14-35-43.patch, KAFKA-1481_2014-11-03_16-39-41_doc.patch, KAFKA-1481_2014-11-03_17-02-23.patch, KAFKA-1481_2014-11-10_20-39-41_doc.patch, KAFKA-1481_2014-11-10_21-02-23.patch, KAFKA-1481_2014-11-14_16-33-03.patch, KAFKA-1481_2014-11-14_16-39-41_doc.patch, KAFKA-1481_IDEA_IDE_2014-10-14_21-53-35.patch, KAFKA-1481_IDEA_IDE_2014-10-15_10-23-35.patch, KAFKA-1481_IDEA_IDE_2014-10-20_20-14-35.patch, KAFKA-1481_IDEA_IDE_2014-10-20_23-14-35.patch, alternateLayout1.png, alternateLayout2.png, diff-for-alternate-layout1.patch, diff-for-alternate-layout2.patch, originalLayout.png MBeans should not use dashes or underscores as separators because these characters are allowed in hostnames, topics, group and consumer IDs, etc., and these are embedded in MBean names, making it impossible to parse out individual bits from MBeans. Perhaps a pipe character should be used to avoid the conflict. This looks like a major blocker because it means nobody can write Kafka 0.8.x monitoring tools unless they are doing it for themselves AND do not use dashes AND do not use underscores. See: http://search-hadoop.com/m/4TaT4lonIW -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-1481) Stop using dashes AND underscores as separators in MBean names
[ https://issues.apache.org/jira/browse/KAFKA-1481?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14212264#comment-14212264 ] Vladimir Tretyakov commented on KAFKA-1481: --- Hi, added new patches (code + doc), going with way (3): kafka.server:type=BrokerTopicMetrics,name=BytesOutPerSec. Also added a Kafka version MBean; for now it exposes only the Kafka version (taken from the gradle.properties file). I didn't find an easy way to get the build hash, so it's only the version for now. I hope these will be my last patches; it is time-consuming to change things many times, test everything each time, and prepare patches, so I really hope these patches are good enough and I will not need additional iterations, thx. Stop using dashes AND underscores as separators in MBean names -- Key: KAFKA-1481 URL: https://issues.apache.org/jira/browse/KAFKA-1481 Project: Kafka Issue Type: Bug Components: core Affects Versions: 0.8.1.1 Reporter: Otis Gospodnetic Priority: Critical Labels: patch Fix For: 0.8.3 Attachments: KAFKA-1481_2014-06-06_13-06-35.patch, KAFKA-1481_2014-10-13_18-23-35.patch, KAFKA-1481_2014-10-14_21-53-35.patch, KAFKA-1481_2014-10-15_10-23-35.patch, KAFKA-1481_2014-10-20_23-14-35.patch, KAFKA-1481_2014-10-21_09-14-35.patch, KAFKA-1481_2014-10-30_21-35-43.patch, KAFKA-1481_2014-10-31_14-35-43.patch, KAFKA-1481_2014-11-03_16-39-41_doc.patch, KAFKA-1481_2014-11-03_17-02-23.patch, KAFKA-1481_2014-11-10_20-39-41_doc.patch, KAFKA-1481_2014-11-10_21-02-23.patch, KAFKA-1481_2014-11-14_16-33-03.patch, KAFKA-1481_2014-11-14_16-39-41_doc.patch, KAFKA-1481_IDEA_IDE_2014-10-14_21-53-35.patch, KAFKA-1481_IDEA_IDE_2014-10-15_10-23-35.patch, KAFKA-1481_IDEA_IDE_2014-10-20_20-14-35.patch, KAFKA-1481_IDEA_IDE_2014-10-20_23-14-35.patch, alternateLayout1.png, alternateLayout2.png, diff-for-alternate-layout1.patch, diff-for-alternate-layout2.patch, originalLayout.png MBeans should not use dashes or underscores as separators because these characters are allowed in hostnames, topics, group and consumer IDs, etc., and these are embedded in MBean names, making it impossible to parse out individual bits from MBeans. Perhaps a pipe character should be used to avoid the conflict. This looks like a major blocker because it means nobody can write Kafka 0.8.x monitoring tools unless they are doing it for themselves AND do not use dashes AND do not use underscores. See: http://search-hadoop.com/m/4TaT4lonIW -- This message was sent by Atlassian JIRA (v6.3.4#6332)
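The gain from naming scheme (3) above is that each part of the name becomes its own ObjectName key property, so dashes or underscores inside a topic or host no longer break parsing. A small illustration:
{code}
import javax.management.ObjectName

object MBeanNameDemo extends App {
  val name = new ObjectName("kafka.server:type=BrokerTopicMetrics,name=BytesOutPerSec")
  assert(name.getDomain == "kafka.server")
  assert(name.getKeyProperty("type") == "BrokerTopicMetrics")
  assert(name.getKeyProperty("name") == "BytesOutPerSec")

  // A topic tag such as topic=my-dashed-topic stays parseable, because the
  // dashes are isolated inside a single key property.
  val perTopic = new ObjectName(
    "kafka.server:type=BrokerTopicMetrics,name=BytesOutPerSec,topic=my-dashed-topic")
  assert(perTopic.getKeyProperty("topic") == "my-dashed-topic")
}
{code}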
[jira] [Commented] (KAFKA-1764) ZookeeperConsumerConnector could put multiple shutdownCommand to the same data chunk queue.
[ https://issues.apache.org/jira/browse/KAFKA-1764?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14212267#comment-14212267 ] Chris Cope commented on KAFKA-1764: --- This now builds and all 547 tests pass. Thanks! ZookeeperConsumerConnector could put multiple shutdownCommand to the same data chunk queue. --- Key: KAFKA-1764 URL: https://issues.apache.org/jira/browse/KAFKA-1764 Project: Kafka Issue Type: Bug Reporter: Jiangjie Qin Assignee: Jiangjie Qin Attachments: KAFKA-1764.patch, KAFKA-1764_2014-11-12_14:05:35.patch, KAFKA-1764_2014-11-13_23:57:51.patch In ZookeeperConsumerConnector shutdown(), we could potentially put multiple shutdownCommands into the same data chunk queue, provided the topics are sharing the same data chunk queue in topicThreadIdAndQueues. From email thread to document: In ZookeeperConsumerConnector shutdown(), we could potentially put multiple shutdownCommands into the same data chunk queue, provided the topics are sharing the same data chunk queue in topicThreadIdAndQueues. In our case, we only have 1 consumer stream for all the topics, and the data chunk queue capacity is set to 1. The execution sequence causing the problem is as below: 1. ZookeeperConsumerConnector shutdown() is called; it tries to put a shutdownCommand into each queue in topicThreadIdAndQueues. Since we only have 1 queue, multiple shutdownCommands will be put into the queue. 2. In sendShutdownToAllQueues(), between queue.clean() and queue.put(shutdownCommand), the consumer iterator receives the shutdownCommand and puts it back into the data chunk queue. After that, ZookeeperConsumerConnector tries to put another shutdownCommand into the data chunk queue but will block forever. The thread stack trace is as below:
{code}
Thread-23 #58 prio=5 os_prio=0 tid=0x7ff440004800 nid=0x40a waiting on condition [0x7ff4f0124000]
   java.lang.Thread.State: WAITING (parking)
    at sun.misc.Unsafe.park(Native Method)
    - parking to wait for 0x000680b96bf0 (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
    at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
    at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
    at java.util.concurrent.LinkedBlockingQueue.put(LinkedBlockingQueue.java:350)
    at kafka.consumer.ZookeeperConsumerConnector$$anonfun$sendShutdownToAllQueues$1.apply(ZookeeperConsumerConnector.scala:262)
    at kafka.consumer.ZookeeperConsumerConnector$$anonfun$sendShutdownToAllQueues$1.apply(ZookeeperConsumerConnector.scala:259)
    at scala.collection.Iterator$class.foreach(Iterator.scala:727)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
    at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
    at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
    at kafka.consumer.ZookeeperConsumerConnector.sendShutdownToAllQueues(ZookeeperConsumerConnector.scala:259)
    at kafka.consumer.ZookeeperConsumerConnector.liftedTree1$1(ZookeeperConsumerConnector.scala:199)
    at kafka.consumer.ZookeeperConsumerConnector.shutdown(ZookeeperConsumerConnector.scala:192)
    - locked 0x000680dd5848 (a java.lang.Object)
    at kafka.tools.MirrorMaker$$anonfun$cleanShutdown$1.apply(MirrorMaker.scala:185)
    at kafka.tools.MirrorMaker$$anonfun$cleanShutdown$1.apply(MirrorMaker.scala:185)
    at scala.collection.immutable.List.foreach(List.scala:318)
    at kafka.tools.MirrorMaker$.cleanShutdown(MirrorMaker.scala:185)
{code}
-- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-1716) hang during shutdown of ZookeeperConsumerConnector
[ https://issues.apache.org/jira/browse/KAFKA-1716?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14212425#comment-14212425 ] Jiangjie Qin commented on KAFKA-1716: - I don't think they are related. KAFKA-1764 only happens after all the fetcher threads have exited. This issue seems to be because fetcher threads are blocked on reading from the socket and never return. hang during shutdown of ZookeeperConsumerConnector -- Key: KAFKA-1716 URL: https://issues.apache.org/jira/browse/KAFKA-1716 Project: Kafka Issue Type: Bug Components: consumer Affects Versions: 0.8.1.1 Reporter: Sean Fay Assignee: Neha Narkhede It appears to be possible for {{ZookeeperConsumerConnector.shutdown()}} to wedge in the case that some consumer fetcher threads receive messages during the shutdown process. Shutdown thread:
{code}
-- Parking to wait for: java/util/concurrent/CountDownLatch$Sync@0x2aaaf3ef06d0
    at jrockit/vm/Locks.park0(J)V(Native Method)
    at jrockit/vm/Locks.park(Locks.java:2230)
    at sun/misc/Unsafe.park(ZJ)V(Native Method)
    at java/util/concurrent/locks/LockSupport.park(LockSupport.java:156)
    at java/util/concurrent/locks/AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:811)
    at java/util/concurrent/locks/AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:969)
    at java/util/concurrent/locks/AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1281)
    at java/util/concurrent/CountDownLatch.await(CountDownLatch.java:207)
    at kafka/utils/ShutdownableThread.shutdown(ShutdownableThread.scala:36)
    at kafka/server/AbstractFetcherThread.shutdown(AbstractFetcherThread.scala:71)
    at kafka/server/AbstractFetcherManager$$anonfun$closeAllFetchers$2.apply(AbstractFetcherManager.scala:121)
    at kafka/server/AbstractFetcherManager$$anonfun$closeAllFetchers$2.apply(AbstractFetcherManager.scala:120)
    at scala/collection/TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
    at scala/collection/mutable/HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
    at scala/collection/mutable/HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
    at scala/collection/mutable/HashTable$class.foreachEntry(HashTable.scala:226)
    at scala/collection/mutable/HashMap.foreachEntry(HashMap.scala:39)
    at scala/collection/mutable/HashMap.foreach(HashMap.scala:98)
    at scala/collection/TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
    at kafka/server/AbstractFetcherManager.closeAllFetchers(AbstractFetcherManager.scala:120)
    ^-- Holding lock: java/lang/Object@0x2aaaebcc7318[thin lock]
    at kafka/consumer/ConsumerFetcherManager.stopConnections(ConsumerFetcherManager.scala:148)
    at kafka/consumer/ZookeeperConsumerConnector.liftedTree1$1(ZookeeperConsumerConnector.scala:171)
    at kafka/consumer/ZookeeperConsumerConnector.shutdown(ZookeeperConsumerConnector.scala:167)
{code}
ConsumerFetcherThread:
{code}
-- Parking to wait for: java/util/concurrent/locks/AbstractQueuedSynchronizer$ConditionObject@0x2aaaebcc7568
    at jrockit/vm/Locks.park0(J)V(Native Method)
    at jrockit/vm/Locks.park(Locks.java:2230)
    at sun/misc/Unsafe.park(ZJ)V(Native Method)
    at java/util/concurrent/locks/LockSupport.park(LockSupport.java:156)
    at java/util/concurrent/locks/AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:1987)
    at java/util/concurrent/LinkedBlockingQueue.put(LinkedBlockingQueue.java:306)
    at kafka/consumer/PartitionTopicInfo.enqueue(PartitionTopicInfo.scala:60)
    at kafka/consumer/ConsumerFetcherThread.processPartitionData(ConsumerFetcherThread.scala:49)
    at kafka/server/AbstractFetcherThread$$anonfun$processFetchRequest$1$$anonfun$apply$mcV$sp$2.apply(AbstractFetcherThread.scala:130)
    at kafka/server/AbstractFetcherThread$$anonfun$processFetchRequest$1$$anonfun$apply$mcV$sp$2.apply(AbstractFetcherThread.scala:111)
    at scala/collection/immutable/HashMap$HashMap1.foreach(HashMap.scala:224)
    at scala/collection/immutable/HashMap$HashTrieMap.foreach(HashMap.scala:403)
    at kafka/server/AbstractFetcherThread$$anonfun$processFetchRequest$1.apply$mcV$sp(AbstractFetcherThread.scala:111)
    at kafka/server/AbstractFetcherThread$$anonfun$processFetchRequest$1.apply(AbstractFetcherThread.scala:111)
    at kafka/server/AbstractFetcherThread$$anonfun$processFetchRequest$1.apply(AbstractFetcherThread.scala:111)
    at kafka/utils/Utils$.inLock(Utils.scala:538)
    at kafka/server/AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:110)
    at
{code}
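The wedge in the two traces above has a simple shape: the shutdown thread awaits the fetcher's shutdown latch while the fetcher is blocked in queue.put() on a full chunk queue, so neither can make progress. A minimal, self-contained demonstration (hypothetical names, Scala 2.12+ for the Runnable lambda; the program intentionally hangs):
{code}
import java.util.concurrent.{CountDownLatch, LinkedBlockingQueue}

object ShutdownWedgeDemo extends App {
  val chunkQueue = new LinkedBlockingQueue[String](1) // like queuedchunks.max = 1
  chunkQueue.put("chunk-0")                           // queue is already full
  val shutdownLatch = new CountDownLatch(1)

  val fetcher = new Thread(() => {
    chunkQueue.put("chunk-1") // blocks forever: nothing drains the queue during shutdown
    shutdownLatch.countDown() // never reached
  }, "consumer-fetcher")
  fetcher.start()

  Thread.sleep(100)
  println("shutdown(): waiting for fetcher to exit ...")
  shutdownLatch.await() // wedges here, mirroring ShutdownableThread.shutdown()
}
{code}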
[jira] [Created] (KAFKA-1770) The description of UnknownTopicOrPartitionException in doc is not accurate.
Jiangjie Qin created KAFKA-1770: --- Summary: The description of UnknownTopicOrPartitionException in doc is not accurate. Key: KAFKA-1770 URL: https://issues.apache.org/jira/browse/KAFKA-1770 Project: Kafka Issue Type: Bug Reporter: Jiangjie Qin It was "Indicates an unknown topic or a partition id not between 0 and numPartitions-1", whereas it should be "Indicates one of the following situations: 1. The partition id is not between 0 and numPartitions-1. 2. The partition id for the topic does not exist on the broker (this could happen when partitions are reassigned)." -- This message was sent by Atlassian JIRA (v6.3.4#6332)
Review Request 28040: Patch for KAFKA-1770
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/28040/ --- Review request for kafka. Bugs: KAFKA-1770 https://issues.apache.org/jira/browse/KAFKA-1770 Repository: kafka Description --- Modified doc for UnknownTopicOrPartitionException Diffs - core/src/main/scala/kafka/common/UnknownTopicOrPartitionException.scala 781e551e5b78b5f436431575c2961fe15acd1414 Diff: https://reviews.apache.org/r/28040/diff/ Testing --- Thanks, Jiangjie Qin
[jira] [Updated] (KAFKA-1770) The description of UnknownTopicOrPartitionException in doc is not accurate.
[ https://issues.apache.org/jira/browse/KAFKA-1770?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jiangjie Qin updated KAFKA-1770: Assignee: Jiangjie Qin Status: Patch Available (was: Open) The description of UnknownTopicOrPartitionException in doc is not accurate. --- Key: KAFKA-1770 URL: https://issues.apache.org/jira/browse/KAFKA-1770 Project: Kafka Issue Type: Bug Reporter: Jiangjie Qin Assignee: Jiangjie Qin Attachments: KAFKA-1770.patch It was "Indicates an unknown topic or a partition id not between 0 and numPartitions-1", whereas it should be "Indicates one of the following situations: 1. The partition id is not between 0 and numPartitions-1. 2. The partition id for the topic does not exist on the broker (this could happen when partitions are reassigned)." -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-1770) The description of UnknownTopicOrPartitionException in doc is not accurate.
[ https://issues.apache.org/jira/browse/KAFKA-1770?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14212474#comment-14212474 ] Jiangjie Qin commented on KAFKA-1770: - Created reviewboard https://reviews.apache.org/r/28040/diff/ against branch origin/trunk The description of UnknownTopicOrPartitionException in doc is not accurate. --- Key: KAFKA-1770 URL: https://issues.apache.org/jira/browse/KAFKA-1770 Project: Kafka Issue Type: Bug Reporter: Jiangjie Qin Attachments: KAFKA-1770.patch It was "Indicates an unknown topic or a partition id not between 0 and numPartitions-1", whereas it should be "Indicates one of the following situations: 1. The partition id is not between 0 and numPartitions-1. 2. The partition id for the topic does not exist on the broker (this could happen when partitions are reassigned)." -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (KAFKA-1770) The description of UnknownTopicOrPartitionException in doc is not accurate.
[ https://issues.apache.org/jira/browse/KAFKA-1770?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jiangjie Qin updated KAFKA-1770: Attachment: KAFKA-1770.patch The description of UnknownTopicOrPartitionException in doc is not accurate. --- Key: KAFKA-1770 URL: https://issues.apache.org/jira/browse/KAFKA-1770 Project: Kafka Issue Type: Bug Reporter: Jiangjie Qin Attachments: KAFKA-1770.patch It was "Indicates an unknown topic or a partition id not between 0 and numPartitions-1", whereas it should be "Indicates one of the following situations: 1. The partition id is not between 0 and numPartitions-1. 2. The partition id for the topic does not exist on the broker (this could happen when partitions are reassigned)." -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-1667) topic-level configuration not validated
[ https://issues.apache.org/jira/browse/KAFKA-1667?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14212477#comment-14212477 ] Dmytro Kostiuchenko commented on KAFKA-1667: Bump. Anyone willing to review? topic-level configuration not validated Key: KAFKA-1667 URL: https://issues.apache.org/jira/browse/KAFKA-1667 Project: Kafka Issue Type: Bug Affects Versions: 0.8.1.1 Reporter: Ryan Berdeen Labels: newbie Attachments: KAFKA-1667_2014-11-05_19:43:53.patch, KAFKA-1667_2014-11-06_17:10:14.patch, KAFKA-1667_2014-11-07_14:28:14.patch, KAFKA-1667_2014-11-12_12:49:11.patch I was able to set the configuration for a topic to these invalid values:
{code}
Topic:topic-config-test  PartitionCount:1  ReplicationFactor:2  Configs:min.cleanable.dirty.ratio=-30.2,segment.bytes=-1,retention.ms=-12,cleanup.policy=lol
{code}
It seems that the values are saved as long as they are the correct type, but are not validated like the corresponding broker-level properties. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
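A minimal sketch of the kind of validation being asked for, assuming a hypothetical hook invoked when topic-level overrides are set; the property names are taken from the invalid values above, and this is not the actual Kafka LogConfig code:
{code}
import java.util.Properties

object TopicConfigValidator {
  def validateTopicConfig(props: Properties): Unit = {
    def fail(msg: String) = throw new IllegalArgumentException(msg)

    Option(props.getProperty("min.cleanable.dirty.ratio")).foreach { v =>
      val d = v.toDouble // non-numeric values still fail with NumberFormatException
      if (d < 0.0 || d > 1.0) fail(s"min.cleanable.dirty.ratio must be in [0, 1], got $d")
    }
    Option(props.getProperty("segment.bytes")).foreach { v =>
      if (v.toLong <= 0) fail(s"segment.bytes must be positive, got $v")
    }
    Option(props.getProperty("retention.ms")).foreach { v =>
      if (v.toLong <= 0) fail(s"retention.ms must be positive, got $v")
    }
    Option(props.getProperty("cleanup.policy")).foreach { v =>
      if (!Set("delete", "compact")(v)) fail(s"unknown cleanup.policy: $v")
    }
  }
}
{code}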
Re: Review Request 27684: Patch for KAFKA-1743
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/27684/ --- (Updated Nov. 14, 2014, 5 p.m.) Review request for kafka. Bugs: KAFKA-1743 https://issues.apache.org/jira/browse/KAFKA-1743 Repository: kafka Description (updated) --- def commitOffsets method added to make ConsumerConnector backward compatible; Addressing Jun's comments Diffs (updated) - core/src/main/scala/kafka/consumer/ConsumerConnector.scala 07677c1c26768ef9c9032626180d0015f12cb0e0 Diff: https://reviews.apache.org/r/27684/diff/ Testing --- Thanks, Manikumar Reddy O
[jira] [Updated] (KAFKA-1743) ConsumerConnector.commitOffsets in 0.8.2 is not backward compatible
[ https://issues.apache.org/jira/browse/KAFKA-1743?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Manikumar Reddy updated KAFKA-1743: --- Attachment: KAFKA-1743_2014-11-14_22:29:21.patch ConsumerConnector.commitOffsets in 0.8.2 is not backward compatible --- Key: KAFKA-1743 URL: https://issues.apache.org/jira/browse/KAFKA-1743 Project: Kafka Issue Type: Bug Components: core Affects Versions: 0.8.2 Reporter: Jun Rao Assignee: Manikumar Reddy Priority: Blocker Fix For: 0.8.2 Attachments: KAFKA-1743.patch, KAFKA-1743_2014-11-08_11:49:31.patch, KAFKA-1743_2014-11-14_22:29:21.patch In 0.8.1.x, ConsumerConnector has the following api: def commitOffsets. This is changed to the following in 0.8.2, which breaks compatibility: def commitOffsets(retryOnFailure: Boolean = true) -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-1743) ConsumerConnector.commitOffsets in 0.8.2 is not backward compatible
[ https://issues.apache.org/jira/browse/KAFKA-1743?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14212481#comment-14212481 ] Manikumar Reddy commented on KAFKA-1743: Updated reviewboard https://reviews.apache.org/r/27684/diff/ against branch origin/0.8.2 ConsumerConnector.commitOffsets in 0.8.2 is not backward compatible --- Key: KAFKA-1743 URL: https://issues.apache.org/jira/browse/KAFKA-1743 Project: Kafka Issue Type: Bug Components: core Affects Versions: 0.8.2 Reporter: Jun Rao Assignee: Manikumar Reddy Priority: Blocker Fix For: 0.8.2 Attachments: KAFKA-1743.patch, KAFKA-1743_2014-11-08_11:49:31.patch, KAFKA-1743_2014-11-14_22:29:21.patch In 0.8.1.x, ConsumerConnector has the following api: def commitOffsets. This is changed to the following in 0.8.2, which breaks compatibility: def commitOffsets(retryOnFailure: Boolean = true) -- This message was sent by Atlassian JIRA (v6.3.4#6332)
Re: Review Request 27684: Patch for KAFKA-1743
On Nov. 10, 2014, 7:50 p.m., Jun Rao wrote: core/src/main/scala/kafka/consumer/ConsumerConnector.scala, lines 76-80 https://reviews.apache.org/r/27684/diff/2/?file=755292#file755292line76 We will also need to change the interface in ConsumerConnector from def commitOffsets(retryOnFailure: Boolean = true) back to def commitOffsets. In ZookeeperConsumerConnector, we can make the following method private: def commitOffsets(retryOnFailure: Boolean = true). Another question: will the Scala compiler be confused with 2 methods, one w/o parentheses and one with 1 parameter having a default? Could you try compiling the code on all Scala versions?

Manikumar Reddy O wrote: Currently the below classes use the new method commitOffsets(true): kafka/javaapi/consumer/ZookeeperConsumerConnector.scala and kafka/tools/TestEndToEndLatency.scala. If we are changing the interface, then we need to change the above classes also. If we are not fixing this on trunk, then the same problem will come up in 0.8.3. How do we handle this? The 2 methods, one w/o parentheses and one with 1 parameter, compile on all Scala versions.

Jun Rao wrote: Thanks for the explanation. There is actually a bit of inconsistency introduced in this patch. In kafka.javaapi.consumer.ZookeeperConsumerConnector, commitOffsets() is implemented as the following: def commitOffsets() { underlying.commitOffsets() } This actually calls underlying.commitOffsets(isAutoCommit: Boolean = true) with a default value of true. However, ConsumerConnector.commitOffsets is implemented as the following, which sets isAutoCommit to false: def commitOffsets { commitOffsets(false) } So, we should use true in the above. Another thing that I was thinking is that it's going to be a bit confusing if we have the following Scala apis: def commitOffsets(retryOnFailure: Boolean = true) and def commitOffsets. If you do commitOffsets it calls the second one, and if you do commitOffsets(), you actually call the first one. However, the expectation is probably that the same method will be called in both cases. Would it be better if we got rid of the default like the following? Then it's clear which method will be called: def commitOffsets(retryOnFailure: Boolean) and def commitOffsets.

Manikumar Reddy O wrote: This JIRA is to make ConsumerConnector compatible with 0.8.1, right? Then we need to remove def commitOffsets(retryOnFailure: Boolean = true) from ConsumerConnector. Changing the API to def commitOffsets(retryOnFailure: Boolean) will not help us. It still breaks the compatibility.

Jun Rao wrote: In 0.8.1, ConsumerConnector has def commitOffsets. I was thinking of having the following two APIs in ConsumerConnector in 0.8.2: def commitOffsets(retryOnFailure: Boolean) and def commitOffsets. That should be backward compatible with the 0.8.1 api, right?

Ok. I was thinking there may be some custom implementations of the ConsumerConnector interface outside the Kafka codebase, so changing the interface would break those implementations. I added the following APIs in ConsumerConnector: def commitOffsets(retryOnFailure: Boolean) and def commitOffsets.

- Manikumar Reddy

--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/27684/#review60652 --- On Nov. 14, 2014, 5 p.m., Manikumar Reddy O wrote: --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/27684/ --- (Updated Nov. 14, 2014, 5 p.m.) Review request for kafka. Bugs: KAFKA-1743 https://issues.apache.org/jira/browse/KAFKA-1743 Repository: kafka Description --- def commitOffsets method added to make ConsumerConnector backward compatible; Addressing Jun's comments Diffs - core/src/main/scala/kafka/consumer/ConsumerConnector.scala 07677c1c26768ef9c9032626180d0015f12cb0e0 Diff: https://reviews.apache.org/r/27684/diff/ Testing --- Thanks, Manikumar Reddy O
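The shape finally agreed on above compiles cleanly; a tiny self-contained illustration of the two signatures and which call resolves to which (illustrative trait, not the full Kafka interface):
{code}
trait ConsumerConnector {
  def commitOffsets(retryOnFailure: Boolean): Unit
  def commitOffsets: Unit // restored 0.8.1-compatible, parameterless signature
}

class DemoConnector extends ConsumerConnector {
  override def commitOffsets(retryOnFailure: Boolean): Unit =
    println(s"commitOffsets(retryOnFailure = $retryOnFailure)")
  override def commitOffsets: Unit = commitOffsets(retryOnFailure = true)
}

object Demo extends App {
  val c = new DemoConnector
  c.commitOffsets        // resolves to the parameterless method
  c.commitOffsets(false) // resolves to the explicit overload; with no default
                         // value there is no ambiguity between the two
}
{code}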
[jira] [Commented] (KAFKA-313) Add JSON output and looping options to ConsumerOffsetChecker
[ https://issues.apache.org/jira/browse/KAFKA-313?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14212511#comment-14212511 ] Ashish Kumar Singh commented on KAFKA-313: -- [~jjkoshy] I am planning to take a stab at this. If it is OK, then kindly assign this JIRA to me. Add JSON output and looping options to ConsumerOffsetChecker Key: KAFKA-313 URL: https://issues.apache.org/jira/browse/KAFKA-313 Project: Kafka Issue Type: Improvement Reporter: Dave DeMaagd Priority: Minor Labels: newbie, patch Fix For: 0.8.3 Attachments: KAFKA-313-2012032200.diff Adds: * '--loop N' - causes the program to loop forever, sleeping for up to N seconds between loops (loop time minus collection time, unless that's less than 0, at which point it will just run again immediately) * '--asjson' - display as a JSON string instead of the more human-readable output format. Neither of the above depends on the other (you can loop in the human-readable output, or do a single-shot execution with JSON output). Existing behavior/output is maintained if neither of the above is used. Diff attached. Impacted files: core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-1684) Implement TLS/SSL authentication
[ https://issues.apache.org/jira/browse/KAFKA-1684?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14212515#comment-14212515 ] Michael Herstine commented on KAFKA-1684: - Coming in a little late, but to the question of different ports: yes, we had envisioned three separate ports, both for simplicity's sake and for security-related reasons: supporting no authentication on the same port as Kerberos and/or SSL opens us up to downgrade attacks. Implement TLS/SSL authentication Key: KAFKA-1684 URL: https://issues.apache.org/jira/browse/KAFKA-1684 Project: Kafka Issue Type: Sub-task Components: security Affects Versions: 0.9.0 Reporter: Jay Kreps Assignee: Ivan Lyutov Attachments: KAFKA-1684.patch Add an SSL port to the configuration and advertise this as part of the metadata request. If the SSL port is configured, the socket server will need to add a second Acceptor thread to listen on it. Connections accepted on this port will need to go through the SSL handshake prior to being registered with a Processor for request processing. SSL requests and responses may need to be wrapped or unwrapped using the SSLEngine that was initialized by the acceptor. This wrapping and unwrapping is very similar to what will need to be done for SASL-based authentication schemes. We should have a uniform interface that covers both of these, and we will need to store the instance in the session with the request. The socket server will have to use this object when reading and writing requests. We will need to take care with FetchRequests, as the current FileChannel.transferTo mechanism will be incompatible with wrap/unwrap, so we can only use this optimization for unencrypted sockets that don't require userspace translation (wrapping). -- This message was sent by Atlassian JIRA (v6.3.4#6332)
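For the wrap/unwrap point in the description, a rough sketch of what a uniform transport interface could look like (hypothetical names; handshake and BUFFER_UNDERFLOW/OVERFLOW handling omitted; a sketch, not the committed design):
{code}
import java.nio.ByteBuffer
import java.nio.channels.SocketChannel
import javax.net.ssl.SSLEngine

// Uniform read/write surface for plaintext and SSL connections.
trait TransportLayer {
  def read(dst: ByteBuffer): Int
  def write(src: ByteBuffer): Int
}

class PlaintextTransport(channel: SocketChannel) extends TransportLayer {
  override def read(dst: ByteBuffer): Int  = channel.read(dst)
  override def write(src: ByteBuffer): Int = channel.write(src)
}

class SslTransport(channel: SocketChannel, engine: SSLEngine) extends TransportLayer {
  private val netIn  = ByteBuffer.allocate(engine.getSession.getPacketBufferSize)
  private val netOut = ByteBuffer.allocate(engine.getSession.getPacketBufferSize)

  override def read(dst: ByteBuffer): Int = {
    if (channel.read(netIn) < 0) return -1
    netIn.flip()
    val result = engine.unwrap(netIn, dst) // decrypt network bytes into dst
    netIn.compact()
    result.bytesProduced()
  }

  override def write(src: ByteBuffer): Int = {
    netOut.clear()
    val result = engine.wrap(src, netOut) // encrypt src into network bytes
    netOut.flip()
    while (netOut.hasRemaining) channel.write(netOut)
    result.bytesConsumed()
  }
}
{code}
Note how FileChannel.transferTo bypasses userspace entirely, which is exactly why the description restricts that optimization to unencrypted sockets.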
[jira] [Commented] (KAFKA-313) Add JSON output and looping options to ConsumerOffsetChecker
[ https://issues.apache.org/jira/browse/KAFKA-313?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14212527#comment-14212527 ] Joel Koshy commented on KAFKA-313: -- I'm not sure if there is a strong need for this. In Linux you can use watch to repeat the command: watch -n 10 ./bin/kafka-consumer-offset-checker.sh --zookeeper zk --topic topic --group groupid 2> /dev/null Having a built-in loop does save the expense of spinning up a whole VM each time, so it does not hurt to have it, I guess. Add JSON output and looping options to ConsumerOffsetChecker Key: KAFKA-313 URL: https://issues.apache.org/jira/browse/KAFKA-313 Project: Kafka Issue Type: Improvement Reporter: Dave DeMaagd Priority: Minor Labels: newbie, patch Fix For: 0.8.3 Attachments: KAFKA-313-2012032200.diff Adds: * '--loop N' - causes the program to loop forever, sleeping for up to N seconds between loops (loop time minus collection time, unless that's less than 0, at which point it will just run again immediately) * '--asjson' - display as a JSON string instead of the more human-readable output format. Neither of the above depends on the other (you can loop in the human-readable output, or do a single-shot execution with JSON output). Existing behavior/output is maintained if neither of the above is used. Diff attached. Impacted files: core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Assigned] (KAFKA-313) Add JSON output and looping options to ConsumerOffsetChecker
[ https://issues.apache.org/jira/browse/KAFKA-313?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Joel Koshy reassigned KAFKA-313: Assignee: Ashish Kumar Singh Add JSON output and looping options to ConsumerOffsetChecker Key: KAFKA-313 URL: https://issues.apache.org/jira/browse/KAFKA-313 Project: Kafka Issue Type: Improvement Reporter: Dave DeMaagd Assignee: Ashish Kumar Singh Priority: Minor Labels: newbie, patch Fix For: 0.8.3 Attachments: KAFKA-313-2012032200.diff Adds: * '--loop N' - causes the program to loop forever, sleeping for up to N seconds between loops (loop time minus collection time, unless that's less than 0, at which point it will just run again immediately) * '--asjson' - display as a JSON string instead of the more human-readable output format. Neither of the above depends on the other (you can loop in the human-readable output, or do a single-shot execution with JSON output). Existing behavior/output is maintained if neither of the above is used. Diff attached. Impacted files: core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (KAFKA-313) Add JSON output and looping options to ConsumerOffsetChecker
[ https://issues.apache.org/jira/browse/KAFKA-313?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Joel Koshy updated KAFKA-313: - Reviewer: Joel Koshy Add JSON output and looping options to ConsumerOffsetChecker Key: KAFKA-313 URL: https://issues.apache.org/jira/browse/KAFKA-313 Project: Kafka Issue Type: Improvement Reporter: Dave DeMaagd Assignee: Ashish Kumar Singh Priority: Minor Labels: newbie, patch Fix For: 0.8.3 Attachments: KAFKA-313-2012032200.diff Adds: * '--loop N' - causes the program to loop forever, sleeping for up to N seconds between loops (loop time minus collection time, unless that's less than 0, at which point it will just run again immediately) * '--asjson' - display as a JSON string instead of the more human-readable output format. Neither of the above depends on the other (you can loop in the human-readable output, or do a single-shot execution with JSON output). Existing behavior/output is maintained if neither of the above is used. Diff attached. Impacted files: core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-1481) Stop using dashes AND underscores as separators in MBean names
[ https://issues.apache.org/jira/browse/KAFKA-1481?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14212536#comment-14212536 ] Jun Rao commented on KAFKA-1481: Thanks for the patch. Appreciate your persistence. A few comments below.
80. AppInfo.registerInfo()
80.1 On the server side, this needs to be called in KafkaServerStartable.startup(). Some users will start up a Kafka broker using KafkaServerStartable in a container and not from the command line.
80.2 On the client side, if there are multiple instances of clients running in the same jvm, registerInfo() will be called multiple times. It would be good to make sure registerInfo() only registers the mbean once, no matter how many times it's called. We can maintain an isRegistered flag internally and only register the mbean if the flag is not set. We can also make this a synchronized method.
80.3 There is no need to call registerInfo() in ConsoleConsumer and ProducerPerformance, since the mbean will be registered by the consumer/producer client.
80.4 We will need to add the same version mbean in the new java client. We don't need to do that in this jira. Could you file a separate jira to track that?
81. KafkaServer: remove the unused import AppInfo.
82. TestUtils: Could you fix the indentation in the following? def sendMessagesToPartition(configs: Seq[KafkaConfig], topic: String, partition: Int, numMessages: Int, compression: CompressionCodec = NoCompressionCodec): List[String] = {
83. As I was reviewing KAFKA-1684, I realized that in the future, a broker may have multiple ports: plain text, SSL, SASL, etc. In this patch, the broker-specific mbeans have the tags brokerHost and brokerPort. This is going to be inconvenient once the broker has more than one port. I was thinking it's simpler if we just add the brokerId tag, or both the brokerId and brokerHost tags. What do you think?
Stop using dashes AND underscores as separators in MBean names -- Key: KAFKA-1481 URL: https://issues.apache.org/jira/browse/KAFKA-1481 Project: Kafka Issue Type: Bug Components: core Affects Versions: 0.8.1.1 Reporter: Otis Gospodnetic Priority: Critical Labels: patch Fix For: 0.8.3 Attachments: KAFKA-1481_2014-06-06_13-06-35.patch, KAFKA-1481_2014-10-13_18-23-35.patch, KAFKA-1481_2014-10-14_21-53-35.patch, KAFKA-1481_2014-10-15_10-23-35.patch, KAFKA-1481_2014-10-20_23-14-35.patch, KAFKA-1481_2014-10-21_09-14-35.patch, KAFKA-1481_2014-10-30_21-35-43.patch, KAFKA-1481_2014-10-31_14-35-43.patch, KAFKA-1481_2014-11-03_16-39-41_doc.patch, KAFKA-1481_2014-11-03_17-02-23.patch, KAFKA-1481_2014-11-10_20-39-41_doc.patch, KAFKA-1481_2014-11-10_21-02-23.patch, KAFKA-1481_2014-11-14_16-33-03.patch, KAFKA-1481_2014-11-14_16-39-41_doc.patch, KAFKA-1481_IDEA_IDE_2014-10-14_21-53-35.patch, KAFKA-1481_IDEA_IDE_2014-10-15_10-23-35.patch, KAFKA-1481_IDEA_IDE_2014-10-20_20-14-35.patch, KAFKA-1481_IDEA_IDE_2014-10-20_23-14-35.patch, alternateLayout1.png, alternateLayout2.png, diff-for-alternate-layout1.patch, diff-for-alternate-layout2.patch, originalLayout.png MBeans should not use dashes or underscores as separators because these characters are allowed in hostnames, topics, group and consumer IDs, etc., and these are embedded in MBean names, making it impossible to parse out individual bits from MBeans. Perhaps a pipe character should be used to avoid the conflict. This looks like a major blocker because it means nobody can write Kafka 0.8.x monitoring tools unless they are doing it for themselves AND do not use dashes AND do not use underscores. See: http://search-hadoop.com/m/4TaT4lonIW -- This message was sent by Atlassian JIRA (v6.3.4#6332)
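For point 80.2, a minimal sketch of an idempotent registerInfo() along the lines Jun describes (illustrative names, not the exact Kafka AppInfo object):
{code}
import java.lang.management.ManagementFactory
import javax.management.ObjectName

// Standard MBean pair: JMX requires the interface to be named <class>MBean.
trait KafkaVersionMBean { def getVersion: String }
class KafkaVersion(version: String) extends KafkaVersionMBean {
  override def getVersion: String = version
}

object AppInfo {
  private var isRegistered = false

  // synchronized + flag: safe to call from every client in the JVM,
  // but the mbean is registered at most once.
  def registerInfo(version: String): Unit = synchronized {
    if (!isRegistered) {
      ManagementFactory.getPlatformMBeanServer.registerMBean(
        new KafkaVersion(version), new ObjectName("kafka:type=kafka.KafkaVersion"))
      isRegistered = true
    }
  }
}
{code}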
[jira] [Commented] (KAFKA-1745) Each new thread creates a PIPE and KQUEUE as open files during producer.send() and does not get cleared when the thread that creates them is cleared.
[ https://issues.apache.org/jira/browse/KAFKA-1745?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14212560#comment-14212560 ] Ewen Cheslack-Postava commented on KAFKA-1745: -- [~Vishal M] I'm not sure what to do about it. If my analysis is correct, this is internal to NIO and we don't really have any control over it -- we just allocate the socket and use it normally, albeit from multiple threads. The new producer uses a dedicated thread for IO, which explains why it doesn't seem to exhibit the same behavior. The two options I can see are to shift to using the new producer (which I realize isn't an option for your current Kafka version) or to reorganize your code to have a dedicated thread per producer and make your existing send operations just push data to that thread for processing instead. Each new thread creates a PIPE and KQUEUE as open files during producer.send() and does not get cleared when the thread that creates them is cleared. - Key: KAFKA-1745 URL: https://issues.apache.org/jira/browse/KAFKA-1745 Project: Kafka Issue Type: Bug Affects Versions: 0.8.1.1 Environment: Mac OS Mavericks Reporter: Vishal Priority: Critical Hi, I'm using the Java client API for Kafka. I wanted to send data to Kafka by using a producer pool, as I'm using a sync producer. The thread that sends the data is from a thread pool that grows and shrinks depending on the usage. So, when I try to send data from one thread, 1 KQUEUE and 2 PIPEs are created (got this info by using lsof). If I keep using the same thread it's fine, but when a new thread sends data to Kafka (using producer.send()) a new KQUEUE and 2 PIPEs are created. This is okay, but when the thread is cleared from the thread pool and a new thread is created, then new KQUEUEs and PIPEs are created. The problem is that the old ones are not getting destroyed and they show up as open files. This is causing a major problem, as the number of open files keeps increasing and does not decrease. Please suggest any solutions. FYI, the number of TCP connections established from the producer system to the Kafka broker remains constant throughout. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
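A rough sketch of the reorganization suggested above, assuming a hypothetical wrapper (names illustrative): pool threads hand records to a queue, and a single long-lived thread is the only one that ever calls producer.send(), so NIO selector/pipe resources are created once rather than per pool thread:
{code}
import java.util.concurrent.LinkedBlockingQueue

class SingleThreadedSender[K, V](send: (K, V) => Unit) {
  private case class Record(key: K, value: V)
  private val queue = new LinkedBlockingQueue[Record]()

  private val ioThread = new Thread(() => {
    // Only this thread touches the producer, mirroring the new producer's
    // dedicated-IO-thread design.
    while (!Thread.currentThread().isInterrupted)
      try { val r = queue.take(); send(r.key, r.value) }
      catch { case _: InterruptedException => Thread.currentThread().interrupt() }
  }, "producer-io")
  ioThread.setDaemon(true)
  ioThread.start()

  // Short-lived pool threads call this instead of producer.send().
  def enqueue(key: K, value: V): Unit = queue.put(Record(key, value))
}
{code}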
[jira] [Commented] (KAFKA-1481) Stop using dashes AND underscores as separators in MBean names
[ https://issues.apache.org/jira/browse/KAFKA-1481?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14212563#comment-14212563 ] Jun Rao commented on KAFKA-1481: 83. On second thought, the port tag may be ok, since a client is only going to connect to one port anyway. Stop using dashes AND underscores as separators in MBean names -- Key: KAFKA-1481 URL: https://issues.apache.org/jira/browse/KAFKA-1481 Project: Kafka Issue Type: Bug Components: core Affects Versions: 0.8.1.1 Reporter: Otis Gospodnetic Priority: Critical Labels: patch Fix For: 0.8.3 Attachments: KAFKA-1481_2014-06-06_13-06-35.patch, KAFKA-1481_2014-10-13_18-23-35.patch, KAFKA-1481_2014-10-14_21-53-35.patch, KAFKA-1481_2014-10-15_10-23-35.patch, KAFKA-1481_2014-10-20_23-14-35.patch, KAFKA-1481_2014-10-21_09-14-35.patch, KAFKA-1481_2014-10-30_21-35-43.patch, KAFKA-1481_2014-10-31_14-35-43.patch, KAFKA-1481_2014-11-03_16-39-41_doc.patch, KAFKA-1481_2014-11-03_17-02-23.patch, KAFKA-1481_2014-11-10_20-39-41_doc.patch, KAFKA-1481_2014-11-10_21-02-23.patch, KAFKA-1481_2014-11-14_16-33-03.patch, KAFKA-1481_2014-11-14_16-39-41_doc.patch, KAFKA-1481_IDEA_IDE_2014-10-14_21-53-35.patch, KAFKA-1481_IDEA_IDE_2014-10-15_10-23-35.patch, KAFKA-1481_IDEA_IDE_2014-10-20_20-14-35.patch, KAFKA-1481_IDEA_IDE_2014-10-20_23-14-35.patch, alternateLayout1.png, alternateLayout2.png, diff-for-alternate-layout1.patch, diff-for-alternate-layout2.patch, originalLayout.png MBeans should not use dashes or underscores as separators because these characters are allowed in hostnames, topics, group and consumer IDs, etc., and these are embedded in MBean names, making it impossible to parse out individual bits from MBeans. Perhaps a pipe character should be used to avoid the conflict. This looks like a major blocker because it means nobody can write Kafka 0.8.x monitoring tools unless they are doing it for themselves AND do not use dashes AND do not use underscores. See: http://search-hadoop.com/m/4TaT4lonIW -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (KAFKA-1764) ZookeeperConsumerConnector could put multiple shutdownCommand to the same data chunk queue.
[ https://issues.apache.org/jira/browse/KAFKA-1764?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Joel Koshy updated KAFKA-1764: -- Resolution: Fixed Status: Resolved (was: Patch Available) ZookeeperConsumerConnector could put multiple shutdownCommand to the same data chunk queue. --- Key: KAFKA-1764 URL: https://issues.apache.org/jira/browse/KAFKA-1764 Project: Kafka Issue Type: Bug Reporter: Jiangjie Qin Assignee: Jiangjie Qin Attachments: KAFKA-1764.patch, KAFKA-1764_2014-11-12_14:05:35.patch, KAFKA-1764_2014-11-13_23:57:51.patch In ZookeeperConsumerConnector shutdown(), we could potentially put multiple shutdownCommands into the same data chunk queue, provided the topics are sharing the same data chunk queue in topicThreadIdAndQueues. From email thread to document: In ZookeeperConsumerConnector shutdown(), we could potentially put multiple shutdownCommands into the same data chunk queue, provided the topics are sharing the same data chunk queue in topicThreadIdAndQueues. In our case, we only have 1 consumer stream for all the topics, and the data chunk queue capacity is set to 1. The execution sequence causing the problem is as below: 1. ZookeeperConsumerConnector shutdown() is called; it tries to put a shutdownCommand into each queue in topicThreadIdAndQueues. Since we only have 1 queue, multiple shutdownCommands will be put into the queue. 2. In sendShutdownToAllQueues(), between queue.clean() and queue.put(shutdownCommand), the consumer iterator receives the shutdownCommand and puts it back into the data chunk queue. After that, ZookeeperConsumerConnector tries to put another shutdownCommand into the data chunk queue but will block forever. The thread stack trace is as below:
{code}
Thread-23 #58 prio=5 os_prio=0 tid=0x7ff440004800 nid=0x40a waiting on condition [0x7ff4f0124000]
   java.lang.Thread.State: WAITING (parking)
    at sun.misc.Unsafe.park(Native Method)
    - parking to wait for 0x000680b96bf0 (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
    at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
    at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
    at java.util.concurrent.LinkedBlockingQueue.put(LinkedBlockingQueue.java:350)
    at kafka.consumer.ZookeeperConsumerConnector$$anonfun$sendShutdownToAllQueues$1.apply(ZookeeperConsumerConnector.scala:262)
    at kafka.consumer.ZookeeperConsumerConnector$$anonfun$sendShutdownToAllQueues$1.apply(ZookeeperConsumerConnector.scala:259)
    at scala.collection.Iterator$class.foreach(Iterator.scala:727)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
    at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
    at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
    at kafka.consumer.ZookeeperConsumerConnector.sendShutdownToAllQueues(ZookeeperConsumerConnector.scala:259)
    at kafka.consumer.ZookeeperConsumerConnector.liftedTree1$1(ZookeeperConsumerConnector.scala:199)
    at kafka.consumer.ZookeeperConsumerConnector.shutdown(ZookeeperConsumerConnector.scala:192)
    - locked 0x000680dd5848 (a java.lang.Object)
    at kafka.tools.MirrorMaker$$anonfun$cleanShutdown$1.apply(MirrorMaker.scala:185)
    at kafka.tools.MirrorMaker$$anonfun$cleanShutdown$1.apply(MirrorMaker.scala:185)
    at scala.collection.immutable.List.foreach(List.scala:318)
    at kafka.tools.MirrorMaker$.cleanShutdown(MirrorMaker.scala:185)
{code}
-- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (KAFKA-1771) replicate_testsuite data verification broken if num_partitions > replica_factor
Ewen Cheslack-Postava created KAFKA-1771: Summary: replicate_testsuite data verification broken if num_partitions > replica_factor Key: KAFKA-1771 URL: https://issues.apache.org/jira/browse/KAFKA-1771 Project: Kafka Issue Type: Bug Components: system tests Affects Versions: 0.8.1.1 Reporter: Ewen Cheslack-Postava As discussed in KAFKA-1763, testcase_0131, testcase_0132, and testcase_0133 currently fail with an exception: {quote} Traceback (most recent call last): File /mnt/u001/kafka_replication_system_test/system_test/replication_testsuite/replica_basic_test.py, line 434, in runTest kafka_system_test_utils.validate_simple_consumer_data_matched_across_replicas(self.systemTestEnv, self.testcaseEnv) File /mnt/u001/kafka_replication_system_test/system_test/utils/kafka_system_test_utils.py, line 2223, in validate_simple_consumer_data_matched_across_replicas replicaIdxMsgIdList[replicaIdx - 1][topicPartition] = consumerMsgIdList IndexError: list index out of range {quote} The root cause seems to be kafka_system_test_utils.start_simple_consumer. The current logic seems incorrect. It should be generating one consumer per partition per replica so it can verify the data from all sources, but it currently has a loop involving the list of brokers, where that loop variable isn't even used. But probably a bigger issue is that it's generating multiple processes in the background. It records pids to the single well-known entity pid path, which means only the last pid is saved and we could easily leave zombie processes if one of them hangs for some reason. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-1763) validate_index_log in system tests runs remotely but uses local paths
[ https://issues.apache.org/jira/browse/KAFKA-1763?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14212593#comment-14212593 ] Ewen Cheslack-Postava commented on KAFKA-1763: -- [~mgharat] Again, this is really a separate issue that only became apparent because we're actually catching exceptions now. This problem occurs for replication_tests where num_partitions > replication_factor, although I'm not sure why a couple of others (e.g. testcase_10131, which is the new-producer version of the first one you listed) aren't exhibiting the problem. This one looks like it needs a more substantial fix because there are a few different problems with the code that runs the consumers. I've filed KAFKA-1771. I don't think we should let that block this patch from getting applied since this fixes the vast majority of the broken test cases. Any fix to this newer issue probably requires another full-suite test run since that code is used by most of the replication test suite and it requires significant changes. [~jjkoshy], since you're marked as reviewer, does that make sense? validate_index_log in system tests runs remotely but uses local paths - Key: KAFKA-1763 URL: https://issues.apache.org/jira/browse/KAFKA-1763 Project: Kafka Issue Type: Bug Components: system tests Affects Versions: 0.8.1.1 Reporter: Ewen Cheslack-Postava Assignee: Ewen Cheslack-Postava Fix For: 0.8.3 Attachments: KAFKA-1763.patch validate_index_log is the only validation step in the system tests that needs to execute a Kafka binary and it's currently doing so remotely, like the rest of the test binaries. However, this is probably incorrect since it looks like logs are synced back to the driver host and in other cases are operated on locally. It looks like validate_index_log mixes up local/remote paths, causing an exception in DumpLogSegments: {quote} 2014-11-10 12:09:57,665 - DEBUG - executing command [ssh vagrant@worker1 -o 'HostName 127.0.0.1' -o 'Port ' -o 'UserKnownHostsFile /dev/null' -o 'StrictHostKeyChecking no' -o 'PasswordAuthentication no' -o 'IdentityFile /Users/ewencp/.vagrant.d/insecure_private_key' -o 'IdentitiesOnly yes' -o 'LogLevel FATAL' '/opt/kafka/bin/kafka-run-class.sh kafka.tools.DumpLogSegments --file /Users/ewencp/kafka.git/system_test/replication_testsuite/testcase_0008/logs/broker-3/kafka_server_3_logs/test_1-2/1294.index --verify-index-only 2>&1'] (system_test_utils) 2014-11-10 12:09:58,673 - DEBUG - Dumping /Users/ewencp/kafka.git/system_test/replication_testsuite/testcase_0008/logs/broker-3/kafka_server_3_logs/test_1-2/1294.index (kafka_system_test_utils) 2014-11-10 12:09:58,673 - DEBUG - Exception in thread "main" java.io.FileNotFoundException: /Users/ewencp/kafka.git/system_test/replication_testsuite/testcase_0008/logs/broker-3/kafka_server_3_logs/test_1-2/1294.log (No such file or directory) (kafka_system_test_utils) 2014-11-10 12:09:58,673 - DEBUG - at java.io.FileInputStream.open(Native Method) (kafka_system_test_utils) 2014-11-10 12:09:58,673 - DEBUG - at java.io.FileInputStream.<init>(FileInputStream.java:146) (kafka_system_test_utils) 2014-11-10 12:09:58,673 - DEBUG - at kafka.utils.Utils$.openChannel(Utils.scala:162) (kafka_system_test_utils) 2014-11-10 12:09:58,673 - DEBUG - at kafka.log.FileMessageSet.<init>(FileMessageSet.scala:74) (kafka_system_test_utils) 2014-11-10 12:09:58,673 - DEBUG - at kafka.tools.DumpLogSegments$.kafka$tools$DumpLogSegments$$dumpIndex(DumpLogSegments.scala:108) (kafka_system_test_utils) 2014-11-10 12:09:58,673 - DEBUG - at 
kafka.tools.DumpLogSegments$$anonfun$main$1.apply(DumpLogSegments.scala:80) (kafka_system_test_utils) 2014-11-10 12:09:58,674 - DEBUG - at kafka.tools.DumpLogSegments$$anonfun$main$1.apply(DumpLogSegments.scala:73) (kafka_system_test_utils) 2014-11-10 12:09:58,674 - DEBUG - at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33) (kafka_system_test_utils) 2014-11-10 12:09:58,674 - DEBUG - at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:105) (kafka_system_test_utils) 2014-11-10 12:09:58,674 - DEBUG - at kafka.tools.DumpLogSegments$.main(DumpLogSegments.scala:73) (kafka_system_test_utils) 2014-11-10 12:09:58,674 - DEBUG - at kafka.tools.DumpLogSegments.main(DumpLogSegments.scala) (kafka_system_test_utils) {quote} -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-1481) Stop using dashes AND underscores as separators in MBean names
[ https://issues.apache.org/jira/browse/KAFKA-1481?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14212598#comment-14212598 ] Vladimir Tretyakov commented on KAFKA-1481: --- Thx Jun, will try to fix everything according to your last comments. re 83: yeah, host:port is a unique pair, so it will work even with KAFKA-1684 Stop using dashes AND underscores as separators in MBean names -- Key: KAFKA-1481 URL: https://issues.apache.org/jira/browse/KAFKA-1481 Project: Kafka Issue Type: Bug Components: core Affects Versions: 0.8.1.1 Reporter: Otis Gospodnetic Priority: Critical Labels: patch Fix For: 0.8.3 Attachments: KAFKA-1481_2014-06-06_13-06-35.patch, KAFKA-1481_2014-10-13_18-23-35.patch, KAFKA-1481_2014-10-14_21-53-35.patch, KAFKA-1481_2014-10-15_10-23-35.patch, KAFKA-1481_2014-10-20_23-14-35.patch, KAFKA-1481_2014-10-21_09-14-35.patch, KAFKA-1481_2014-10-30_21-35-43.patch, KAFKA-1481_2014-10-31_14-35-43.patch, KAFKA-1481_2014-11-03_16-39-41_doc.patch, KAFKA-1481_2014-11-03_17-02-23.patch, KAFKA-1481_2014-11-10_20-39-41_doc.patch, KAFKA-1481_2014-11-10_21-02-23.patch, KAFKA-1481_2014-11-14_16-33-03.patch, KAFKA-1481_2014-11-14_16-39-41_doc.patch, KAFKA-1481_IDEA_IDE_2014-10-14_21-53-35.patch, KAFKA-1481_IDEA_IDE_2014-10-15_10-23-35.patch, KAFKA-1481_IDEA_IDE_2014-10-20_20-14-35.patch, KAFKA-1481_IDEA_IDE_2014-10-20_23-14-35.patch, alternateLayout1.png, alternateLayout2.png, diff-for-alternate-layout1.patch, diff-for-alternate-layout2.patch, originalLayout.png MBeans should not use dashes or underscores as separators because these characters are allowed in hostnames, topics, group and consumer IDs, etc., and these are embedded in MBean names, making it impossible to parse out individual bits from MBeans. Perhaps a pipe character should be used to avoid the conflict. This looks like a major blocker because it means nobody can write Kafka 0.8.x monitoring tools unless they are doing it for themselves AND do not use dashes AND do not use underscores. See: http://search-hadoop.com/m/4TaT4lonIW -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-948) ISR list in LeaderAndISR path not updated for partitions when Broker (which is not leader) is down
[ https://issues.apache.org/jira/browse/KAFKA-948?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14212602#comment-14212602 ] Scott Hunt commented on KAFKA-948: -- I think I just ran into this same issue on our cluster yesterday. Kafka version 2.8.0-0.8.0+46. I first noticed there was a real problem when we had a leader that wasn't in the replica list. (Step 5 below.) Here's what (I think) happened: 1. We had one broker in our cluster fail due to assumed hardware issues (id = 5) 2. A couple days into the failure, I lost faith in ever seeing that machine resurrected and used kafka-reassign-topic.sh to remove broker 5 from all the replica sets (replacing them with other nodes) so that we were back to full (3) replication. There were 2 topics with 24 partitions each that were on broker 5 and needed to be moved. One of the topics is *really* low traffic (most partitions get less than 1 message per day). 3. After moving broker 5 out of the replica sets for all partitions, I noticed that broker 5 was still listed in the ISR for some of the partitions in the low-traffic topic. 4. Later that night, our Technical Operations staff miraculously brought broker 5 back online. I assumed everything was fine and went back to sleep. 5. The next day I checked back and, due probably to some network hiccup, a couple of the partitions listed the no-longer-dead broker as their leader, even though it wasn't in the replica list. i.e. it showed something like: topic: xxx partition: 8 leader: 5 replicas: 8,4,3 isr: 8,5,4,3 6. I was somewhat alarmed. 7. So I shut down broker 5 (just stopping kafka), so that it would pick new leaders for those partitions. 8. I now have 14 partitions that have broker 5 still in isr and not in replicas. ISR list in LeaderAndISR path not updated for partitions when Broker (which is not leader) is down -- Key: KAFKA-948 URL: https://issues.apache.org/jira/browse/KAFKA-948 Project: Kafka Issue Type: Bug Components: controller Affects Versions: 0.8.0 Reporter: Dibyendu Bhattacharya Assignee: Neha Narkhede When the broker which is the leader for a partition is down, the ISR list in the LeaderAndISR path is updated. But if the broker, which is not a leader of the partition, is down, the ISR list is not getting updated. This is an issue because the ISR list contains stale entries. I found this issue in kafka-0.8.0-beta1-candidate1 -- This message was sent by Atlassian JIRA (v6.3.4#6332)
Enforcing Network Bandwidth Quota with New Java Producer
Hi Kafka Team, We would like to enforce a network bandwidth quota per minute on the producer side. How can I do this? I need some way to count compressed bytes on the producer; as far as I know, the send callback does not give this ability. Let me know the best way. Thanks, Bhavesh
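One option worth looking at (a sketch, not an official quota mechanism): the new Java producer exposes a metrics registry via producer.metrics(), and on recent client builds the outgoing byte rates there are measured after compression. Exact metric names vary by version, so this sketch simply filters for byte-oriented metrics rather than assuming a particular name; also note the generic Producer interface is assumed.
{code}
import org.apache.kafka.clients.producer.Producer
import scala.collection.JavaConverters._

object ProducerByteMetrics {
  // Dump every byte-oriented metric the producer registers; these reflect
  // post-compression bytes on the wire, which is what a bandwidth quota
  // needs to observe.
  def dumpByteMetrics(producer: Producer[_, _]): Unit =
    for ((name, metric) <- producer.metrics().asScala if name.name().contains("byte"))
      println(s"${name.group()}/${name.name()} = ${metric.value()}")
}
{code}
Polling these values once a minute and comparing against a budget would give a coarse per-minute quota without touching the producer internals.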
[jira] [Commented] (KAFKA-1721) Snappy compressor is not thread safe
[ https://issues.apache.org/jira/browse/KAFKA-1721?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14212611#comment-14212611 ] Ewen Cheslack-Postava commented on KAFKA-1721: -- [~junrao] This is a trivial version update patch. It would be nice for the fix to make it to 0.8.2, but I'm not sure we want to push a dependency version change between beta and final. Snappy compressor is not thread safe Key: KAFKA-1721 URL: https://issues.apache.org/jira/browse/KAFKA-1721 Project: Kafka Issue Type: Bug Components: compression Reporter: Ewen Cheslack-Postava Assignee: Ewen Cheslack-Postava Attachments: KAFKA-1721.patch, KAFKA-1721_2014-10-28_09:25:50.patch From the mailing list, it can generate this exception: 2014-10-20 18:55:21.841 [kafka-producer-network-thread] ERROR org.apache.kafka.clients.producer.internals.Sender - Uncaught error in kafka producer I/O thread: *java.lang.NullPointerException* at org.xerial.snappy.BufferRecycler.releaseInputBuffer(BufferRecycler.java:153) at org.xerial.snappy.SnappyOutputStream.close(SnappyOutputStream.java:317) at java.io.FilterOutputStream.close(FilterOutputStream.java:160) at org.apache.kafka.common.record.Compressor.close(Compressor.java:94) at org.apache.kafka.common.record.MemoryRecords.close(MemoryRecords.java:119) at org.apache.kafka.clients.producer.internals.RecordAccumulator.drain(RecordAccumulator.java:285) at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:162) at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:115) at java.lang.Thread.run(Thread.java:744) This appears to be an issue with the snappy-java library using ThreadLocal for an internal buffer recycling object which results in that object being shared unsafely across threads if one thread sends to multiple producers: {quote} I think the issue is that you're using all your producers across a thread pool and the snappy library uses ThreadLocal BufferRecyclers. When new Snappy streams are allocated, they may be allocated from the same thread (e.g. one of your MyProducer classes calls Producer.send() on multiple producers from the same thread) and therefore use the same BufferRecycler. Eventually you hit the code in the stacktrace, and if two producer send threads hit it concurrently they improperly share the unsynchronized BufferRecycler. This seems like a pain to fix -- it's really a deficiency of the snappy library and as far as I can see there's no external control over BufferRecycler in their API. One possibility is to record the thread ID when we generate a new stream in Compressor and use that to synchronize access to ensure no concurrent BufferRecycler access. That could be made specific to snappy so it wouldn't impact other codecs. Not exactly ideal, but it would work. Unfortunately I can't think of any way for you to protect against this in your own code since the problem arises in the producer send thread, which your code should never know about. Another option would be to setup your producers differently to avoid the possibility of unsynchronized access from multiple threads (i.e. don't use the same thread pool approach), but whether you can do that will depend on your use case. {quote} -- This message was sent by Atlassian JIRA (v6.3.4#6332)
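To make the last workaround quoted above concrete: a sketch that pins each producer to its own single-threaded executor, so no two producers ever allocate snappy streams from the same thread. The generic producer API is assumed here; adjust for older builds.
{code}
import java.util.concurrent.Executors
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

class OneThreadPerProducer(producers: IndexedSeq[KafkaProducer[Array[Byte], Array[Byte]]]) {
  // One single-thread executor per producer: every send() for producer i happens
  // on thread i, so snappy streams belonging to different producers are created
  // on different threads and never share a ThreadLocal BufferRecycler.
  private val executors = producers.map(_ => Executors.newSingleThreadExecutor())

  def send(i: Int, record: ProducerRecord[Array[Byte], Array[Byte]]): Unit =
    executors(i).submit(new Runnable { def run(): Unit = producers(i).send(record) })

  def close(): Unit = {
    executors.foreach(_.shutdown())
    producers.foreach(_.close())
  }
}
{code}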
[jira] [Assigned] (KAFKA-1771) replicate_testsuite data verification broken if num_partitions > replica_factor
[ https://issues.apache.org/jira/browse/KAFKA-1771?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ewen Cheslack-Postava reassigned KAFKA-1771: Assignee: Ewen Cheslack-Postava -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (KAFKA-1721) Snappy compressor is not thread safe
[ https://issues.apache.org/jira/browse/KAFKA-1721?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Joe Stein updated KAFKA-1721: - Fix Version/s: 0.8.2 -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (KAFKA-1642) [Java New Producer Kafka Trunk] CPU Usage Spike to 100% when network connection is lost
[ https://issues.apache.org/jira/browse/KAFKA-1642?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ewen Cheslack-Postava updated KAFKA-1642: - Reviewer: Jun Rao (was: Jay Kreps) [Java New Producer Kafka Trunk] CPU Usage Spike to 100% when network connection is lost --- Key: KAFKA-1642 URL: https://issues.apache.org/jira/browse/KAFKA-1642 Project: Kafka Issue Type: Bug Components: producer Affects Versions: 0.8.2 Reporter: Bhavesh Mistry Assignee: Ewen Cheslack-Postava Attachments: KAFKA-1642.patch, KAFKA-1642_2014-10-20_17:33:57.patch, KAFKA-1642_2014-10-23_16:19:41.patch I see my CPU spike to 100% when the network connection is lost for a while. It seems the network I/O thread is very busy logging the following error message. Is this expected behavior? 2014-09-17 14:06:16.830 [kafka-producer-network-thread] ERROR org.apache.kafka.clients.producer.internals.Sender - Uncaught error in kafka producer I/O thread: java.lang.IllegalStateException: No entry found for node -2 at org.apache.kafka.clients.ClusterConnectionStates.nodeState(ClusterConnectionStates.java:110) at org.apache.kafka.clients.ClusterConnectionStates.disconnected(ClusterConnectionStates.java:99) at org.apache.kafka.clients.NetworkClient.initiateConnect(NetworkClient.java:394) at org.apache.kafka.clients.NetworkClient.maybeUpdateMetadata(NetworkClient.java:380) at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:174) at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:175) at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:115) at java.lang.Thread.run(Thread.java:744) Thanks, Bhavesh -- This message was sent by Atlassian JIRA (v6.3.4#6332)
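The busy loop comes from retrying the failed connection with no delay. A sketch of the general remedy, with hypothetical names (the real patch changes Kafka's NetworkClient internals): remember the last attempt per node and refuse to reconnect until a backoff elapses.
{code}
import scala.collection.mutable

class ReconnectGate(backoffMs: Long) {
  private val lastAttempt = mutable.Map.empty[Int, Long]  // nodeId -> last connect attempt

  // Only allow a new connection attempt once the backoff has elapsed; callers
  // that get 'false' should block in select()/sleep instead of spinning.
  def mayConnect(nodeId: Int, nowMs: Long): Boolean = {
    if (nowMs - lastAttempt.getOrElse(nodeId, 0L) < backoffMs) {
      false
    } else {
      lastAttempt(nodeId) = nowMs
      true
    }
  }
}
{code}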
[jira] [Commented] (KAFKA-1642) [Java New Producer Kafka Trunk] CPU Usage Spike to 100% when network connection is lost
[ https://issues.apache.org/jira/browse/KAFKA-1642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14212635#comment-14212635 ] Ewen Cheslack-Postava commented on KAFKA-1642: -- [~junrao] I think you reviewed most of this already since we discussed it offline, so I reassigned to you. I think this should be in good shape for committing. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (KAFKA-1642) [Java New Producer Kafka Trunk] CPU Usage Spike to 100% when network connection is lost
[ https://issues.apache.org/jira/browse/KAFKA-1642?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Joe Stein updated KAFKA-1642: - Fix Version/s: 0.8.2 -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (KAFKA-1764) ZookeeperConsumerConnector could put multiple shutdownCommand to the same data chunk queue.
[ https://issues.apache.org/jira/browse/KAFKA-1764?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Joe Stein updated KAFKA-1764: - Fix Version/s: 0.8.2 -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (KAFKA-1762) Update max-inflight-request doc string
[ https://issues.apache.org/jira/browse/KAFKA-1762?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Joe Stein updated KAFKA-1762: - Fix Version/s: 0.8.3 Update max-inflight-request doc string -- Key: KAFKA-1762 URL: https://issues.apache.org/jira/browse/KAFKA-1762 Project: Kafka Issue Type: Bug Reporter: Guozhang Wang Assignee: Guozhang Wang Fix For: 0.8.3 Attachments: KAFKA-1762.patch The new Producer client introduces a config for the max # of inFlight messages. When it is set > 1 on MirrorMaker, however, there is a risk of data loss even with KAFKA-1650 because the offsets recorded in the MM's offset map are no longer continuous. Another issue is that when this value is set > 1, there is a risk of message re-ordering in the producer. Changes: 1. Set max # of inFlight messages = 1 in MM 2. Leave comments explaining what the risks are of changing it -- This message was sent by Atlassian JIRA (v6.3.4#6332)
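For reference, a sketch of pinning the mirror maker's producer to a single in-flight request. The config key is the one registered by the new producer on recent builds; treat it as an assumption for older ones.
{code}
import java.util.Properties

object MirrorMakerProducerProps {
  def producerProps(brokers: String): Properties = {
    val props = new Properties()
    props.put("bootstrap.servers", brokers)
    // One in-flight request per connection: retries cannot reorder messages and
    // acknowledged offsets stay contiguous, at the cost of pipelining throughput.
    props.put("max.in.flight.requests.per.connection", "1")
    props
  }
}
{code}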
[jira] [Updated] (KAFKA-1692) [Java New Producer] IO Thread Name Must include Client ID
[ https://issues.apache.org/jira/browse/KAFKA-1692?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Joe Stein updated KAFKA-1692: - Fix Version/s: 0.8.2 [Java New Producer] IO Thread Name Must include Client ID --- Key: KAFKA-1692 URL: https://issues.apache.org/jira/browse/KAFKA-1692 Project: Kafka Issue Type: Improvement Components: producer Affects Versions: 0.8.2 Reporter: Bhavesh Mistry Assignee: Ewen Cheslack-Postava Priority: Trivial Labels: newbie Fix For: 0.8.2 Attachments: KAFKA-1692.patch Please add the client id so people looking at JConsole or a profiling tool can tell threads apart by client id, since a single JVM can have multiple producer instances. org.apache.kafka.clients.producer.KafkaProducer {code}
// Include the client id in the I/O thread name so each producer instance's
// network thread is identifiable in thread dumps and profilers.
String ioThreadName = "kafka-producer-network-thread";
if (clientId != null) {
    ioThreadName = ioThreadName + " | " + clientId;
}
this.ioThread = new KafkaThread(ioThreadName, this.sender, true);
{code} -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (KAFKA-1719) Make mirror maker exit when one consumer/producer thread exits.
[ https://issues.apache.org/jira/browse/KAFKA-1719?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Joe Stein updated KAFKA-1719: - Fix Version/s: 0.8.2 Make mirror maker exit when one consumer/producer thread exits. --- Key: KAFKA-1719 URL: https://issues.apache.org/jira/browse/KAFKA-1719 Project: Kafka Issue Type: Improvement Reporter: Jiangjie Qin Assignee: Jiangjie Qin Fix For: 0.8.2 Attachments: KAFKA-1719.patch, KAFKA-1719_2014-10-22_15:04:32.patch, KAFKA-1719_2014-10-23_16:20:22.patch, KAFKA-1719_2014-10-24_00:56:06.patch When one of the consumer/producer threads exits, the entire mirror maker will be blocked. In this case, it is better to make it exit. It seems a single ZookeeperConsumerConnector is sufficient for the mirror maker; we probably don't need a list of connectors. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (KAFKA-1706) Adding a byte bounded blocking queue to util.
[ https://issues.apache.org/jira/browse/KAFKA-1706?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Joe Stein updated KAFKA-1706: - Fix Version/s: 0.8.2 Adding a byte bounded blocking queue to util. - Key: KAFKA-1706 URL: https://issues.apache.org/jira/browse/KAFKA-1706 Project: Kafka Issue Type: Improvement Reporter: Jiangjie Qin Assignee: Jiangjie Qin Fix For: 0.8.2 Attachments: KAFKA-1706.patch, KAFKA-1706_2014-10-15_09:26:26.patch, KAFKA-1706_2014-10-15_09:28:01.patch, KAFKA-1706_2014-10-26_23:47:31.patch, KAFKA-1706_2014-10-26_23:50:07.patch, KAFKA-1706_2014-10-27_18:34:37.patch, KAFKA-1706_2014-10-29_10:57:51.patch We saw many out-of-memory issues in Mirror Maker. To enhance memory management we want to introduce a ByteBoundedBlockingQueue that has limits on both the number of messages and the number of bytes in it. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
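A minimal sketch of the idea (not the kafka.utils implementation): bound the queue by message count and by total bytes, with the byte size of each element supplied by the caller.
{code}
import java.util.concurrent.LinkedBlockingQueue
import java.util.concurrent.atomic.AtomicLong

class ByteBoundedQueue[T](maxMessages: Int, maxBytes: Long, sizeOf: T => Long) {
  private val queue = new LinkedBlockingQueue[T](maxMessages)
  private val bytes = new AtomicLong(0)

  // Admit an element only if both bounds hold; returns false instead of blocking.
  // The byte bound is approximate under concurrent offers.
  def offer(e: T): Boolean = {
    val s = sizeOf(e)
    if (bytes.get + s > maxBytes) return false
    if (queue.offer(e)) { bytes.addAndGet(s); true } else false
  }

  def take(): T = {
    val e = queue.take()
    bytes.addAndGet(-sizeOf(e))
    e
  }

  def byteSize: Long = bytes.get
}
{code}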
[jira] [Updated] (KAFKA-1668) TopicCommand doesn't warn if --topic argument doesn't match any topics
[ https://issues.apache.org/jira/browse/KAFKA-1668?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Joe Stein updated KAFKA-1668: - Fix Version/s: 0.8.3 TopicCommand doesn't warn if --topic argument doesn't match any topics -- Key: KAFKA-1668 URL: https://issues.apache.org/jira/browse/KAFKA-1668 Project: Kafka Issue Type: Bug Components: tools Reporter: Ryan Berdeen Assignee: Manikumar Reddy Priority: Minor Labels: newbie Fix For: 0.8.3 Attachments: KAFKA-1668.patch Running {{kafka-topics.sh --alter}} with an invalid {{--topic}} argument produces no output and exits with 0, indicating success. {code} $ bin/kafka-topics.sh --topic does-not-exist --alter --config invalid=xxx --zookeeper zkhost:2181 $ echo $? 0 {code} An invalid topic name or a regular expression that matches 0 topics should at least print a warning. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
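The requested guard is small; a sketch with hypothetical names (the real change belongs in kafka.admin.TopicCommand):
{code}
object TopicGuard {
  def checkTopics(matched: Seq[String], pattern: String): Unit = {
    if (matched.isEmpty) {
      // Matching zero topics on --alter is almost certainly a user error:
      // warn and exit non-zero instead of silently succeeding.
      System.err.println(s"Warning: --topic '$pattern' matched no topics")
      sys.exit(1)
    }
  }
}
{code}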
[jira] [Updated] (KAFKA-1648) Round robin consumer balance throws an NPE when there are no topics
[ https://issues.apache.org/jira/browse/KAFKA-1648?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Joe Stein updated KAFKA-1648: - Fix Version/s: 0.8.3 Round robin consumer balance throws an NPE when there are no topics --- Key: KAFKA-1648 URL: https://issues.apache.org/jira/browse/KAFKA-1648 Project: Kafka Issue Type: Bug Components: consumer Reporter: Todd Palino Assignee: Mayuresh Gharat Labels: newbie Fix For: 0.8.3 Attachments: KAFKA-1648.patch, KAFKA-1648_2014-10-04_17:40:47.patch, KAFKA-1648_2014-10-08_17:29:14.patch, KAFKA-1648_2014-10-08_17:46:45.patch, KAFKA-1648_2014-10-09_11:56:44.patch If you use the roundrobin rebalance method with a wildcard consumer, and there are no topics in the cluster, rebalance throws a NullPointerException in the consumer and fails. It retries the rebalance, but will continue to throw the NPE. 2014/09/23 17:51:16.147 [ZookeeperConsumerConnector] [kafka-audit_lva1-app0007.corp-1411494404908-4e620544], Cleared all relevant queues for this fetcher 2014/09/23 17:51:16.147 [ZookeeperConsumerConnector] [kafka-audit_lva1-app0007.corp-1411494404908-4e620544], Cleared the data chunks in all the consumer message iterators 2014/09/23 17:51:16.148 [ZookeeperConsumerConnector] [kafka-audit_lva1-app0007.corp-1411494404908-4e620544], Committing all offsets after clearing the fetcher queues 2014/09/23 17:51:46.148 [ZookeeperConsumerConnector] [kafka-audit_lva1-app0007.corp-1411494404908-4e620544], begin rebalancing consumer kafka-audit_lva1-app0007.corp-1411494404908-4e620544 try #0 2014/09/23 17:51:46.148 ERROR [OffspringServletRuntime] [main] [kafka-console-audit] [] Boot listener com.linkedin.kafkaconsoleaudit.KafkaConsoleAuditBootListener failed kafka.common.ConsumerRebalanceFailedException: kafka-audit_lva1-app0007.corp-1411494404908-4e620544 can't rebalance after 10 retries at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.syncedRebalance(ZookeeperConsumerConnector.scala:630) at kafka.consumer.ZookeeperConsumerConnector.kafka$consumer$ZookeeperConsumerConnector$$reinitializeConsumer(ZookeeperConsumerConnector.scala:897) at kafka.consumer.ZookeeperConsumerConnector$WildcardStreamsHandler.init(ZookeeperConsumerConnector.scala:931) at kafka.consumer.ZookeeperConsumerConnector.createMessageStreamsByFilter(ZookeeperConsumerConnector.scala:159) at kafka.javaapi.consumer.ZookeeperConsumerConnector.createMessageStreamsByFilter(ZookeeperConsumerConnector.scala:101) at com.linkedin.tracker.consumer.TrackingConsumerImpl.initWildcardIterators(TrackingConsumerImpl.java:88) at com.linkedin.tracker.consumer.TrackingConsumerImpl.getWildcardIterators(TrackingConsumerImpl.java:116) at com.linkedin.kafkaconsoleaudit.KafkaConsoleAudit.createAuditThreads(KafkaConsoleAudit.java:59) at com.linkedin.kafkaconsoleaudit.KafkaConsoleAudit.initializeAudit(KafkaConsoleAudit.java:50) at com.linkedin.kafkaconsoleaudit.KafkaConsoleAuditFactory.createInstance(KafkaConsoleAuditFactory.java:125) at com.linkedin.kafkaconsoleaudit.KafkaConsoleAuditFactory.createInstance(KafkaConsoleAuditFactory.java:20) at com.linkedin.util.factory.SimpleSingletonFactory.createInstance(SimpleSingletonFactory.java:20) at com.linkedin.util.factory.SimpleSingletonFactory.createInstance(SimpleSingletonFactory.java:14) at com.linkedin.util.factory.Generator.doGetBean(Generator.java:337) at com.linkedin.util.factory.Generator.getBean(Generator.java:270) at com.linkedin.kafkaconsoleaudit.KafkaConsoleAuditBootListener.onBoot(KafkaConsoleAuditBootListener.java:16) at 
com.linkedin.offspring.servlet.OffspringServletRuntime.startGenerator(OffspringServletRuntime.java:147) at com.linkedin.offspring.servlet.OffspringServletRuntime.start(OffspringServletRuntime.java:73) at com.linkedin.offspring.servlet.OffspringServletContextListener.contextInitialized(OffspringServletContextListener.java:28) at org.eclipse.jetty.server.handler.ContextHandler.callContextInitialized(ContextHandler.java:771) at org.eclipse.jetty.servlet.ServletContextHandler.callContextInitialized(ServletContextHandler.java:424) at org.eclipse.jetty.server.handler.ContextHandler.startContext(ContextHandler.java:763) at org.eclipse.jetty.servlet.ServletContextHandler.startContext(ServletContextHandler.java:249) at org.eclipse.jetty.webapp.WebAppContext.startContext(WebAppContext.java:1250) at org.eclipse.jetty.server.handler.ContextHandler.doStart(ContextHandler.java:706) at org.eclipse.jetty.webapp.WebAppContext.doStart(WebAppContext.java:492)
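A sketch of the missing guard, with hypothetical names: under a wildcard subscription with zero matching topics there is nothing to assign, so the round-robin assignor should return an empty assignment instead of dereferencing an empty topic map and throwing an NPE.
{code}
object RoundRobinGuard {
  // partitions are (topic, partition) pairs; consumers are thread/consumer ids.
  def assign(consumers: Seq[String], partitions: Seq[(String, Int)]): Map[(String, Int), String] = {
    if (consumers.isEmpty || partitions.isEmpty)
      return Map.empty  // nothing to assign yet; a later rebalance will retry
    val sorted = partitions.sorted
    sorted.zipWithIndex.map { case (tp, i) => tp -> consumers(i % consumers.size) }.toMap
  }
}
{code}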
[jira] [Updated] (KAFKA-1641) Log cleaner exits if last cleaned offset is lower than earliest offset
[ https://issues.apache.org/jira/browse/KAFKA-1641?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Joe Stein updated KAFKA-1641: - Fix Version/s: 0.8.3 Log cleaner exits if last cleaned offset is lower than earliest offset -- Key: KAFKA-1641 URL: https://issues.apache.org/jira/browse/KAFKA-1641 Project: Kafka Issue Type: Bug Affects Versions: 0.8.1.1 Reporter: Joel Koshy Assignee: Guozhang Wang Fix For: 0.8.3 Attachments: KAFKA-1641.patch, KAFKA-1641_2014-10-09_13:04:15.patch Encountered this recently: the log cleaner exited a while ago (I think because the topic had compressed messages). That issue was subsequently addressed by having the producer only send uncompressed. However, on a subsequent restart of the broker we see this: In this scenario I think it is reasonable to just emit a warning and have the cleaner round up its first dirty offset to the base offset of the first segment. {code} [kafka-server] [] [kafka-log-cleaner-thread-0], Error due to java.lang.IllegalArgumentException: requirement failed: Last clean offset is 54770438 but segment base offset is 382844024 for log testtopic-0. at scala.Predef$.require(Predef.scala:145) at kafka.log.Cleaner.buildOffsetMap(LogCleaner.scala:491) at kafka.log.Cleaner.clean(LogCleaner.scala:288) at kafka.log.LogCleaner$CleanerThread.cleanOrSleep(LogCleaner.scala:202) at kafka.log.LogCleaner$CleanerThread.doWork(LogCleaner.scala:187) at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51) {code} -- This message was sent by Atlassian JIRA (v6.3.4#6332)
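The suggested behavior, sketched with assumed names: round the checkpoint up to the earliest segment's base offset and warn, rather than failing the require() and killing the cleaner thread.
{code}
object CleanerCheckpoint {
  // If the checkpointed offset predates the earliest retained segment (e.g. old
  // segments were deleted while the cleaner was down), round up and warn instead
  // of throwing and terminating the cleaner.
  def firstDirtyOffset(lastCleanOffset: Long, earliestBaseOffset: Long, log: String): Long =
    if (lastCleanOffset < earliestBaseOffset) {
      println(s"WARN resetting first dirty offset of $log to $earliestBaseOffset (was $lastCleanOffset)")
      earliestBaseOffset
    } else lastCleanOffset
}
{code}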
[jira] [Updated] (KAFKA-1637) SimpleConsumer.fetchOffset returns wrong error code when no offset exists for topic/partition/consumer group
[ https://issues.apache.org/jira/browse/KAFKA-1637?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Joe Stein updated KAFKA-1637: - Fix Version/s: 0.8.2 SimpleConsumer.fetchOffset returns wrong error code when no offset exists for topic/partition/consumer group Key: KAFKA-1637 URL: https://issues.apache.org/jira/browse/KAFKA-1637 Project: Kafka Issue Type: Bug Components: consumer, core Affects Versions: 0.8.1, 0.8.1.1 Environment: Linux Reporter: Amir Malekpour Assignee: Ewen Cheslack-Postava Labels: newbie Fix For: 0.8.2 Attachments: KAFKA-1637.patch, KAFKA-1637_2014-10-15_09:08:12.patch, KAFKA-1637_2014-10-15_14:47:21.patch This concerns Kafka's Offset Fetch API: According to Kafka's current documentation, if there is no offset associated with a topic-partition under that consumer group, the broker does not set an error code (since it is not really an error), but returns empty metadata and sets the offset field to -1. (Link below) However, in Kafka 0.8.1.1 error code '3' is returned, which effectively makes it impossible for the client to decide if there was an error or if there is simply no offset associated with a topic-partition under that consumer group. https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-MetadataAPI -- This message was sent by Atlassian JIRA (v6.3.4#6332)
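Once the broker honors the documented contract, client code can distinguish the cases like this (a sketch against the 0.8 Scala client types; treat the exact names as assumptions):
{code}
import kafka.common.{ErrorMapping, OffsetMetadataAndError, TopicAndPartition}

object OffsetFetchCheck {
  // Per the protocol guide: no error code plus offset == -1 means "no offset
  // committed for this group/topic/partition", which is not an error.
  def committedOffset(result: Map[TopicAndPartition, OffsetMetadataAndError],
                      tp: TopicAndPartition): Option[Long] =
    result.get(tp) match {
      case Some(m) if m.error == ErrorMapping.NoError && m.offset >= 0 => Some(m.offset)
      case Some(m) if m.error == ErrorMapping.NoError                  => None // nothing committed
      case other => throw new RuntimeException("offset fetch failed: " + other)
    }
}
{code}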
[jira] [Updated] (KAFKA-1597) New metrics: ResponseQueueSize and BeingSentResponses
[ https://issues.apache.org/jira/browse/KAFKA-1597?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Joe Stein updated KAFKA-1597: - Fix Version/s: 0.8.2 New metrics: ResponseQueueSize and BeingSentResponses - Key: KAFKA-1597 URL: https://issues.apache.org/jira/browse/KAFKA-1597 Project: Kafka Issue Type: New Feature Components: core Reporter: Alexis Midon Assignee: Alexis Midon Priority: Minor Labels: features Fix For: 0.8.2 Attachments: ResponseQueueSize.patch, ResponsesBeingSent.patch This patch adds two metrics: h3. ResponseQueueSize As of 0.8.1, the sizes of the response queues are [reported as different metrics|https://github.com/apache/kafka/blob/0.8.1/core/src/main/scala/kafka/network/RequestChannel.scala#L127-L134] - one per processor thread. This is not ideal for several reasons: * charts have to sum the different metrics * the metrics collection system might not support 'wild card queries' like {{sum:kafka.network.RequestChannel.Processor_*_ResponseQueueSize}}, in which case monitoring now depends on the number of configured network threads * monitoring the responses by thread is not very valuable. However the global number of responses is useful. *proposal* So this patch exposes the total number of queued responses as a metric {{ResponseQueueSize}} *implementation* In {{RequestChannel}}, create a Gauge that adds up the sizes of the response queues. h3. BeingSentResponses As of 0.8.1, the processor threads will poll responses from the queues and attach them to the SelectionKey as fast as possible. The consequence of that is that the response queues are not a good indicator of the number of in-flight responses. The {{ServerSocketChannel}} acts as another queue of responses to be sent. The current metrics don't reflect the size of this buffer, which is an issue. *proposal* This patch adds a gauge that keeps track of the number of responses being handled by the {{ServerSocketChannel}}. That new metric is named BeingSentResponses (who said naming was hard?) *implementation* To calculate that metric, the patch adds up the number of SelectionKeys interested in writing, across processor threads. Another approach could be to keep all in-flight responses in a data structure (let's say a map) shared by the processor threads. A response would be added to that map when dequeued from the response queue, and removed when the write is complete. The gauge would simply report the size of that map. I decided against that second approach as it is more intrusive and requires additional bookkeeping to gather information already available through the {{SelectionKey}}s. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
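The first gauge is straightforward to picture. A sketch in the metrics-core style Kafka uses (registration details assumed):
{code}
import java.util.concurrent.BlockingQueue
import com.yammer.metrics.Metrics
import com.yammer.metrics.core.Gauge

object ResponseQueueGauge {
  def register(queues: Seq[BlockingQueue[AnyRef]]): Unit = {
    // One gauge summing every per-processor response queue, instead of one
    // metric per processor thread that monitoring systems must wildcard over.
    Metrics.newGauge(getClass, "ResponseQueueSize", new Gauge[Int] {
      override def value: Int = queues.map(_.size).sum
    })
  }
}
{code}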
[jira] [Updated] (KAFKA-1580) Reject producer requests to internal topics
[ https://issues.apache.org/jira/browse/KAFKA-1580?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Joe Stein updated KAFKA-1580: - Fix Version/s: 0.8.2 Reject producer requests to internal topics --- Key: KAFKA-1580 URL: https://issues.apache.org/jira/browse/KAFKA-1580 Project: Kafka Issue Type: Bug Components: core Reporter: Joel Koshy Assignee: Jonathan Natkins Fix For: 0.8.2 Attachments: KAFKA-1580.patch, KAFKA-1580_2014-08-14_16:50:40.patch, KAFKA-1580_2014-08-14_16:56:50.patch, KAFKA-1580_2014-08-14_18:21:38.patch, KAFKA-1580_2014-08-15_15:05:29.patch Producer requests to internal topics (currently only __consumer_offsets) can be disastrous. E.g., if we allow a message to be appended to the offsets topic this could lead to fatal exceptions when loading the offsets topic and when compacting the log. Producer requests to these topics should be rejected. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
Re: Review Request 27684: Patch for KAFKA-1743
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/27684/#review61486 --- Thanks for the patch. For clarity, in ZookeeperConsumerConnector, instead of having the following, def commitOffsets(isAutoCommit: Boolean = true) could we break it into two separate methods, same as what's defined in ConsumerConnector? - Jun Rao On Nov. 14, 2014, 5 p.m., Manikumar Reddy O wrote: --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/27684/ --- (Updated Nov. 14, 2014, 5 p.m.) Review request for kafka. Bugs: KAFKA-1743 https://issues.apache.org/jira/browse/KAFKA-1743 Repository: kafka Description --- def commitOffsets method added to make ConsumerConnector backward compatible; Addressing Jun's comments Diffs - core/src/main/scala/kafka/consumer/ConsumerConnector.scala 07677c1c26768ef9c9032626180d0015f12cb0e0 Diff: https://reviews.apache.org/r/27684/diff/ Testing --- Thanks, Manikumar Reddy O
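The split Jun is asking for would look roughly like this (a hedged reading of the review, not the final patch). Keeping the zero-arg method as its own overload also avoids the Java-interop pitfalls of a Scala default argument:
{code}
trait ConsumerConnectorSketch {
  // Zero-arg variant keeps the historical signature (and Java callers) intact.
  def commitOffsets(): Unit

  // Explicit variant carries the flag; with no default argument the two
  // overloads cannot collide and existing bytecode keeps linking.
  def commitOffsets(isAutoCommit: Boolean): Unit
}
{code}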
Re: Review Request 27818: Patch for KAFKA-328
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/27818/#review61488 --- core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala https://reviews.apache.org/r/27818/#comment103149 Let's follow the convention and change this to fail() - Neha Narkhede On Nov. 10, 2014, 6:05 p.m., Balaji Seshadri wrote: --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/27818/ --- (Updated Nov. 10, 2014, 6:05 p.m.) Review request for kafka. Bugs: KAFKA-328 https://issues.apache.org/jira/browse/KAFKA-328 Repository: kafka Description --- KAFKA-328 Write unit test for kafka server startup and shutdown API - Review Comments Diffs - core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala 1bfb501b2f29c50f3fc5f930fdaad02e03b91e4f core/src/test/scala/unit/kafka/server/ServerStartupTest.scala a0ed4855f2550a0eb2e363dd2fccd8377a9ac172 Diff: https://reviews.apache.org/r/27818/diff/ Testing --- Thanks, Balaji Seshadri
[jira] [Commented] (KAFKA-328) Write unit test for kafka server startup and shutdown API
[ https://issues.apache.org/jira/browse/KAFKA-328?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14212697#comment-14212697 ] Neha Narkhede commented on KAFKA-328: - [~balaji.sesha...@dish.com] Reviewed. Left a suggestion on the rb Write unit test for kafka server startup and shutdown API -- Key: KAFKA-328 URL: https://issues.apache.org/jira/browse/KAFKA-328 Project: Kafka Issue Type: Bug Reporter: Neha Narkhede Assignee: BalajiSeshadri Labels: newbie Attachments: KAFKA-328-FORMATTED.patch, KAFKA-328-REVIEW-COMMENTS.patch, KAFKA-328.patch, KAFKA-328.patch, KAFKA-328_2014-11-10_11:05:58.patch Background discussion in KAFKA-320 People often try to embed KafkaServer in an application that ends up calling startup() and shutdown() repeatedly and sometimes in odd ways. To ensure this works correctly we have to be very careful about cleaning up resources. This is a good practice for making unit tests reliable anyway. A good first step would be to add some unit tests on startup and shutdown to cover various cases: 1. A Kafka server can startup if it is not already starting up, if it is not currently being shutdown, or if it hasn't been already started 2. A Kafka server can shutdown if it is not already shutting down, if it is not currently starting up, or if it hasn't been already shutdown. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
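A sketch of the kind of case the ticket asks for (harness details assumed):
{code}
import kafka.server.{KafkaConfig, KafkaServer}

object StartupShutdownSketch {
  // Repeated start/stop cycles must release ports, threads and ZK sessions,
  // and a redundant shutdown() must be a harmless no-op.
  def restartCycle(config: KafkaConfig): Unit = {
    val server = new KafkaServer(config)
    server.startup()
    server.shutdown()
    server.startup()   // must succeed after a clean shutdown
    server.shutdown()
    server.shutdown()  // second shutdown: no-op, no exception
  }
}
{code}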
[jira] [Reopened] (KAFKA-1580) Reject producer requests to internal topics
[ https://issues.apache.org/jira/browse/KAFKA-1580?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Joel Koshy reopened KAFKA-1580: --- Assignee: Guozhang Wang (was: Jonathan Natkins) -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-1580) Reject producer requests to internal topics
[ https://issues.apache.org/jira/browse/KAFKA-1580?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14212811#comment-14212811 ] Joel Koshy commented on KAFKA-1580: --- [~guozhang] I believe this change got lost as part of KAFKA-1583. Would you mind taking a look? Also, I recently ran into a situation where I actually _needed_ to produce messages to the consumer offsets topic. I can send a separate email on that. So I would suggest allowing producer requests to internal topics if a special admin client id is specified (similar to how we allow fetches past the high watermark with a special debugging client id). -- This message was sent by Atlassian JIRA (v6.3.4#6332)
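Joel's suggestion, sketched with hypothetical names (the admin client id value and the helper are assumptions):
{code}
object InternalTopicGuard {
  val InternalTopics = Set("__consumer_offsets")
  val AdminClientId = "__admin_client" // hypothetical escape hatch

  // Reject appends to internal topics unless the producer identifies itself
  // with the special admin client id (mirroring the debug client id used for
  // fetches past the high watermark).
  def validate(topic: String, clientId: String): Boolean =
    !InternalTopics.contains(topic) || clientId == AdminClientId
}
{code}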
Re: Offset manager movement (due to change in KAFKA-1469)
I just wanted to follow-up on a fall-out caused by the issue mentioned below. After the offset manager moved, consumer offsets started going to a different partition of the offsets topic. However, the previous partition of the offsets topic can still have the old offsets. Those won't necessarily get compacted out (if the dirtiness threshold is not met). Now suppose multiple leader changes occur and both the new (correct) offsets partition and the old offsets partition happen to move to the same broker (in that order). We load the offsets into the offsets cache on a leader change. So if the order of leader changes is new partition followed by old partition, then the old offsets end up overwriting the correct offsets in the cache with old (most likely out of range) offsets. The above issue affected some of our consumers (especially ones that have auto.offset.reset set to smallest). In order to fix the issue completely I ended up writing this tool to purge bad offsets: https://gist.github.com/jjkoshy/a3f64d67fe494da3c3a6 In order to produce the tombstones the broker needs to allow producer requests to the __consumer_offsets topic. i.e., fortunately we had not yet picked up KAFKA-1580 so the above worked for us. Thanks, Joel On Mon, Sep 22, 2014 at 03:36:46PM -0700, Joel Koshy wrote: I just wanted to send this out as an FYI but it does not affect any released versions. This only affects those who release off trunk and use Kafka-based consumer offset management. KAFKA-1469 fixes an issue in our Utils.abs code. Since we use this method in determining the offset manager for a consumer group, the fix can yield a different offset manager if you happen to run off trunk and upgrade across the fix. This won't affect all groups, but those that happen to hash to a value that is affected by the bug fixed in KAFKA-1469. (Sort of related - we may want to consider not using hashcode on the group and switch to a more standard hashing algorithm but I highly doubt that hashcode values on a string will change in the future.) Thanks, -- Joel
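For context on why the KAFKA-1469 fix moved offset managers: the offsets-topic partition for a group is derived from the group name's hash code, so changing abs() changes the mapping for groups whose names hash negative. A sketch:
{code}
object OffsetManagerPartition {
  // Kafka's Utils.abs (post-KAFKA-1469) masks the sign bit; unlike math.abs it
  // is safe for Int.MinValue, but it also differs from math.abs for every
  // negative input, which is why some groups landed on a different partition
  // (and hence a different offset manager broker) after the fix.
  def abs(n: Int): Int = n & 0x7fffffff

  def partitionFor(group: String, offsetsTopicPartitionCount: Int): Int =
    abs(group.hashCode) % offsetsTopicPartitionCount
}
{code}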
[jira] [Commented] (KAFKA-1481) Stop using dashes AND underscores as separators in MBean names
[ https://issues.apache.org/jira/browse/KAFKA-1481?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14212873#comment-14212873 ] Joel Koshy commented on KAFKA-1481: --- [~junrao] For 80.2, I believe the additional registration will not create any new mbeans. i.e., it should be a no-op. For 83, we could have one set of mbeans per port right? Or do you think that would be too much? Your suggestion is to drop the port and just unify right? That should also be good. Also, [~vladimir.tretyakov] your patch needs a rebase as mentioned earlier. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
Re: Review Request 27693: Patch for KAFKA-1476
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/27693/#review61515 --- core/src/main/scala/kafka/tools/ConsumerCommand.scala https://reviews.apache.org/r/27693/#comment103194 This patch still doesn't address the issues I pointed out in my previous review. We should expose configs through a generic config option that takes in key value pairs. Let's remove both of these and replace it with --config. core/src/main/scala/kafka/tools/ConsumerCommand.scala https://reviews.apache.org/r/27693/#comment103196 describe-group should work independently of topic. In that case it should describe for all topics that the group is subscribed to. core/src/main/scala/kafka/tools/ConsumerCommand.scala https://reviews.apache.org/r/27693/#comment103199 List -> Seq core/src/main/scala/kafka/tools/ConsumerCommand.scala https://reviews.apache.org/r/27693/#comment103198 Let's use CSV instead of spaces. That way the output is scriptable. core/src/main/scala/kafka/tools/ConsumerCommand.scala https://reviews.apache.org/r/27693/#comment103200 remove toList. - Neha Narkhede On Nov. 10, 2014, 7:06 p.m., Balaji Seshadri wrote: --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/27693/ --- (Updated Nov. 10, 2014, 7:06 p.m.) Review request for kafka. Bugs: KAFKA-1476 https://issues.apache.org/jira/browse/KAFKA-1476 Repository: kafka Description --- KAFKA-1476 Get list of consumer groups - Review Comments Diffs - core/src/main/scala/kafka/tools/ConsumerCommand.scala PRE-CREATION core/src/main/scala/kafka/utils/ZkUtils.scala 56e3e88e0cc6d917b0ffd1254e173295c1c4aabd Diff: https://reviews.apache.org/r/27693/diff/ Testing --- Thanks, Balaji Seshadri
[jira] [Commented] (KAFKA-1624) building on JDK 8 fails
[ https://issues.apache.org/jira/browse/KAFKA-1624?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14212953#comment-14212953 ] Guozhang Wang commented on KAFKA-1624: -- I did some tests locally with various Scala versions. Only the default 2.10.1 does not seem to compile with Java 8; 2.10.2, 2.10.3 and 2.11 are all compatible with it. Shall we change the default version of Scala to at least 2.10.2? building on JDK 8 fails --- Key: KAFKA-1624 URL: https://issues.apache.org/jira/browse/KAFKA-1624 Project: Kafka Issue Type: Bug Reporter: Joe Stein Labels: newbie Fix For: 0.9.0 {code} Java HotSpot(TM) 64-Bit Server VM warning: ignoring option MaxPermSize=512m; support was removed in 8.0 error: error while loading CharSequence, class file '/usr/lib/jvm/java-8-oracle/jre/lib/rt.jar(java/lang/CharSequence.class)' is broken (class java.lang.RuntimeException/bad constant pool tag 18 at byte 10) error: error while loading Comparator, class file '/usr/lib/jvm/java-8-oracle/jre/lib/rt.jar(java/util/Comparator.class)' is broken (class java.lang.RuntimeException/bad constant pool tag 18 at byte 20) error: error while loading AnnotatedElement, class file '/usr/lib/jvm/java-8-oracle/jre/lib/rt.jar(java/lang/reflect/AnnotatedElement.class)' is broken (class java.lang.RuntimeException/bad constant pool tag 18 at byte 76) error: error while loading Arrays, class file '/usr/lib/jvm/java-8-oracle/jre/lib/rt.jar(java/util/Arrays.class)' is broken (class java.lang.RuntimeException/bad constant pool tag 18 at byte 765) /tmp/sbt_53783b12/xsbt/ExtractAPI.scala:395: error: java.util.Comparator does not take type parameters private[this] val sortClasses = new Comparator[Symbol] { ^ 5 errors found :core:compileScala FAILED FAILURE: Build failed with an exception. * What went wrong: Execution failed for task ':core:compileScala'. org.gradle.messaging.remote.internal.PlaceholderException (no error message) * Try: Run with --stacktrace option to get the stack trace. Run with --info or --debug option to get more log output. BUILD FAILED Total time: 1 mins 48.298 secs {code} -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (KAFKA-1642) [Java New Producer Kafka Trunk] CPU Usage Spike to 100% when network connection is lost
[ https://issues.apache.org/jira/browse/KAFKA-1642?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jun Rao updated KAFKA-1642: --- Resolution: Fixed Status: Resolved (was: Patch Available) Thanks for the patch. Committed to trunk. [Java New Producer Kafka Trunk] CPU Usage Spike to 100% when network connection is lost --- Key: KAFKA-1642 URL: https://issues.apache.org/jira/browse/KAFKA-1642 Project: Kafka Issue Type: Bug Components: producer Affects Versions: 0.8.2 Reporter: Bhavesh Mistry Assignee: Ewen Cheslack-Postava Fix For: 0.8.2 Attachments: KAFKA-1642.patch, KAFKA-1642_2014-10-20_17:33:57.patch, KAFKA-1642_2014-10-23_16:19:41.patch I see my CPU spike to 100% when the network connection is lost for a while. It seems the network I/O threads are very busy logging the following error message. Is this expected behavior? 2014-09-17 14:06:16.830 [kafka-producer-network-thread] ERROR org.apache.kafka.clients.producer.internals.Sender - Uncaught error in kafka producer I/O thread: java.lang.IllegalStateException: No entry found for node -2 at org.apache.kafka.clients.ClusterConnectionStates.nodeState(ClusterConnectionStates.java:110) at org.apache.kafka.clients.ClusterConnectionStates.disconnected(ClusterConnectionStates.java:99) at org.apache.kafka.clients.NetworkClient.initiateConnect(NetworkClient.java:394) at org.apache.kafka.clients.NetworkClient.maybeUpdateMetadata(NetworkClient.java:380) at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:174) at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:175) at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:115) at java.lang.Thread.run(Thread.java:744) Thanks, Bhavesh -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-1481) Stop using dashes AND underscores as separators in MBean names
[ https://issues.apache.org/jira/browse/KAFKA-1481?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14212987#comment-14212987 ] Jun Rao commented on KAFKA-1481: [~jjkoshy], yes, re-registering an existing mbean is a no-op. However, it would probably be good not to depend on this and to avoid the unnecessary checks on resources. For 83, chances are a given application is going to use one type of port. So, we can leave this as it is. The patch is actually intended for 0.8.2 and it applies. Stop using dashes AND underscores as separators in MBean names -- Key: KAFKA-1481 URL: https://issues.apache.org/jira/browse/KAFKA-1481 Project: Kafka Issue Type: Bug Components: core Affects Versions: 0.8.1.1 Reporter: Otis Gospodnetic Priority: Critical Labels: patch Fix For: 0.8.3 Attachments: KAFKA-1481_2014-06-06_13-06-35.patch, KAFKA-1481_2014-10-13_18-23-35.patch, KAFKA-1481_2014-10-14_21-53-35.patch, KAFKA-1481_2014-10-15_10-23-35.patch, KAFKA-1481_2014-10-20_23-14-35.patch, KAFKA-1481_2014-10-21_09-14-35.patch, KAFKA-1481_2014-10-30_21-35-43.patch, KAFKA-1481_2014-10-31_14-35-43.patch, KAFKA-1481_2014-11-03_16-39-41_doc.patch, KAFKA-1481_2014-11-03_17-02-23.patch, KAFKA-1481_2014-11-10_20-39-41_doc.patch, KAFKA-1481_2014-11-10_21-02-23.patch, KAFKA-1481_2014-11-14_16-33-03.patch, KAFKA-1481_2014-11-14_16-39-41_doc.patch, KAFKA-1481_IDEA_IDE_2014-10-14_21-53-35.patch, KAFKA-1481_IDEA_IDE_2014-10-15_10-23-35.patch, KAFKA-1481_IDEA_IDE_2014-10-20_20-14-35.patch, KAFKA-1481_IDEA_IDE_2014-10-20_23-14-35.patch, alternateLayout1.png, alternateLayout2.png, diff-for-alternate-layout1.patch, diff-for-alternate-layout2.patch, originalLayout.png MBeans should not use dashes or underscores as separators because these characters are allowed in hostnames, topics, group and consumer IDs, etc., and these are embedded in MBeans names making it impossible to parse out individual bits from MBeans. Perhaps a pipe character should be used to avoid the conflict. This looks like a major blocker because it means nobody can write Kafka 0.8.x monitoring tools unless they are doing it for themselves AND do not use dashes AND do not use underscores. See: http://search-hadoop.com/m/4TaT4lonIW -- This message was sent by Atlassian JIRA (v6.3.4#6332)
Re: Review Request 27536: Patch for KAFKA-1748
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/27536/ --- (Updated Nov. 14, 2014, 10:54 p.m.) Review request for kafka. Bugs: KAFKA-1748 https://issues.apache.org/jira/browse/KAFKA-1748 Repository: kafka Description (updated) --- KAFKA-1748 Make cluster resource declaration separate from test role specification and support loading in three ways: a zero-config default on localhost, a JSON file, and loading from Vagrant's ssh-config command. KAFKA-1748 Handle extra log information when parsing vagrant ssh-config output for Vagrant-based system tests. KAFKA-1748: Update system test README with instructions for cluster configuration. KAFKA-1748 Remove old host information from cluster_config.json files. Diffs (updated) - system_test/README.txt 0e469e373c9d91e6394a513ec581ef1cc92fa44c system_test/cluster.json PRE-CREATION system_test/cluster_config.json 8ed896b358f98eed49502b5df830847ffbc7029b system_test/mirror_maker_testsuite/cluster_config.json 5b908ff3bae4eb392425476be6ab64e7938bec22 system_test/mirror_maker_testsuite/mirror_maker_test.py 48f9ff6b2810f23ca161a3d35f3cd20b59c98230 system_test/mirror_maker_testsuite/testcase_15003/cluster_config.json f6fe86787f1c398223bb6c05be1e435706bf2a8b system_test/mirror_maker_testsuite/testcase_15004/cluster_config.json f6fe86787f1c398223bb6c05be1e435706bf2a8b system_test/mirror_maker_testsuite/testcase_15005/cluster_config.json 63ba37b70e476f37140b674b262441d5ee6523e8 system_test/mirror_maker_testsuite/testcase_15006/cluster_config.json 63ba37b70e476f37140b674b262441d5ee6523e8 system_test/mirror_maker_testsuite/testcase_5003/cluster_config.json f6fe86787f1c398223bb6c05be1e435706bf2a8b system_test/mirror_maker_testsuite/testcase_5004/cluster_config.json f6fe86787f1c398223bb6c05be1e435706bf2a8b system_test/mirror_maker_testsuite/testcase_5005/cluster_config.json 63ba37b70e476f37140b674b262441d5ee6523e8 system_test/mirror_maker_testsuite/testcase_5006/cluster_config.json 63ba37b70e476f37140b674b262441d5ee6523e8 system_test/offset_management_testsuite/cluster_config.json dcca2007de4bdcd0f2dde58a318624699b0bc8cc system_test/offset_management_testsuite/offset_management_test.py aa389105aa4271b0e1eaea897ef85c60ee4a5fe8 system_test/replication_testsuite/replica_basic_test.py 16a24a407051a09751ba2c00b5a1efcf002b1863 system_test/replication_testsuite/testcase_0021/cluster_config.json cf147eb3f2024483aa4db5fbf451893a76386e15 system_test/replication_testsuite/testcase_0022/cluster_config.json cf147eb3f2024483aa4db5fbf451893a76386e15 system_test/replication_testsuite/testcase_0023/cluster_config.json cf147eb3f2024483aa4db5fbf451893a76386e15 system_test/replication_testsuite/testcase_0121/cluster_config.json cf147eb3f2024483aa4db5fbf451893a76386e15 system_test/replication_testsuite/testcase_0122/cluster_config.json cf147eb3f2024483aa4db5fbf451893a76386e15 system_test/replication_testsuite/testcase_0123/cluster_config.json cf147eb3f2024483aa4db5fbf451893a76386e15 system_test/replication_testsuite/testcase_0124/cluster_config.json cf147eb3f2024483aa4db5fbf451893a76386e15 system_test/replication_testsuite/testcase_0125/cluster_config.json cf147eb3f2024483aa4db5fbf451893a76386e15 system_test/replication_testsuite/testcase_0126/cluster_config.json cf147eb3f2024483aa4db5fbf451893a76386e15 system_test/replication_testsuite/testcase_0127/cluster_config.json cf147eb3f2024483aa4db5fbf451893a76386e15 system_test/replication_testsuite/testcase_0131/cluster_config.json cf147eb3f2024483aa4db5fbf451893a76386e15 
system_test/replication_testsuite/testcase_0132/cluster_config.json cf147eb3f2024483aa4db5fbf451893a76386e15 system_test/replication_testsuite/testcase_0133/cluster_config.json cf147eb3f2024483aa4db5fbf451893a76386e15 system_test/replication_testsuite/testcase_1/cluster_config.json ab9016dd4fc5588fa722d5a07da0580c2656c0c1 system_test/replication_testsuite/testcase_10131/cluster_config.json cf147eb3f2024483aa4db5fbf451893a76386e15 system_test/replication_testsuite/testcase_10132/cluster_config.json cf147eb3f2024483aa4db5fbf451893a76386e15 system_test/replication_testsuite/testcase_10133/cluster_config.json cf147eb3f2024483aa4db5fbf451893a76386e15 system_test/replication_testsuite/testcase_4001/cluster_config.json 9e733cfd98fb074002b96b322bd8502a112a09ff system_test/replication_testsuite/testcase_4002/cluster_config.json 9e733cfd98fb074002b96b322bd8502a112a09ff system_test/replication_testsuite/testcase_4003/cluster_config.json 9e733cfd98fb074002b96b322bd8502a112a09ff system_test/replication_testsuite/testcase_4004/cluster_config.json 9e733cfd98fb074002b96b322bd8502a112a09ff system_test/replication_testsuite/testcase_4005/cluster_config.json
[jira] [Commented] (KAFKA-1642) [Java New Producer Kafka Trunk] CPU Usage Spike to 100% when network connection is lost
[ https://issues.apache.org/jira/browse/KAFKA-1642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14213013#comment-14213013 ] Jun Rao commented on KAFKA-1642: Since this is relatively critical and the changes are only in the new java producer, double committed to 0.8.2 as well. [Java New Producer Kafka Trunk] CPU Usage Spike to 100% when network connection is lost --- Key: KAFKA-1642 URL: https://issues.apache.org/jira/browse/KAFKA-1642 Project: Kafka Issue Type: Bug Components: producer Affects Versions: 0.8.2 Reporter: Bhavesh Mistry Assignee: Ewen Cheslack-Postava Fix For: 0.8.2 Attachments: KAFKA-1642.patch, KAFKA-1642_2014-10-20_17:33:57.patch, KAFKA-1642_2014-10-23_16:19:41.patch I see my CPU spike to 100% when the network connection is lost for a while. It seems the network I/O threads are very busy logging the following error message. Is this expected behavior? 2014-09-17 14:06:16.830 [kafka-producer-network-thread] ERROR org.apache.kafka.clients.producer.internals.Sender - Uncaught error in kafka producer I/O thread: java.lang.IllegalStateException: No entry found for node -2 at org.apache.kafka.clients.ClusterConnectionStates.nodeState(ClusterConnectionStates.java:110) at org.apache.kafka.clients.ClusterConnectionStates.disconnected(ClusterConnectionStates.java:99) at org.apache.kafka.clients.NetworkClient.initiateConnect(NetworkClient.java:394) at org.apache.kafka.clients.NetworkClient.maybeUpdateMetadata(NetworkClient.java:380) at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:174) at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:175) at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:115) at java.lang.Thread.run(Thread.java:744) Thanks, Bhavesh -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-1748) Decouple system test cluster resources definition from service definitions
[ https://issues.apache.org/jira/browse/KAFKA-1748?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14213012#comment-14213012 ] Ewen Cheslack-Postava commented on KAFKA-1748: -- Updated reviewboard https://reviews.apache.org/r/27536/diff/ against branch origin/trunk Decouple system test cluster resources definition from service definitions -- Key: KAFKA-1748 URL: https://issues.apache.org/jira/browse/KAFKA-1748 Project: Kafka Issue Type: Bug Affects Versions: 0.8.1.1 Reporter: Ewen Cheslack-Postava Assignee: Ewen Cheslack-Postava Attachments: KAFKA-1748.patch, KAFKA-1748_2014-11-03_12:04:18.patch, KAFKA-1748_2014-11-14_14:54:17.patch Currently the system tests use JSON files that specify the set of services for each test and where they should run (i.e. hostname). These currently assume that you already have SSH keys setup, use the same username on the host running the tests and the test cluster, don't require any additional ssh/scp/rsync flags, and assume you'll always have a fixed set of compute resources (or that you'll spend a lot of time editing config files). While we don't want a whole cluster resource manager in the system tests, a bit more flexibility would make it easier to, e.g., run tests against a local vagrant cluster or on dynamically allocated EC2 instances. We can separate out the basic resource spec (i.e. json specifying how to access machines) from the service definition (i.e. a broker should run with settings x, y, z). Restricting to a very simple set of mappings (i.e. map services to hosts with round robin, optionally restricting to no reuse of hosts) should keep things simple. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (KAFKA-1748) Decouple system test cluster resources definition from service definitions
[ https://issues.apache.org/jira/browse/KAFKA-1748?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ewen Cheslack-Postava updated KAFKA-1748: - Attachment: KAFKA-1748_2014-11-14_14:54:17.patch Decouple system test cluster resources definition from service definitions -- Key: KAFKA-1748 URL: https://issues.apache.org/jira/browse/KAFKA-1748 Project: Kafka Issue Type: Bug Affects Versions: 0.8.1.1 Reporter: Ewen Cheslack-Postava Assignee: Ewen Cheslack-Postava Attachments: KAFKA-1748.patch, KAFKA-1748_2014-11-03_12:04:18.patch, KAFKA-1748_2014-11-14_14:54:17.patch Currently the system tests use JSON files that specify the set of services for each test and where they should run (i.e. hostname). These currently assume that you already have SSH keys setup, use the same username on the host running the tests and the test cluster, don't require any additional ssh/scp/rsync flags, and assume you'll always have a fixed set of compute resources (or that you'll spend a lot of time editing config files). While we don't want a whole cluster resource manager in the system tests, a bit more flexibility would make it easier to, e.g., run tests against a local vagrant cluster or on dynamically allocated EC2 instances. We can separate out the basic resource spec (i.e. json specifying how to access machines) from the service definition (i.e. a broker should run with settings x, y, z). Restricting to a very simple set of mappings (i.e. map services to hosts with round robin, optionally restricting to no reuse of hosts) should keep things simple. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-1748) Decouple system test cluster resources definition from service definitions
[ https://issues.apache.org/jira/browse/KAFKA-1748?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14213021#comment-14213021 ] Ewen Cheslack-Postava commented on KAFKA-1748: -- A couple of improvements to the patch: 1. Minor fix for parsing vagrant ssh-config output to make it skip extraneous log lines. 2. Update the system_test/README with instructions for configuring the tests. 3. Clean out host information from cluster_config.json files. This touches a bunch of files, but the changes are trivial (and help ensure the tests are actually getting hostnames from the right place). There's one other place where localhost appears in config files -- some of them have a zookeeper setting that looks like it's supposed to tell the producer/consumer how to connect to ZK. As far as I can tell, these aren't actually used anywhere and removing it from one of the test configs didn't seem to have any effect. However, since I can't even figure out how it was used in the commit that actually introduced the setting, I'm a bit hesitant to remove these as well. Maybe someone from LI can check if they modify these settings in their setup, which would indicate they might actually be doing something? Decouple system test cluster resources definition from service definitions -- Key: KAFKA-1748 URL: https://issues.apache.org/jira/browse/KAFKA-1748 Project: Kafka Issue Type: Bug Affects Versions: 0.8.1.1 Reporter: Ewen Cheslack-Postava Assignee: Ewen Cheslack-Postava Attachments: KAFKA-1748.patch, KAFKA-1748_2014-11-03_12:04:18.patch, KAFKA-1748_2014-11-14_14:54:17.patch Currently the system tests use JSON files that specify the set of services for each test and where they should run (i.e. hostname). These currently assume that you already have SSH keys setup, use the same username on the host running the tests and the test cluster, don't require any additional ssh/scp/rsync flags, and assume you'll always have a fixed set of compute resources (or that you'll spend a lot of time editing config files). While we don't want a whole cluster resource manager in the system tests, a bit more flexibility would make it easier to, e.g., run tests against a local vagrant cluster or on dynamically allocated EC2 instances. We can separate out the basic resource spec (i.e. json specifying how to access machines) from the service definition (i.e. a broker should run with settings x, y, z). Restricting to a very simple set of mappings (i.e. map services to hosts with round robin, optionally restricting to no reuse of hosts) should keep things simple. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-948) ISR list in LeaderAndISR path not updated for partitions when Broker (which is not leader) is down
[ https://issues.apache.org/jira/browse/KAFKA-948?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14213052#comment-14213052 ] Jun Rao commented on KAFKA-948: --- Currently, if you want to replace a failed broker, you will need to bring up the broker on another machine with the same broker id. kafka-reassign-topic.sh won't succeed until the old replicas are removed, which won't happen if the broker hosting an old replica is down. ISR list in LeaderAndISR path not updated for partitions when Broker (which is not leader) is down -- Key: KAFKA-948 URL: https://issues.apache.org/jira/browse/KAFKA-948 Project: Kafka Issue Type: Bug Components: controller Affects Versions: 0.8.0 Reporter: Dibyendu Bhattacharya Assignee: Neha Narkhede When the broker which is the leader for a partition is down, the ISR list in the LeaderAndISR path is updated. But if a broker which is not the leader of the partition is down, the ISR list is not getting updated. This is an issue because the ISR list then contains a stale entry. I found this issue in kafka-0.8.0-beta1-candidate1 -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (KAFKA-1721) Snappy compressor is not thread safe
[ https://issues.apache.org/jira/browse/KAFKA-1721?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jun Rao updated KAFKA-1721: --- Resolution: Fixed Status: Resolved (was: Patch Available) Thanks for the patch. Since this change is trivial, double committed to 0.8.2 and trunk. Snappy compressor is not thread safe Key: KAFKA-1721 URL: https://issues.apache.org/jira/browse/KAFKA-1721 Project: Kafka Issue Type: Bug Components: compression Reporter: Ewen Cheslack-Postava Assignee: Ewen Cheslack-Postava Fix For: 0.8.2 Attachments: KAFKA-1721.patch, KAFKA-1721_2014-10-28_09:25:50.patch From the mailing list, it can generate this exception: 2014-10-20 18:55:21.841 [kafka-producer-network-thread] ERROR org.apache.kafka.clients.producer.internals.Sender - Uncaught error in kafka producer I/O thread: *java.lang.NullPointerException* at org.xerial.snappy.BufferRecycler.releaseInputBuffer(BufferRecycler.java:153) at org.xerial.snappy.SnappyOutputStream.close(SnappyOutputStream.java:317) at java.io.FilterOutputStream.close(FilterOutputStream.java:160) at org.apache.kafka.common.record.Compressor.close(Compressor.java:94) at org.apache.kafka.common.record.MemoryRecords.close(MemoryRecords.java:119) at org.apache.kafka.clients.producer.internals.RecordAccumulator.drain(RecordAccumulator.java:285) at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:162) at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:115) at java.lang.Thread.run(Thread.java:744) This appears to be an issue with the snappy-java library using ThreadLocal for an internal buffer recycling object which results in that object being shared unsafely across threads if one thread sends to multiple producers: {quote} I think the issue is that you're using all your producers across a thread pool and the snappy library uses ThreadLocal BufferRecyclers. When new Snappy streams are allocated, they may be allocated from the same thread (e.g. one of your MyProducer classes calls Producer.send() on multiple producers from the same thread) and therefore use the same BufferRecycler. Eventually you hit the code in the stacktrace, and if two producer send threads hit it concurrently they improperly share the unsynchronized BufferRecycler. This seems like a pain to fix -- it's really a deficiency of the snappy library and as far as I can see there's no external control over BufferRecycler in their API. One possibility is to record the thread ID when we generate a new stream in Compressor and use that to synchronize access to ensure no concurrent BufferRecycler access. That could be made specific to snappy so it wouldn't impact other codecs. Not exactly ideal, but it would work. Unfortunately I can't think of any way for you to protect against this in your own code since the problem arises in the producer send thread, which your code should never know about. Another option would be to setup your producers differently to avoid the possibility of unsynchronized access from multiple threads (i.e. don't use the same thread pool approach), but whether you can do that will depend on your use case. {quote} -- This message was sent by Atlassian JIRA (v6.3.4#6332)
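As a sketch of the per-thread synchronization idea quoted above (illustrative only, not the committed fix; the class and object names are invented): each stream remembers the thread that created it, and all close() calls for streams created by the same thread go through one shared lock, so the thread-local BufferRecycler those streams share is never released concurrently.
{code}
import java.io.OutputStream
import java.util.concurrent.ConcurrentHashMap

// One lock per creating thread; streams created on the same thread share
// a BufferRecycler, so they must also share a lock when releasing it.
object CreatorThreadLocks {
  private val locks = new ConcurrentHashMap[java.lang.Long, Object]()
  def lockFor(threadId: Long): Object = {
    val existing = locks.get(threadId)
    if (existing != null) existing
    else {
      val fresh = new Object
      val prev = locks.putIfAbsent(threadId, fresh)
      if (prev != null) prev else fresh
    }
  }
}

class GuardedCompressorStream(underlying: OutputStream) {
  private val creatorLock = CreatorThreadLocks.lockFor(Thread.currentThread().getId)
  // Serialize close() across all streams created by the same thread.
  def close(): Unit = creatorLock.synchronized { underlying.close() }
}
{code}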
Re: Enforcing Network Bandwidth Quota with New Java Producer
We have a metric that measures the per-topic byte send rate (after compression). You can get the values through the producer API. Thanks, Jun On Fri, Nov 14, 2014 at 10:34 AM, Bhavesh Mistry mistry.p.bhav...@gmail.com wrote: Hi Kafka Team, We would like to enforce a network bandwidth quota limit per minute on the producer side. How can I do this? I need some way to count compressed bytes on the producer. I know the callback does not give this ability. Let me know the best way. Thanks, Bhavesh
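As a sketch of reading those values (the metric-name filter below is an assumption; exact metric names vary by version and should be verified against the output of producer.metrics()):
{code}
import scala.collection.JavaConverters._
import org.apache.kafka.clients.producer.KafkaProducer

// Poll the producer's built-in metrics for per-topic byte rates. A quota
// enforcer could sample this periodically and throttle sends when the
// observed rate exceeds its budget.
def topicByteRates(producer: KafkaProducer[Array[Byte], Array[Byte]]): Map[String, Double] =
  producer.metrics().asScala.collect {
    case (name, metric) if name.name.endsWith("byte-rate") => name.name -> metric.value()
  }.toMap
{code}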
[jira] [Commented] (KAFKA-1767) /admin/reassign_partitions deleted before reassignment completes
[ https://issues.apache.org/jira/browse/KAFKA-1767?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14213141#comment-14213141 ] Jun Rao commented on KAFKA-1767: Do you have an easy way to reproduce this on trunk? Thanks, /admin/reassign_partitions deleted before reassignment completes Key: KAFKA-1767 URL: https://issues.apache.org/jira/browse/KAFKA-1767 Project: Kafka Issue Type: Bug Components: controller Affects Versions: 0.8.1.1 Reporter: Ryan Berdeen Assignee: Neha Narkhede https://github.com/apache/kafka/blob/0.8.1.1/core/src/main/scala/kafka/controller/KafkaController.scala#L477-L517 describes the process of reassigning partitions. Specifically, by the time {{/admin/reassign_partitions}} is updated, the newly assigned replicas (RAR) should be in sync, and the assigned replicas (AR) in ZooKeeper should be updated: {code} 4. Wait until all replicas in RAR are in sync with the leader. ... 10. Update AR in ZK with RAR. 11. Update the /admin/reassign_partitions path in ZK to remove this partition. {code} This worked in 0.8.1, but in 0.8.1.1 we observe {{/admin/reassign_partitions}} being removed before step 4 has completed. For example, if we have AR [1,2] and then put [3,4] in {{/admin/reassign_partitions}}, the cluster will end up with AR [1,2,3,4] and ISR [1,2] when the key is removed. Eventually, the AR will be updated to [3,4]. This means that the {{kafka-reassign-partitions.sh}} tool will accept a new batch of reassignments before the current reassignments have finished, and our own tool that feeds in reassignments in small batches (see KAFKA-1677) can't rely on this key to detect active reassignments. Although we haven't observed this, it seems likely that if a controller resignation happens, the new controller won't know that a reassignment is in progress, and the AR will never be updated to the RAR. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
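For reference, the check such a batching tool relies on is just the presence of the znode; a minimal sketch with zkclient (which, per this ticket, is not sufficient on 0.8.1.1 because the znode can disappear before the ISR has caught up):
{code}
import org.I0Itec.zkclient.ZkClient

// Wait until the reassignment znode is gone before submitting the next batch.
// On 0.8.1.1 this can return while replicas are still catching up.
def waitForReassignmentToClear(zkClient: ZkClient, pollMs: Long = 1000L): Unit =
  while (zkClient.exists("/admin/reassign_partitions"))
    Thread.sleep(pollMs)
{code}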
[jira] [Commented] (KAFKA-1767) /admin/reassign_partitions deleted before reassignment completes
[ https://issues.apache.org/jira/browse/KAFKA-1767?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14213173#comment-14213173 ] Guozhang Wang commented on KAFKA-1767: -- Did you have a controller migration before this happened? If so, you are very likely hitting KAFKA-1578. /admin/reassign_partitions deleted before reassignment completes Key: KAFKA-1767 URL: https://issues.apache.org/jira/browse/KAFKA-1767 Project: Kafka Issue Type: Bug Components: controller Affects Versions: 0.8.1.1 Reporter: Ryan Berdeen Assignee: Neha Narkhede https://github.com/apache/kafka/blob/0.8.1.1/core/src/main/scala/kafka/controller/KafkaController.scala#L477-L517 describes the process of reassigning partitions. Specifically, by the time {{/admin/reassign_partitions}} is updated, the newly assigned replicas (RAR) should be in sync, and the assigned replicas (AR) in ZooKeeper should be updated: {code} 4. Wait until all replicas in RAR are in sync with the leader. ... 10. Update AR in ZK with RAR. 11. Update the /admin/reassign_partitions path in ZK to remove this partition. {code} This worked in 0.8.1, but in 0.8.1.1 we observe {{/admin/reassign_partitions}} being removed before step 4 has completed. For example, if we have AR [1,2] and then put [3,4] in {{/admin/reassign_partitions}}, the cluster will end up with AR [1,2,3,4] and ISR [1,2] when the key is removed. Eventually, the AR will be updated to [3,4]. This means that the {{kafka-reassign-partitions.sh}} tool will accept a new batch of reassignments before the current reassignments have finished, and our own tool that feeds in reassignments in small batches (see KAFKA-1677) can't rely on this key to detect active reassignments. Although we haven't observed this, it seems likely that if a controller resignation happens, the new controller won't know that a reassignment is in progress, and the AR will never be updated to the RAR. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
Kafka Simple Consumer API for 0.9
Hi, Will the Simple Consumer API change in Kafka 0.9? I can see a Consumer Re-design approach for Kafka 0.9, which I believe will not impact any client written using the Simple Consumer API. Is that correct? Regards, Dibyendu
Re: Review Request 27634: Patch for KAFKA-1667
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/27634/#review61581 --- Thanks for the patch. Looks good overall. A few comments below. core/src/main/scala/kafka/log/LogConfig.scala https://reviews.apache.org/r/27634/#comment103317 This is actually a hard limit. core/src/main/scala/kafka/log/LogConfig.scala https://reviews.apache.org/r/27634/#comment103321 Could we just put those in Object LogConfig? core/src/main/scala/kafka/log/LogConfig.scala https://reviews.apache.org/r/27634/#comment103318 Two age. core/src/main/scala/kafka/log/LogConfig.scala https://reviews.apache.org/r/27634/#comment103319 This actually can be controlled at the topic level. core/src/main/scala/kafka/log/LogConfig.scala https://reviews.apache.org/r/27634/#comment103320 Leading space after . core/src/main/scala/kafka/log/LogConfig.scala https://reviews.apache.org/r/27634/#comment103316 The importance looks good. core/src/main/scala/kafka/utils/Utils.scala https://reviews.apache.org/r/27634/#comment103323 Could we rename this to sth like overridesAndDefaults()? core/src/test/scala/kafka/log/LogConfigTest.scala https://reviews.apache.org/r/27634/#comment103300 Need to add Apache license header. core/src/test/scala/kafka/log/LogConfigTest.scala https://reviews.apache.org/r/27634/#comment103312 Does return skip the rest of the config names? - Jun Rao On Nov. 12, 2014, 11:49 a.m., Dmytro Kostiuchenko wrote: --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/27634/ --- (Updated Nov. 12, 2014, 11:49 a.m.) Review request for kafka. Bugs: KAFKA-1667 https://issues.apache.org/jira/browse/KAFKA-1667 Repository: kafka Description --- KAFKA-1667 Fixed bugs in LogConfig. Added test and documentation KAFKA-1667 Updated tests to reflect new boolean property parsing logic KAFKA-1667 renamed methods to match naming convention KAFKA-1667 Added unit test to cover invalid configuration case KAFKA-1667 Strict UncleanLeaderElection property parsing Diffs - clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java c4cea2cc072f4db4ce014b63d226431d3766bef1 core/src/main/scala/kafka/admin/TopicCommand.scala 0b2735e7fc42ef9894bef1997b1f06a8ebee5439 core/src/main/scala/kafka/log/LogConfig.scala e48922a97727dd0b98f3ae630ebb0af3bef2373d core/src/main/scala/kafka/utils/Utils.scala 23aefb4715b177feae1d2f83e8b910653ea10c5f core/src/test/scala/kafka/log/LogConfigTest.scala PRE-CREATION core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala f44568cb25edf25db857415119018fd4c9922f61 Diff: https://reviews.apache.org/r/27634/diff/ Testing --- Thanks, Dmytro Kostiuchenko
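On the last review question above: yes. In Scala, a return inside a closure passed to foreach/map is a non-local return from the enclosing method (implemented via a thrown NonLocalReturnControl), so the remaining elements are never visited. A standalone illustration, unrelated to the patch itself:
{code}
// `return` inside the closure exits validateNames itself, so any names
// after the first empty one are skipped entirely.
def validateNames(names: Seq[String]): Boolean = {
  names.foreach { name =>
    if (name.isEmpty) return false
  }
  true
}
{code}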
[jira] [Resolved] (KAFKA-1766) Ecosystem docs subsection has wrong anchor
[ https://issues.apache.org/jira/browse/KAFKA-1766?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jun Rao resolved KAFKA-1766. Resolution: Fixed Fix Version/s: 0.8.2 Thanks for pointing this out. Fixed the site. Ecosystem docs subsection has wrong anchor -- Key: KAFKA-1766 URL: https://issues.apache.org/jira/browse/KAFKA-1766 Project: Kafka Issue Type: Bug Reporter: Kirill Zaborsky Priority: Minor Fix For: 0.8.2 The following portion of HTML at http://kafka.apache.org/documentation.html seems to be wrong: <h3><a id="upgrade">1.4 Ecosystem</a></h3> It should be: <h3><a id="ecosystem">1.4 Ecosystem</a></h3> Why don't you have Kafka docs on GitHub also? If you did, it would be trivial to create a PR to fix this issue. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-493) High CPU usage on inactive server
[ https://issues.apache.org/jira/browse/KAFKA-493?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14213272#comment-14213272 ] Jun Rao commented on KAFKA-493: --- [~activars], how do I look at the snapshot? Could you include the top few methods? High CPU usage on inactive server - Key: KAFKA-493 URL: https://issues.apache.org/jira/browse/KAFKA-493 Project: Kafka Issue Type: Bug Components: core Affects Versions: 0.8.0 Reporter: Jay Kreps Fix For: 0.9.0 Attachments: Kafka-2014-11-10.snapshot.zip, Kafka-sampling1.zip, Kafka-sampling2.zip, Kafka-sampling3.zip, Kafka-trace1.zip, Kafka-trace2.zip, Kafka-trace3.zip, backtraces.txt, stacktrace.txt I've been playing with the 0.8 branch of Kafka and noticed that idle CPU usage is fairly high (13% of a core). Is that to be expected? I did look at the stack, but didn't see anything obvious. A background task? I wanted to mention how I am getting into this state. I've set up two machines with the latest 0.8 code base and am using a replication factor of 2. On starting the brokers there is no idle CPU activity. Then I run a test that essential does 10k publish operations followed by immediate consume operations (I was measuring latency). Once this has run the kafka nodes seem to consistently be consuming CPU essentially forever. hprof results: THREAD START (obj=53ae, id = 24, name=RMI TCP Accept-0, group=system) THREAD START (obj=53ae, id = 25, name=RMI TCP Accept-, group=system) THREAD START (obj=53ae, id = 26, name=RMI TCP Accept-0, group=system) THREAD START (obj=53ae, id = 21, name=main, group=main) THREAD START (obj=53ae, id = 27, name=Thread-2, group=main) THREAD START (obj=53ae, id = 28, name=Thread-3, group=main) THREAD START (obj=53ae, id = 29, name=kafka-processor-9092-0, group=main) THREAD START (obj=53ae, id = 200010, name=kafka-processor-9092-1, group=main) THREAD START (obj=53ae, id = 200011, name=kafka-acceptor, group=main) THREAD START (obj=574b, id = 200012, name=ZkClient-EventThread-20-localhost:2181, group=main) THREAD START (obj=576e, id = 200014, name=main-SendThread(), group=main) THREAD START (obj=576d, id = 200013, name=main-EventThread, group=main) THREAD START (obj=53ae, id = 200015, name=metrics-meter-tick-thread-1, group=main) THREAD START (obj=53ae, id = 200016, name=metrics-meter-tick-thread-2, group=main) THREAD START (obj=53ae, id = 200017, name=request-expiration-task, group=main) THREAD START (obj=53ae, id = 200018, name=request-expiration-task, group=main) THREAD START (obj=53ae, id = 200019, name=kafka-request-handler-0, group=main) THREAD START (obj=53ae, id = 200020, name=kafka-request-handler-1, group=main) THREAD START (obj=53ae, id = 200021, name=Thread-6, group=main) THREAD START (obj=53ae, id = 200022, name=Thread-7, group=main) THREAD START (obj=5899, id = 200023, name=ReplicaFetcherThread-0-2 on broker 1, , group=main) THREAD START (obj=5899, id = 200024, name=ReplicaFetcherThread-0-3 on broker 1, , group=main) THREAD START (obj=5899, id = 200025, name=ReplicaFetcherThread-0-0 on broker 1, , group=main) THREAD START (obj=5899, id = 200026, name=ReplicaFetcherThread-0-1 on broker 1, , group=main) THREAD START (obj=53ae, id = 200028, name=SIGINT handler, group=system) THREAD START (obj=53ae, id = 200029, name=Thread-5, group=main) THREAD START (obj=574b, id = 200030, name=Thread-1, group=main) THREAD START (obj=574b, id = 200031, name=Thread-0, group=main) THREAD END (id = 200031) THREAD END (id = 200029) THREAD END (id = 200020) THREAD END (id = 200019) THREAD END (id = 
28) THREAD END (id = 200021) THREAD END (id = 27) THREAD END (id = 200022) THREAD END (id = 200018) THREAD END (id = 200017) THREAD END (id = 200012) THREAD END (id = 200013) THREAD END (id = 200014) THREAD END (id = 200025) THREAD END (id = 200023) THREAD END (id = 200026) THREAD END (id = 200024) THREAD END (id = 200011) THREAD END (id = 29) THREAD END (id = 200010) THREAD END (id = 200030) THREAD END (id = 200028) TRACE 301281: sun.nio.ch.EPollArrayWrapper.epollWait(EPollArrayWrapper.java:Unknown line) sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:228) sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:81) sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:87) sun.nio.ch.SelectorImpl.select(SelectorImpl.java:98) sun.nio.ch.SocketAdaptor$SocketInputStream.read(SocketAdaptor.java:218) sun.nio.ch.ChannelInputStream.read(ChannelInputStream.java:103)
[jira] [Commented] (KAFKA-1757) Can not delete Topic index on Windows
[ https://issues.apache.org/jira/browse/KAFKA-1757?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14213284#comment-14213284 ] Jun Rao commented on KAFKA-1757: Thanks for the patch. It's not clear to me if this really fixes the problem. OffsetIndex.close() doesn't really close any channels or file handles. It simply remaps the memory mapped file. Can not delete Topic index on Windows - Key: KAFKA-1757 URL: https://issues.apache.org/jira/browse/KAFKA-1757 Project: Kafka Issue Type: Bug Components: log Affects Versions: 0.8.2 Reporter: Lukáš Vyhlídka Priority: Minor Fix For: 0.8.2 Attachments: lucky-v.patch When running Kafka 0.8.2-Beta (Scala 2.10) on Windows, an attempt to delete a topic threw an error: ERROR [KafkaApi-1] error when handling request Name: StopReplicaRequest; Version: 0; CorrelationId: 38; ClientId: ; DeletePartitions: true; ControllerId: 0; ControllerEpoch: 3; Partitions: [test,0] (kafka.server.KafkaApis) kafka.common.KafkaStorageException: Delete of index .index failed. at kafka.log.LogSegment.delete(LogSegment.scala:283) at kafka.log.Log$$anonfun$delete$1.apply(Log.scala:608) at kafka.log.Log$$anonfun$delete$1.apply(Log.scala:608) at scala.collection.Iterator$class.foreach(Iterator.scala:727) at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) at scala.collection.AbstractIterable.foreach(Iterable.scala:54) at kafka.log.Log.delete(Log.scala:608) at kafka.log.LogManager.deleteLog(LogManager.scala:375) at kafka.cluster.Partition$$anonfun$delete$1.apply$mcV$sp(Partition.scala:144) at kafka.cluster.Partition$$anonfun$delete$1.apply(Partition.scala:139) at kafka.cluster.Partition$$anonfun$delete$1.apply(Partition.scala:139) at kafka.utils.Utils$.inLock(Utils.scala:535) at kafka.utils.Utils$.inWriteLock(Utils.scala:543) at kafka.cluster.Partition.delete(Partition.scala:139) at kafka.server.ReplicaManager.stopReplica(ReplicaManager.scala:158) at kafka.server.ReplicaManager$$anonfun$stopReplicas$3.apply(ReplicaManager.scala:191) at kafka.server.ReplicaManager$$anonfun$stopReplicas$3.apply(ReplicaManager.scala:190) at scala.collection.immutable.Set$Set1.foreach(Set.scala:74) at kafka.server.ReplicaManager.stopReplicas(ReplicaManager.scala:190) at kafka.server.KafkaApis.handleStopReplicaRequest(KafkaApis.scala:96) at kafka.server.KafkaApis.handle(KafkaApis.scala:59) at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:59) at java.lang.Thread.run(Thread.java:744) When I investigated the issue, I figured out that the index file (in my environment it was C:\tmp\kafka-logs\----0014-0\.index) was locked by the Kafka process and the OS did not allow that file to be deleted. I tried to fix the problem in the source code, and when I added a close() method call into LogSegment.delete(), the topic deletion started to work. I will add here (not sure how to upload the file during issue creation) a diff with the changes I have made so you can take a look at whether it is reasonable or not. It would be perfect if it could make it into the product... In the end I would like to say that on Linux the deletion works just fine... -- This message was sent by Atlassian JIRA (v6.3.4#6332)
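For context on why releasing the mapping matters on Windows: a memory-mapped file cannot be deleted while its mapping is live, and the JVM exposes no public unmap API, so the usual workaround reflectively invokes the buffer's internal cleaner (Kafka's OffsetIndex contains a similar forceUnmap helper). A hedged sketch for pre-Java-9 JVMs:
{code}
import java.nio.MappedByteBuffer

// Best-effort eager unmap so Windows will allow the file to be deleted.
// Relies on internal JDK classes (sun.nio.ch.DirectBuffer / sun.misc.Cleaner),
// not a supported API; on failure the mapping is simply left to the GC.
def forceUnmap(buffer: MappedByteBuffer): Unit = {
  val cleanerMethod = buffer.getClass.getMethod("cleaner")
  cleanerMethod.setAccessible(true)
  val cleaner = cleanerMethod.invoke(buffer)
  if (cleaner != null)
    cleaner.getClass.getMethod("clean").invoke(cleaner)
}
{code}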
[jira] [Reopened] (KAFKA-1751) handle broker not exists and topic not exists scenarios
[ https://issues.apache.org/jira/browse/KAFKA-1751?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Joe Stein reopened KAFKA-1751: -- handle broker not exists and topic not exists scenarios --- Key: KAFKA-1751 URL: https://issues.apache.org/jira/browse/KAFKA-1751 Project: Kafka Issue Type: Sub-task Components: tools Reporter: Dmitry Pekar Assignee: Dmitry Pekar Fix For: 0.8.3 Attachments: KAFKA-1751.patch, kafka-1751.patch Merged with KAFKA-1750 so both go through a single code review. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (KAFKA-1751) handle broker not exists and topic not exists scenarios
[ https://issues.apache.org/jira/browse/KAFKA-1751?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Joe Stein updated KAFKA-1751: - Status: Patch Available (was: Reopened) handle broker not exists and topic not exists scenarios --- Key: KAFKA-1751 URL: https://issues.apache.org/jira/browse/KAFKA-1751 Project: Kafka Issue Type: Sub-task Components: tools Reporter: Dmitry Pekar Assignee: Dmitry Pekar Fix For: 0.8.3 Attachments: KAFKA-1751.patch, kafka-1751.patch Merged with KAFKA-1750 so both go through a single code review. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (KAFKA-1772) Add an Admin message type for request response
Joe Stein created KAFKA-1772: Summary: Add an Admin message type for request response Key: KAFKA-1772 URL: https://issues.apache.org/jira/browse/KAFKA-1772 Project: Kafka Issue Type: Sub-task Reporter: Joe Stein
Message fields:
- timestamp
- utility int8
- command int8
- format int8
- args (variable length bytes)
utility: 0 - Broker, 1 - Topic, 2 - Replication, 3 - Controller, 4 - Consumer, 5 - Producer
command: 0 - Create, 1 - Alter, 3 - Delete, 4 - List, 5 - Audit
format: 0 - JSON
args e.g. (which would equate to the data structure values == 2,1,0):
meta-store: { zookeeper: localhost:12913/kafka }
args: {
  partitions: [
    {topic: topic1, partition: 0},
    {topic: topic1, partition: 1},
    {topic: topic1, partition: 2},
    {topic: topic2, partition: 0},
    {topic: topic2, partition: 1},
  ]
}
-- This message was sent by Atlassian JIRA (v6.3.4#6332)
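A minimal sketch of what serializing that layout could look like (the 4-byte length prefix for args is an assumption on my part; the ticket only says variable-length bytes), using the example's values utility=2 (Replication), command=1 (Alter), format=0 (JSON):
{code}
import java.nio.ByteBuffer
import java.nio.charset.StandardCharsets

// Illustrative encoding of the proposed admin message layout.
def encodeAdminMessage(timestamp: Long, utility: Byte, command: Byte,
                       format: Byte, argsJson: String): ByteBuffer = {
  val args = argsJson.getBytes(StandardCharsets.UTF_8)
  val buf = ByteBuffer.allocate(8 + 1 + 1 + 1 + 4 + args.length)
  buf.putLong(timestamp)  // timestamp
  buf.put(utility)        // utility, e.g. 2 = Replication
  buf.put(command)        // command, e.g. 1 = Alter
  buf.put(format)         // format, e.g. 0 = JSON
  buf.putInt(args.length) // assumed length prefix for the variable-length args
  buf.put(args)
  buf.flip()
  buf
}
{code}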
[jira] [Commented] (KAFKA-1481) Stop using dashes AND underscores as separators in MBean names
[ https://issues.apache.org/jira/browse/KAFKA-1481?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14213409#comment-14213409 ] Otis Gospodnetic commented on KAFKA-1481: - bq. 80.4 We will need to add the same version mbean in the new java client. We don't need to do that in this jira. Could you file a separate jira to track that? I created KAFKA-1768 a few days ago and have linked it to this issue. Stop using dashes AND underscores as separators in MBean names -- Key: KAFKA-1481 URL: https://issues.apache.org/jira/browse/KAFKA-1481 Project: Kafka Issue Type: Bug Components: core Affects Versions: 0.8.1.1 Reporter: Otis Gospodnetic Priority: Critical Labels: patch Fix For: 0.8.3 Attachments: KAFKA-1481_2014-06-06_13-06-35.patch, KAFKA-1481_2014-10-13_18-23-35.patch, KAFKA-1481_2014-10-14_21-53-35.patch, KAFKA-1481_2014-10-15_10-23-35.patch, KAFKA-1481_2014-10-20_23-14-35.patch, KAFKA-1481_2014-10-21_09-14-35.patch, KAFKA-1481_2014-10-30_21-35-43.patch, KAFKA-1481_2014-10-31_14-35-43.patch, KAFKA-1481_2014-11-03_16-39-41_doc.patch, KAFKA-1481_2014-11-03_17-02-23.patch, KAFKA-1481_2014-11-10_20-39-41_doc.patch, KAFKA-1481_2014-11-10_21-02-23.patch, KAFKA-1481_2014-11-14_16-33-03.patch, KAFKA-1481_2014-11-14_16-39-41_doc.patch, KAFKA-1481_IDEA_IDE_2014-10-14_21-53-35.patch, KAFKA-1481_IDEA_IDE_2014-10-15_10-23-35.patch, KAFKA-1481_IDEA_IDE_2014-10-20_20-14-35.patch, KAFKA-1481_IDEA_IDE_2014-10-20_23-14-35.patch, alternateLayout1.png, alternateLayout2.png, diff-for-alternate-layout1.patch, diff-for-alternate-layout2.patch, originalLayout.png MBeans should not use dashes or underscores as separators because these characters are allowed in hostnames, topics, group and consumer IDs, etc., and these are embedded in MBeans names making it impossible to parse out individual bits from MBeans. Perhaps a pipe character should be used to avoid the conflict. This looks like a major blocker because it means nobody can write Kafka 0.8.x monitoring tools unless they are doing it for themselves AND do not use dashes AND do not use underscores. See: http://search-hadoop.com/m/4TaT4lonIW -- This message was sent by Atlassian JIRA (v6.3.4#6332)