[jira] [Commented] (KAFKA-1716) hang during shutdown of ZookeeperConsumerConnector
[ https://issues.apache.org/jira/browse/KAFKA-1716?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14176624#comment-14176624 ] Guozhang Wang commented on KAFKA-1716: -- This may be resolved in KAFKA-1305. Did you produce this issue from trunk? hang during shutdown of ZookeeperConsumerConnector -- Key: KAFKA-1716 URL: https://issues.apache.org/jira/browse/KAFKA-1716 Project: Kafka Issue Type: Bug Components: consumer Affects Versions: 0.8.1.1 Reporter: Sean Fay Assignee: Neha Narkhede It appears to be possible for {{ZookeeperConsumerConnector.shutdown()}} to wedge in the case that some consumer fetcher threads receive messages during the shutdown process. Shutdown thread: {code}-- Parking to wait for: java/util/concurrent/CountDownLatch$Sync@0x2aaaf3ef06d0 at jrockit/vm/Locks.park0(J)V(Native Method) at jrockit/vm/Locks.park(Locks.java:2230) at sun/misc/Unsafe.park(ZJ)V(Native Method) at java/util/concurrent/locks/LockSupport.park(LockSupport.java:156) at java/util/concurrent/locks/AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:811) at java/util/concurrent/locks/AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:969) at java/util/concurrent/locks/AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1281) at java/util/concurrent/CountDownLatch.await(CountDownLatch.java:207) at kafka/utils/ShutdownableThread.shutdown(ShutdownableThread.scala:36) at kafka/server/AbstractFetcherThread.shutdown(AbstractFetcherThread.scala:71) at kafka/server/AbstractFetcherManager$$anonfun$closeAllFetchers$2.apply(AbstractFetcherManager.scala:121) at kafka/server/AbstractFetcherManager$$anonfun$closeAllFetchers$2.apply(AbstractFetcherManager.scala:120) at scala/collection/TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772) at scala/collection/mutable/HashMap$$anonfun$foreach$1.apply(HashMap.scala:98) at scala/collection/mutable/HashMap$$anonfun$foreach$1.apply(HashMap.scala:98) at scala/collection/mutable/HashTable$class.foreachEntry(HashTable.scala:226) at scala/collection/mutable/HashMap.foreachEntry(HashMap.scala:39) at scala/collection/mutable/HashMap.foreach(HashMap.scala:98) at scala/collection/TraversableLike$WithFilter.foreach(TraversableLike.scala:771) at kafka/server/AbstractFetcherManager.closeAllFetchers(AbstractFetcherManager.scala:120) ^-- Holding lock: java/lang/Object@0x2aaaebcc7318[thin lock] at kafka/consumer/ConsumerFetcherManager.stopConnections(ConsumerFetcherManager.scala:148) at kafka/consumer/ZookeeperConsumerConnector.liftedTree1$1(ZookeeperConsumerConnector.scala:171) at kafka/consumer/ZookeeperConsumerConnector.shutdown(ZookeeperConsumerConnector.scala:167){code} ConsumerFetcherThread: {code}-- Parking to wait for: java/util/concurrent/locks/AbstractQueuedSynchronizer$ConditionObject@0x2aaaebcc7568 at jrockit/vm/Locks.park0(J)V(Native Method) at jrockit/vm/Locks.park(Locks.java:2230) at sun/misc/Unsafe.park(ZJ)V(Native Method) at java/util/concurrent/locks/LockSupport.park(LockSupport.java:156) at java/util/concurrent/locks/AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:1987) at java/util/concurrent/LinkedBlockingQueue.put(LinkedBlockingQueue.java:306) at kafka/consumer/PartitionTopicInfo.enqueue(PartitionTopicInfo.scala:60) at kafka/consumer/ConsumerFetcherThread.processPartitionData(ConsumerFetcherThread.scala:49) at 
kafka/server/AbstractFetcherThread$$anonfun$processFetchRequest$1$$anonfun$apply$mcV$sp$2.apply(AbstractFetcherThread.scala:130) at kafka/server/AbstractFetcherThread$$anonfun$processFetchRequest$1$$anonfun$apply$mcV$sp$2.apply(AbstractFetcherThread.scala:111) at scala/collection/immutable/HashMap$HashMap1.foreach(HashMap.scala:224) at scala/collection/immutable/HashMap$HashTrieMap.foreach(HashMap.scala:403) at kafka/server/AbstractFetcherThread$$anonfun$processFetchRequest$1.apply$mcV$sp(AbstractFetcherThread.scala:111) at kafka/server/AbstractFetcherThread$$anonfun$processFetchRequest$1.apply(AbstractFetcherThread.scala:111) at kafka/server/AbstractFetcherThread$$anonfun$processFetchRequest$1.apply(AbstractFetcherThread.scala:111) at kafka/utils/Utils$.inLock(Utils.scala:538) at kafka/server/AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:110) at kafka/server/AbstractFetcherThread.doWork(AbstractFetcherThread.scala:88) at kafka/utils/ShutdownableThread.run(ShutdownableThread.scala:51)
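The two stack traces above show the shape of the hang: the fetcher thread is parked inside LinkedBlockingQueue.put() because the bounded chunk queue (sized by {{queued.max.message.chunks}}) is full and nothing drains it during shutdown, so the thread never returns to its run loop and never counts down the latch that {{ShutdownableThread.shutdown()}} is awaiting. Below is a minimal, self-contained Java sketch of that pattern; it is not Kafka code, and all names are illustrative.
{code}
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Minimal sketch (not Kafka code) of the hang above: a fetcher-style worker blocks on a
// bounded queue's put() while the shutdown path waits on a latch that the worker can only
// count down after put() returns.
public class ShutdownHangSketch {
    public static void main(String[] args) throws Exception {
        BlockingQueue<String> chunks = new LinkedBlockingQueue<>(1); // small bound, like queued.max.message.chunks
        CountDownLatch shutdownComplete = new CountDownLatch(1);

        Thread fetcher = new Thread(() -> {
            try {
                chunks.put("chunk-1"); // fills the queue
                chunks.put("chunk-2"); // parks forever: nothing drains the queue during shutdown
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                shutdownComplete.countDown(); // only reached once put() returns or is interrupted
            }
        });
        fetcher.start();
        Thread.sleep(200); // let the fetcher fill the queue and park on the second put()

        // Analogous to ShutdownableThread.shutdown(): wait for the worker to finish.
        boolean finished = shutdownComplete.await(1, TimeUnit.SECONDS);
        System.out.println("shutdown completed: " + finished); // prints false: the wedge

        fetcher.interrupt(); // draining the queue or interrupting the blocked put() breaks the cycle
        fetcher.join();
    }
}
{code}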
Re: [DISCUSSION] Message Metadata
Thanks for the detailed comments Jun! Some replies inlined. On Sun, Oct 19, 2014 at 7:42 PM, Jun Rao jun...@gmail.com wrote: Hi, Guozhang, Thanks for the writeup. A few high level comments. 1. Associating (versioned) schemas to a topic can be a good thing overall. Yes, this could add a bit more management overhead in Kafka. However, it makes sure that the data format contract between a producer and a consumer is kept and managed in a central place, instead of in the application. The latter is probably easier to start with, but is likely to be brittle in the long run. I am actually not proposing to drop support for associated versioned schemas for topics, but rather to not let core Kafka functionality like auditing depend on schemas. I think this alone can separate schema management from Kafka piping management (i.e. making sure every single message is delivered, and within some latency, etc). Adding additional auditing info into an existing schema will force Kafka to be aware of the schema systems (Avro, JSON, etc). 2. Auditing can be a general feature that's useful for many applications. Such a feature can be implemented by extending the low level message format with a header. However, it can also be added as part of the schema management. For example, you can imagine a type of audited schema that adds additional auditing info to an existing schema automatically. Performance wise, it probably doesn't make a big difference whether the auditing info is added in the message header or the schema header. See replies above. 3. We talked about avoiding the overhead of decompressing in both the broker and the mirror maker. We probably need to think through how this works with auditing. In the more general case where you want to audit every message, one has to do the decompression to get the individual message, independent of how the auditing info is stored. This means that if we want to audit the broker directly or the consumer in mirror maker, we have to pay the decompression cost anyway. Similarly, if we want to extend mirror maker to support some customized filtering/transformation logic, we also have to pay the decompression cost. I see your point. For that I would prefer to have an MM implementation that is able to de-compress / re-compress ONLY if required, for example by auditing, etc. I agree that we have not thought through whether we should enable auditing on MM, and if yes how to do that, and we could discuss that in a different thread. Overall, this proposal is not just about tackling de-compression on MM but about the feasibility of extending the Kafka message header for system properties / app properties. Some low level comments. 4. Broker offset reassignment (kafka-527): This probably can be done with just a format change on the compressed message set. That is true. As I mentioned in the wiki, each of the problems may be resolvable separately, but I am thinking about a general way to address all of them. 5. MirrorMaker refactoring: We probably can think through how general we want mirror maker to be. If we want it to be more general, we likely need to decompress every message just like in a normal consumer. There will definitely be overhead. However, as long as mirror maker is made scalable, we can overcome the overhead by just running more instances on more hardware resources. As for the proposed message format change, we probably need to think through it a bit more. The honor-ship flag seems a bit hacky to me. Replied as part of 3). 
Sure, we can discuss more about that; I will update the wiki with the collected comments. 6. Adding a timestamp in each message can be a useful thing. It (1) allows log segments to be rolled more accurately; (2) allows finding an offset for a particular timestamp more accurately. I am thinking that the timestamp in the message should probably be the time when the leader receives the message. Followers preserve the timestamp set by the leader. To avoid time going back during leader change, the leader can probably set the timestamp to be the max of the current time and the timestamp of the last message, if present. That timestamp can potentially be added to the index file to answer offsetBeforeTimestamp queries more efficiently. Agreed. 7. Log compaction: It seems that you are suggesting an improvement to compact the active segment as well. This can be tricky and we need to figure out the details of how to do this. This improvement seems to be orthogonal to the message format change though. I think the improvement is more effective with the timestamps as in 6); we can discuss more about this. 8. Data inconsistency from unclean election: I am not sure if we need to add a control message to the log during leadership change. The map of leader generation to starting offset can be maintained in a separate checkpoint file. The follower just needs to get that map from the leader during startup.
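On point 6, the timestamp rule being discussed (the leader stamps each message, followers preserve the value, and timestamps never go backwards across leader changes) reduces to a one-line max. A small illustrative Java sketch, with hypothetical names rather than any Kafka API:
{code}
// Illustrative sketch of the monotonic timestamp assignment discussed in point 6.
final class LeaderTimestampAssigner {
    private long lastTimestampMs = -1L;

    // Called by the leader for every appended message; followers would simply keep the value.
    synchronized long assign(long wallClockMs) {
        long ts = Math.max(wallClockMs, lastTimestampMs); // never move backwards after a leader change
        lastTimestampMs = ts;
        return ts;
    }
}
{code}
The same assigned value could then be written into index entries to serve offsetBeforeTimestamp lookups more efficiently, as suggested above.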
[jira] [Updated] (KAFKA-1712) Excessive storage usage on newly added node
[ https://issues.apache.org/jira/browse/KAFKA-1712?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Oleg Golovin updated KAFKA-1712: Description: When a new node is added to the cluster, data starts replicating into it. The mtime of the created segments will be set when the last message is written to them. Though the replication is a prolonged process, let's assume (for simplicity of explanation) that their mtime is very close to the time when the new node was added. After the replication is done, new data will start to flow into this new node. After `log.retention.hours` the amount of data will be 2 * daily_amount_of_data_in_kafka_node (the first is the data replicated from other nodes when the node was added (let us call it `t1`) and the second is the amount of data replicated from other nodes during `t1 + log.retention.hours`). So by that time the node will have twice as much data as the other nodes. This poses a big problem to us as our storage is chosen to fit the normal amount of data (not twice this amount). In our particular case it poses another problem. We have an emergency segment cleaner which runs in case storage is nearly full (90%). We try to balance the amount of data so that it does not run and we can rely solely on kafka's internal log deletion, but sometimes the emergency cleaner runs. It works this way: - it gets all kafka segments for the volume - it filters out the last segments of each partition (just to avoid unnecessary recreation of the last small-size segments) - it sorts them by segment mtime - it changes the mtime of the first N segments (with the lowest mtime) to 1, so they become really really old. The number N is chosen to free a specified percentage of the volume (3% in our case). Kafka deletes these segments later (as they are very old). The emergency cleaner works very well, except for the case when the data has been replicated to a newly added node. In this case the segment mtime is the time the segment was replicated and does not reflect the real creation time of the original data stored in this segment. So in this case the kafka emergency cleaner will delete the segments with the lowest mtime, which may hold data which is much more recent than the data in other segments. This is not a big problem until we delete data which hasn't been fully consumed. In that case we lose data, and that makes it a big problem. Is it possible to retain segment mtime during initial replication on a new node? This would help not to load the new node with twice as much data as the other nodes have. Or maybe there are other ways to sort segments by data creation time (or something close to data creation time)? (for example, if https://issues.apache.org/jira/browse/KAFKA-1403 is implemented, we may take the time of the first message from .index). In our case it would help the kafka emergency cleaner, which would then really be deleting the oldest data. was: When a new node is added to cluster data stars replicating into it. The mtime of creating segments will be set on the last message being written to them. Though the replication is a prolonged process, let's assume (for simplicity of explanation) that their mtime is very close to the time when the new node was added. After the replication is done, new data will start to flow into this new node. 
After `log.retention.hours` the amount of data will be 2 * daily_amount_of_data_in_kafka_node (first one is the replicated data from other nodes when the node was added (let us call it `t1`) and the second is the amount of replicated data from other nodes during `t1 + log.retention.hours`). So by that time the node will have twice as much data as the other nodes. This poses a big problem to us as our storage is chosen to fit normal amount of data (not twice this amount). In our particular case it poses another problem. We have an emergency segment cleaner which runs in case storage is nearly full (90%). We try to balance the amount of data for it not to run to rely solely on kafka internal log deletion, but sometimes emergency cleaner runs. It works this way: - it gets all kafka segments for the volume - it filters out last segments of each partition (just to avoid unnecessary recreation of last small-size segments) - it sorts them by segment mtime - it changes mtime of the first N segements (with the lowest mtime) to 1, so they become really really old. Number N is chosen to free specified percentage of volume (3% in our case). Kafka deletes these segments later (as they are very old). Emergency cleaner works very well. Except for the case when the data is replicated to the newly added node. In this case segment mtime is the time the segment was replicated and does not reflect the real creation time of original data stored in this segment. So in this case kafka emergency cleaner will delete segments with the lowest mtime, which may hold the data which is much more recent than the
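For concreteness, the emergency cleaner described above can be approximated by the following Java sketch. It is an illustration of the reporter's steps, not their actual tool; the log directory layout, the .log suffix filter, and the value of N are assumptions.
{code}
import java.io.IOException;
import java.nio.file.*;
import java.nio.file.attribute.FileTime;
import java.util.*;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hedged sketch of the "emergency segment cleaner" described above (not part of Kafka):
// collect .log segments under a log directory, skip the newest (active) segment of each
// partition directory, sort the rest by mtime, and set the mtime of the oldest N to the
// epoch so that Kafka's retention deletes them first.
public class EmergencyCleanerSketch {
    public static void main(String[] args) throws IOException {
        Path logDir = Paths.get(args.length > 0 ? args[0] : "/var/kafka-logs"); // assumed layout
        int n = 100; // number of segments to age out; chosen to free a few percent of the volume

        List<Path> candidates = new ArrayList<>();
        try (Stream<Path> partitions = Files.list(logDir)) {
            for (Path partition : partitions.filter(Files::isDirectory).collect(Collectors.toList())) {
                try (Stream<Path> segs = Files.list(partition)) {
                    List<Path> segments = segs
                            .filter(p -> p.toString().endsWith(".log"))
                            .sorted(Comparator.comparing(EmergencyCleanerSketch::mtime))
                            .collect(Collectors.toList());
                    if (segments.size() > 1) {
                        // keep the last (active) segment of each partition untouched
                        candidates.addAll(segments.subList(0, segments.size() - 1));
                    }
                }
            }
        }
        candidates.sort(Comparator.comparing(EmergencyCleanerSketch::mtime));
        for (Path seg : candidates.subList(0, Math.min(n, candidates.size()))) {
            Files.setLastModifiedTime(seg, FileTime.fromMillis(1L)); // make it "really really old"
        }
    }

    private static FileTime mtime(Path p) {
        try {
            return Files.getLastModifiedTime(p);
        } catch (IOException e) {
            return FileTime.fromMillis(Long.MAX_VALUE); // unreadable segments sort last
        }
    }
}
{code}
As the report explains, this heuristic only deletes the truly oldest data when segment mtime tracks data creation time, which replication to a new node breaks.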
[jira] [Updated] (KAFKA-1712) Excessive storage usage on newly added node
[ https://issues.apache.org/jira/browse/KAFKA-1712?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Oleg Golovin updated KAFKA-1712: Description: When a new node is added to the cluster, data starts replicating into it. The mtime of the created segments will be set when the last message is written to them. Though the replication is a prolonged process, let's assume (for simplicity of explanation) that their mtime is very close to the time when the new node was added. After the replication is done, new data will start to flow into this new node. After `log.retention.hours` the amount of data will be 2 * daily_amount_of_data_in_kafka_node (the first is the data replicated from other nodes when the node was added (let us call it `t1`) and the second is the amount of data replicated from other nodes which happened from `t1` to `t1 + log.retention.hours`). So by that time the node will have twice as much data as the other nodes. This poses a big problem to us as our storage is chosen to fit the normal amount of data (not twice this amount). In our particular case it poses another problem. We have an emergency segment cleaner which runs in case storage is nearly full (90%). We try to balance the amount of data so that it does not run and we can rely solely on kafka's internal log deletion, but sometimes the emergency cleaner runs. It works this way: - it gets all kafka segments for the volume - it filters out the last segments of each partition (just to avoid unnecessary recreation of the last small-size segments) - it sorts them by segment mtime - it changes the mtime of the first N segments (with the lowest mtime) to 1, so they become really really old. The number N is chosen to free a specified percentage of the volume (3% in our case). Kafka deletes these segments later (as they are very old). The emergency cleaner works very well, except for the case when the data has been replicated to a newly added node. In this case the segment mtime is the time the segment was replicated and does not reflect the real creation time of the original data stored in this segment. So in this case the kafka emergency cleaner will delete the segments with the lowest mtime, which may hold data which is much more recent than the data in other segments. This is not a big problem until we delete data which hasn't been fully consumed. In that case we lose data, and that makes it a big problem. Is it possible to retain segment mtime during initial replication on a new node? This would help not to load the new node with twice as much data as the other nodes have. Or maybe there are other ways to sort segments by data creation time (or something close to data creation time)? (for example, if https://issues.apache.org/jira/browse/KAFKA-1403 is implemented, we may take the time of the first message from .index). In our case it would help the kafka emergency cleaner, which would then really be deleting the oldest data. was: When a new node is added to cluster data starts replicating into it. The mtime of creating segments will be set on the last message being written to them. Though the replication is a prolonged process, let's assume (for simplicity of explanation) that their mtime is very close to the time when the new node was added. After the replication is done, new data will start to flow into this new node. 
After `log.retention.hours` the amount of data will be 2 * daily_amount_of_data_in_kafka_node (first one is the replicated data from other nodes when the node was added (let us call it `t1`) and the second is the amount of replicated data from other nodes during `t1 + log.retention.hours`). So by that time the node will have twice as much data as the other nodes. This poses a big problem to us as our storage is chosen to fit normal amount of data (not twice this amount). In our particular case it poses another problem. We have an emergency segment cleaner which runs in case storage is nearly full (90%). We try to balance the amount of data for it not to run to rely solely on kafka internal log deletion, but sometimes emergency cleaner runs. It works this way: - it gets all kafka segments for the volume - it filters out last segments of each partition (just to avoid unnecessary recreation of last small-size segments) - it sorts them by segment mtime - it changes mtime of the first N segements (with the lowest mtime) to 1, so they become really really old. Number N is chosen to free specified percentage of volume (3% in our case). Kafka deletes these segments later (as they are very old). Emergency cleaner works very well. Except for the case when the data is replicated to the newly added node. In this case segment mtime is the time the segment was replicated and does not reflect the real creation time of original data stored in this segment. So in this case kafka emergency cleaner will delete segments with the lowest mtime, which may hold the data which is much
[jira] [Commented] (KAFKA-1716) hang during shutdown of ZookeeperConsumerConnector
[ https://issues.apache.org/jira/browse/KAFKA-1716?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14176893#comment-14176893 ] Sean Fay commented on KAFKA-1716: - Nope, this was running 0.8.1.1. I'm not very familiar with the code. But it looks like KAFKA-1305 changed {{controller.message.queue.size}}, while the code in question here is impacted by {{queued.max.message.chunks}} hang during shutdown of ZookeeperConsumerConnector -- Key: KAFKA-1716 URL: https://issues.apache.org/jira/browse/KAFKA-1716 Project: Kafka Issue Type: Bug Components: consumer Affects Versions: 0.8.1.1 Reporter: Sean Fay Assignee: Neha Narkhede It appears to be possible for {{ZookeeperConsumerConnector.shutdown()}} to wedge in the case that some consumer fetcher threads receive messages during the shutdown process. Shutdown thread: {code}-- Parking to wait for: java/util/concurrent/CountDownLatch$Sync@0x2aaaf3ef06d0 at jrockit/vm/Locks.park0(J)V(Native Method) at jrockit/vm/Locks.park(Locks.java:2230) at sun/misc/Unsafe.park(ZJ)V(Native Method) at java/util/concurrent/locks/LockSupport.park(LockSupport.java:156) at java/util/concurrent/locks/AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:811) at java/util/concurrent/locks/AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:969) at java/util/concurrent/locks/AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1281) at java/util/concurrent/CountDownLatch.await(CountDownLatch.java:207) at kafka/utils/ShutdownableThread.shutdown(ShutdownableThread.scala:36) at kafka/server/AbstractFetcherThread.shutdown(AbstractFetcherThread.scala:71) at kafka/server/AbstractFetcherManager$$anonfun$closeAllFetchers$2.apply(AbstractFetcherManager.scala:121) at kafka/server/AbstractFetcherManager$$anonfun$closeAllFetchers$2.apply(AbstractFetcherManager.scala:120) at scala/collection/TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772) at scala/collection/mutable/HashMap$$anonfun$foreach$1.apply(HashMap.scala:98) at scala/collection/mutable/HashMap$$anonfun$foreach$1.apply(HashMap.scala:98) at scala/collection/mutable/HashTable$class.foreachEntry(HashTable.scala:226) at scala/collection/mutable/HashMap.foreachEntry(HashMap.scala:39) at scala/collection/mutable/HashMap.foreach(HashMap.scala:98) at scala/collection/TraversableLike$WithFilter.foreach(TraversableLike.scala:771) at kafka/server/AbstractFetcherManager.closeAllFetchers(AbstractFetcherManager.scala:120) ^-- Holding lock: java/lang/Object@0x2aaaebcc7318[thin lock] at kafka/consumer/ConsumerFetcherManager.stopConnections(ConsumerFetcherManager.scala:148) at kafka/consumer/ZookeeperConsumerConnector.liftedTree1$1(ZookeeperConsumerConnector.scala:171) at kafka/consumer/ZookeeperConsumerConnector.shutdown(ZookeeperConsumerConnector.scala:167){code} ConsumerFetcherThread: {code}-- Parking to wait for: java/util/concurrent/locks/AbstractQueuedSynchronizer$ConditionObject@0x2aaaebcc7568 at jrockit/vm/Locks.park0(J)V(Native Method) at jrockit/vm/Locks.park(Locks.java:2230) at sun/misc/Unsafe.park(ZJ)V(Native Method) at java/util/concurrent/locks/LockSupport.park(LockSupport.java:156) at java/util/concurrent/locks/AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:1987) at java/util/concurrent/LinkedBlockingQueue.put(LinkedBlockingQueue.java:306) at kafka/consumer/PartitionTopicInfo.enqueue(PartitionTopicInfo.scala:60) at 
kafka/consumer/ConsumerFetcherThread.processPartitionData(ConsumerFetcherThread.scala:49) at kafka/server/AbstractFetcherThread$$anonfun$processFetchRequest$1$$anonfun$apply$mcV$sp$2.apply(AbstractFetcherThread.scala:130) at kafka/server/AbstractFetcherThread$$anonfun$processFetchRequest$1$$anonfun$apply$mcV$sp$2.apply(AbstractFetcherThread.scala:111) at scala/collection/immutable/HashMap$HashMap1.foreach(HashMap.scala:224) at scala/collection/immutable/HashMap$HashTrieMap.foreach(HashMap.scala:403) at kafka/server/AbstractFetcherThread$$anonfun$processFetchRequest$1.apply$mcV$sp(AbstractFetcherThread.scala:111) at kafka/server/AbstractFetcherThread$$anonfun$processFetchRequest$1.apply(AbstractFetcherThread.scala:111) at kafka/server/AbstractFetcherThread$$anonfun$processFetchRequest$1.apply(AbstractFetcherThread.scala:111) at kafka/utils/Utils$.inLock(Utils.scala:538) at kafka/server/AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:110) at
[jira] [Commented] (KAFKA-1716) hang during shutdown of ZookeeperConsumerConnector
[ https://issues.apache.org/jira/browse/KAFKA-1716?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14176904#comment-14176904 ] Sriharsha Chintalapani commented on KAFKA-1716: --- [~sfay] do you have any steps to reproduce this. hang during shutdown of ZookeeperConsumerConnector -- Key: KAFKA-1716 URL: https://issues.apache.org/jira/browse/KAFKA-1716 Project: Kafka Issue Type: Bug Components: consumer Affects Versions: 0.8.1.1 Reporter: Sean Fay Assignee: Neha Narkhede It appears to be possible for {{ZookeeperConsumerConnector.shutdown()}} to wedge in the case that some consumer fetcher threads receive messages during the shutdown process. Shutdown thread: {code}-- Parking to wait for: java/util/concurrent/CountDownLatch$Sync@0x2aaaf3ef06d0 at jrockit/vm/Locks.park0(J)V(Native Method) at jrockit/vm/Locks.park(Locks.java:2230) at sun/misc/Unsafe.park(ZJ)V(Native Method) at java/util/concurrent/locks/LockSupport.park(LockSupport.java:156) at java/util/concurrent/locks/AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:811) at java/util/concurrent/locks/AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:969) at java/util/concurrent/locks/AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1281) at java/util/concurrent/CountDownLatch.await(CountDownLatch.java:207) at kafka/utils/ShutdownableThread.shutdown(ShutdownableThread.scala:36) at kafka/server/AbstractFetcherThread.shutdown(AbstractFetcherThread.scala:71) at kafka/server/AbstractFetcherManager$$anonfun$closeAllFetchers$2.apply(AbstractFetcherManager.scala:121) at kafka/server/AbstractFetcherManager$$anonfun$closeAllFetchers$2.apply(AbstractFetcherManager.scala:120) at scala/collection/TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772) at scala/collection/mutable/HashMap$$anonfun$foreach$1.apply(HashMap.scala:98) at scala/collection/mutable/HashMap$$anonfun$foreach$1.apply(HashMap.scala:98) at scala/collection/mutable/HashTable$class.foreachEntry(HashTable.scala:226) at scala/collection/mutable/HashMap.foreachEntry(HashMap.scala:39) at scala/collection/mutable/HashMap.foreach(HashMap.scala:98) at scala/collection/TraversableLike$WithFilter.foreach(TraversableLike.scala:771) at kafka/server/AbstractFetcherManager.closeAllFetchers(AbstractFetcherManager.scala:120) ^-- Holding lock: java/lang/Object@0x2aaaebcc7318[thin lock] at kafka/consumer/ConsumerFetcherManager.stopConnections(ConsumerFetcherManager.scala:148) at kafka/consumer/ZookeeperConsumerConnector.liftedTree1$1(ZookeeperConsumerConnector.scala:171) at kafka/consumer/ZookeeperConsumerConnector.shutdown(ZookeeperConsumerConnector.scala:167){code} ConsumerFetcherThread: {code}-- Parking to wait for: java/util/concurrent/locks/AbstractQueuedSynchronizer$ConditionObject@0x2aaaebcc7568 at jrockit/vm/Locks.park0(J)V(Native Method) at jrockit/vm/Locks.park(Locks.java:2230) at sun/misc/Unsafe.park(ZJ)V(Native Method) at java/util/concurrent/locks/LockSupport.park(LockSupport.java:156) at java/util/concurrent/locks/AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:1987) at java/util/concurrent/LinkedBlockingQueue.put(LinkedBlockingQueue.java:306) at kafka/consumer/PartitionTopicInfo.enqueue(PartitionTopicInfo.scala:60) at kafka/consumer/ConsumerFetcherThread.processPartitionData(ConsumerFetcherThread.scala:49) at 
kafka/server/AbstractFetcherThread$$anonfun$processFetchRequest$1$$anonfun$apply$mcV$sp$2.apply(AbstractFetcherThread.scala:130) at kafka/server/AbstractFetcherThread$$anonfun$processFetchRequest$1$$anonfun$apply$mcV$sp$2.apply(AbstractFetcherThread.scala:111) at scala/collection/immutable/HashMap$HashMap1.foreach(HashMap.scala:224) at scala/collection/immutable/HashMap$HashTrieMap.foreach(HashMap.scala:403) at kafka/server/AbstractFetcherThread$$anonfun$processFetchRequest$1.apply$mcV$sp(AbstractFetcherThread.scala:111) at kafka/server/AbstractFetcherThread$$anonfun$processFetchRequest$1.apply(AbstractFetcherThread.scala:111) at kafka/server/AbstractFetcherThread$$anonfun$processFetchRequest$1.apply(AbstractFetcherThread.scala:111) at kafka/utils/Utils$.inLock(Utils.scala:538) at kafka/server/AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:110) at kafka/server/AbstractFetcherThread.doWork(AbstractFetcherThread.scala:88) at kafka/utils/ShutdownableThread.run(ShutdownableThread.scala:51) at
[jira] [Updated] (KAFKA-1171) Gradle build for Kafka
[ https://issues.apache.org/jira/browse/KAFKA-1171?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Neha Narkhede updated KAFKA-1171: - Fix Version/s: 0.8.1 0.8.1.1 Gradle build for Kafka -- Key: KAFKA-1171 URL: https://issues.apache.org/jira/browse/KAFKA-1171 Project: Kafka Issue Type: Improvement Components: packaging Affects Versions: 0.8.1, 0.9.0 Reporter: David Arthur Assignee: David Arthur Priority: Blocker Fix For: 0.8.1, 0.8.1.1 Attachments: 0001-Adding-basic-Gradle-build.patch, 0001-Adding-basic-Gradle-build.patch, 0001-Adding-basic-Gradle-build.patch, 0001-Adding-basic-Gradle-build.patch, 0001-Adding-basic-Gradle-build.patch, 0001-Adding-basic-Gradle-build.patch, 0001-Adding-basic-Gradle-build.patch, kafka-1171_v10.patch, kafka-1171_v11.patch, kafka-1171_v12.patch, kafka-1171_v13.patch, kafka-1171_v14.patch, kafka-1171_v15.patch, kafka-1171_v6.patch, kafka-1171_v7.patch, kafka-1171_v8.patch, kafka-1171_v9.patch We have previously discussed moving away from SBT to an easier-to-comprehend-and-debug build system such as Ant or Gradle. I put up a patch for an Ant+Ivy build a while ago[1], and it sounded like people wanted to check out Gradle as well. 1. https://issues.apache.org/jira/browse/KAFKA-855 -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Closed] (KAFKA-262) Bug in the consumer rebalancing logic causes one consumer to release partitions that it does not own
[ https://issues.apache.org/jira/browse/KAFKA-262?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Neha Narkhede closed KAFKA-262. --- Bug in the consumer rebalancing logic causes one consumer to release partitions that it does not own Key: KAFKA-262 URL: https://issues.apache.org/jira/browse/KAFKA-262 Project: Kafka Issue Type: Bug Components: core Affects Versions: 0.7 Reporter: Neha Narkhede Assignee: Neha Narkhede Fix For: 0.7.1 Attachments: kafka-262-v3.patch, kafka-262.patch Original Estimate: 24h Remaining Estimate: 24h The consumer maintains a cache of topics and partitions it owns, along with the fetcher queues corresponding to those. But while releasing partition ownership, this cache is not cleared. This leads the consumer to release a partition that it does not own any more. This can also lead the consumer to commit offsets for partitions that it no longer consumes from. The rebalance operation goes through the following steps - 1. close fetchers 2. commit offsets 3. release partition ownership. 4. rebalance, add topic, partition and fetcher queues to the topic registry, for all topics that the consumer process currently wants to own. 5. If the consumer runs into a conflict for one topic or partition, the rebalancing attempt fails, and it goes to step 1. Say, there are 2 consumers in a group, c1 and c2. Both are consuming topic1 with partitions 0-0, 0-1 and 1-0. Say c1 owns 0-0 and 0-1 and c2 owns 1-0. 1. Broker 1 goes down. This triggers a rebalancing attempt in c1 and c2. 2. c1 releases partition ownership and, during step 4 (above), fails to rebalance. 3. Meanwhile, c2 completes rebalancing successfully, owns partition 0-1 and starts consuming data. 4. c1 starts the next rebalancing attempt and during step 3 (above), it releases partition 0-1. During step 4, it owns partition 0-0 again, and starts consuming data. 5. Effectively, rebalancing has completed successfully, but there is no owner for partition 0-1 registered in Zookeeper. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
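The bookkeeping gap described above (ownership released in ZooKeeper while the consumer's local topic registry still remembers the partition) can be made concrete with a small Java sketch. This is only an illustration of the idea behind the fix, not the actual Kafka patch; the class and method names are hypothetical.
{code}
import java.util.HashSet;
import java.util.Set;

// Hedged sketch: track owned partitions in a local cache and clear that cache as part of
// releasing ownership, so a failed rebalance attempt cannot later release partitions that
// another consumer has already claimed.
class RebalanceCacheSketch {
    private final Set<String> ownedPartitions = new HashSet<>();

    void claim(String partition) {
        ownedPartitions.add(partition);        // step 4: record ownership in the local registry
    }

    void releasePartitionOwnership() {
        for (String partition : ownedPartitions) {
            deleteOwnershipNode(partition);    // step 3: remove the ZooKeeper ownership node
        }
        ownedPartitions.clear();               // the missing step: forget released partitions locally
    }

    private void deleteOwnershipNode(String partition) {
        // placeholder for deleting /consumers/<group>/owners/<topic>/<partition> in ZooKeeper
    }
}
{code}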
[jira] [Closed] (KAFKA-1171) Gradle build for Kafka
[ https://issues.apache.org/jira/browse/KAFKA-1171?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Neha Narkhede closed KAFKA-1171. Gradle build for Kafka -- Key: KAFKA-1171 URL: https://issues.apache.org/jira/browse/KAFKA-1171 Project: Kafka Issue Type: Improvement Components: packaging Affects Versions: 0.8.1, 0.9.0 Reporter: David Arthur Assignee: David Arthur Priority: Blocker Fix For: 0.8.1, 0.8.1.1 Attachments: 0001-Adding-basic-Gradle-build.patch, 0001-Adding-basic-Gradle-build.patch, 0001-Adding-basic-Gradle-build.patch, 0001-Adding-basic-Gradle-build.patch, 0001-Adding-basic-Gradle-build.patch, 0001-Adding-basic-Gradle-build.patch, 0001-Adding-basic-Gradle-build.patch, kafka-1171_v10.patch, kafka-1171_v11.patch, kafka-1171_v12.patch, kafka-1171_v13.patch, kafka-1171_v14.patch, kafka-1171_v15.patch, kafka-1171_v6.patch, kafka-1171_v7.patch, kafka-1171_v8.patch, kafka-1171_v9.patch We have previously discussed moving away from SBT to an easier-to-comprehend-and-debug build system such as Ant or Gradle. I put up a patch for an Ant+Ivy build a while ago[1], and it sounded like people wanted to check out Gradle as well. 1. https://issues.apache.org/jira/browse/KAFKA-855 -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-1630) ConsumerFetcherThread locked in Tomcat
[ https://issues.apache.org/jira/browse/KAFKA-1630?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14176927#comment-14176927 ] Neha Narkhede commented on KAFKA-1630: -- The thread dump you attached doesn't suggest threads getting locked, they are in the RUNNABLE state and consuming data. Just not as fast since the process is probably CPU bound. ConsumerFetcherThread locked in Tomcat -- Key: KAFKA-1630 URL: https://issues.apache.org/jira/browse/KAFKA-1630 Project: Kafka Issue Type: Bug Components: consumer Affects Versions: 0.8.0 Environment: linux redhat Reporter: vijay Assignee: Neha Narkhede Labels: performance Original Estimate: 12h Remaining Estimate: 12h I am using high level consumer API for consuming messages from kafka. ConsumerFetcherThread gets locked. Kindly look in to the below stack trace ConsumerFetcherThread-SocialTwitterStream6_172.31.240.136-1410398702143-61a247c3-0-1 prio=10 tid=0x7f294001e800 nid=0x1677 runnable [0x7f297aae9000] java.lang.Thread.State: RUNNABLE at sun.nio.ch.EPollArrayWrapper.epollWait(Native Method) at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:215) at sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:65) at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:69) - locked 0x7f2a7c38eb40 (a sun.nio.ch.Util$1) - locked 0x7f2a7c38eb28 (a java.util.Collections$UnmodifiableSet) - locked 0x7f2a7c326f20 (a sun.nio.ch.EPollSelectorImpl) at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:80) at sun.nio.ch.SocketAdaptor$SocketInputStream.read(SocketAdaptor.java:193) - locked 0x7f2a7c2163c0 (a java.lang.Object) at sun.nio.ch.ChannelInputStream.read(ChannelInputStream.java:86) - locked 0x7f2a7c229950 (a sun.nio.ch.SocketAdaptor$SocketInputStream) at java.nio.channels.Channels$ReadableByteChannelImpl.read(Channels.java:200) - locked 0x7f2a7c38ea50 (a java.lang.Object) at kafka.utils.Utils$.read(Utils.scala:395) at kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54) at kafka.network.Receive$class.readCompletely(Transmission.scala:56) at kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29) at kafka.network.BlockingChannel.receive(BlockingChannel.scala:100) at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:73) at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:71) - locked 0x7f2a7c38e9f0 (a java.lang.Object) at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:110) at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:110) at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:110) at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33) at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:109) at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:109) at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:109) at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33) at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:108) at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:94) at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:86) at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51) -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (KAFKA-1714) more better bootstrapping of the gradle-wrapper.jar
[ https://issues.apache.org/jira/browse/KAFKA-1714?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Neha Narkhede updated KAFKA-1714: - Component/s: build more better bootstrapping of the gradle-wrapper.jar Key: KAFKA-1714 URL: https://issues.apache.org/jira/browse/KAFKA-1714 Project: Kafka Issue Type: Bug Components: build Reporter: Joe Stein Fix For: 0.8.3 In https://issues.apache.org/jira/browse/KAFKA-1490 we moved the gradle-wrapper.jar out of the source tree for maintenance reasons. This makes the first build step somewhat problematic for folks just getting started. A bootstrap step is required; if this could somehow be incorporated, that would be great. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (KAFKA-1715) better advertising of the bound and working interface
[ https://issues.apache.org/jira/browse/KAFKA-1715?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Neha Narkhede updated KAFKA-1715: - Labels: newbie (was: ) better advertising of the bound and working interface - Key: KAFKA-1715 URL: https://issues.apache.org/jira/browse/KAFKA-1715 Project: Kafka Issue Type: Bug Reporter: Joe Stein Labels: newbie Fix For: 0.8.3 As part of the auto discovery of brokers and meta data messaging we should try to advertise the interface that is bound and working better. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (KAFKA-1714) more better bootstrapping of the gradle-wrapper.jar
[ https://issues.apache.org/jira/browse/KAFKA-1714?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Neha Narkhede updated KAFKA-1714: - Affects Version/s: 0.8.2 more better bootstrapping of the gradle-wrapper.jar Key: KAFKA-1714 URL: https://issues.apache.org/jira/browse/KAFKA-1714 Project: Kafka Issue Type: Bug Components: build Affects Versions: 0.8.2 Reporter: Joe Stein Fix For: 0.8.3 In https://issues.apache.org/jira/browse/KAFKA-1490 we moved the gradle-wrapper.jar out of the source tree for maintenance reasons. This makes the first build step somewhat problematic for folks just getting started. A bootstrap step is required; if this could somehow be incorporated, that would be great. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-1716) hang during shutdown of ZookeeperConsumerConnector
[ https://issues.apache.org/jira/browse/KAFKA-1716?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14176944#comment-14176944 ] Neha Narkhede commented on KAFKA-1716: -- [~sfay] Could you please attach the entire thread dump? I'm not sure KAFKA-1305 is related since that was a purely broker side issue and didn't touch the fetcher threads. hang during shutdown of ZookeeperConsumerConnector -- Key: KAFKA-1716 URL: https://issues.apache.org/jira/browse/KAFKA-1716 Project: Kafka Issue Type: Bug Components: consumer Affects Versions: 0.8.1.1 Reporter: Sean Fay Assignee: Neha Narkhede It appears to be possible for {{ZookeeperConsumerConnector.shutdown()}} to wedge in the case that some consumer fetcher threads receive messages during the shutdown process. Shutdown thread: {code}-- Parking to wait for: java/util/concurrent/CountDownLatch$Sync@0x2aaaf3ef06d0 at jrockit/vm/Locks.park0(J)V(Native Method) at jrockit/vm/Locks.park(Locks.java:2230) at sun/misc/Unsafe.park(ZJ)V(Native Method) at java/util/concurrent/locks/LockSupport.park(LockSupport.java:156) at java/util/concurrent/locks/AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:811) at java/util/concurrent/locks/AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:969) at java/util/concurrent/locks/AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1281) at java/util/concurrent/CountDownLatch.await(CountDownLatch.java:207) at kafka/utils/ShutdownableThread.shutdown(ShutdownableThread.scala:36) at kafka/server/AbstractFetcherThread.shutdown(AbstractFetcherThread.scala:71) at kafka/server/AbstractFetcherManager$$anonfun$closeAllFetchers$2.apply(AbstractFetcherManager.scala:121) at kafka/server/AbstractFetcherManager$$anonfun$closeAllFetchers$2.apply(AbstractFetcherManager.scala:120) at scala/collection/TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772) at scala/collection/mutable/HashMap$$anonfun$foreach$1.apply(HashMap.scala:98) at scala/collection/mutable/HashMap$$anonfun$foreach$1.apply(HashMap.scala:98) at scala/collection/mutable/HashTable$class.foreachEntry(HashTable.scala:226) at scala/collection/mutable/HashMap.foreachEntry(HashMap.scala:39) at scala/collection/mutable/HashMap.foreach(HashMap.scala:98) at scala/collection/TraversableLike$WithFilter.foreach(TraversableLike.scala:771) at kafka/server/AbstractFetcherManager.closeAllFetchers(AbstractFetcherManager.scala:120) ^-- Holding lock: java/lang/Object@0x2aaaebcc7318[thin lock] at kafka/consumer/ConsumerFetcherManager.stopConnections(ConsumerFetcherManager.scala:148) at kafka/consumer/ZookeeperConsumerConnector.liftedTree1$1(ZookeeperConsumerConnector.scala:171) at kafka/consumer/ZookeeperConsumerConnector.shutdown(ZookeeperConsumerConnector.scala:167){code} ConsumerFetcherThread: {code}-- Parking to wait for: java/util/concurrent/locks/AbstractQueuedSynchronizer$ConditionObject@0x2aaaebcc7568 at jrockit/vm/Locks.park0(J)V(Native Method) at jrockit/vm/Locks.park(Locks.java:2230) at sun/misc/Unsafe.park(ZJ)V(Native Method) at java/util/concurrent/locks/LockSupport.park(LockSupport.java:156) at java/util/concurrent/locks/AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:1987) at java/util/concurrent/LinkedBlockingQueue.put(LinkedBlockingQueue.java:306) at kafka/consumer/PartitionTopicInfo.enqueue(PartitionTopicInfo.scala:60) at 
kafka/consumer/ConsumerFetcherThread.processPartitionData(ConsumerFetcherThread.scala:49) at kafka/server/AbstractFetcherThread$$anonfun$processFetchRequest$1$$anonfun$apply$mcV$sp$2.apply(AbstractFetcherThread.scala:130) at kafka/server/AbstractFetcherThread$$anonfun$processFetchRequest$1$$anonfun$apply$mcV$sp$2.apply(AbstractFetcherThread.scala:111) at scala/collection/immutable/HashMap$HashMap1.foreach(HashMap.scala:224) at scala/collection/immutable/HashMap$HashTrieMap.foreach(HashMap.scala:403) at kafka/server/AbstractFetcherThread$$anonfun$processFetchRequest$1.apply$mcV$sp(AbstractFetcherThread.scala:111) at kafka/server/AbstractFetcherThread$$anonfun$processFetchRequest$1.apply(AbstractFetcherThread.scala:111) at kafka/server/AbstractFetcherThread$$anonfun$processFetchRequest$1.apply(AbstractFetcherThread.scala:111) at kafka/utils/Utils$.inLock(Utils.scala:538) at kafka/server/AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:110) at
[jira] [Created] (KAFKA-1717) remove netty dependency through ZK 3.4.x
Jun Rao created KAFKA-1717: -- Summary: remove netty dependency through ZK 3.4.x Key: KAFKA-1717 URL: https://issues.apache.org/jira/browse/KAFKA-1717 Project: Kafka Issue Type: Bug Components: core Affects Versions: 0.8.2 Reporter: Jun Rao Assignee: Jun Rao Priority: Blocker Fix For: 0.8.2 ZK 3.4.x specifies a dependency on netty. However, that dependency is optional (ZOOKEEPER-1681). To avoid potential jar conflict, it's better to exclude netty dependency from Kafka. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (KAFKA-1717) remove netty dependency through ZK 3.4.x
[ https://issues.apache.org/jira/browse/KAFKA-1717?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jun Rao updated KAFKA-1717: --- Status: Patch Available (was: Open) remove netty dependency through ZK 3.4.x Key: KAFKA-1717 URL: https://issues.apache.org/jira/browse/KAFKA-1717 Project: Kafka Issue Type: Bug Components: core Affects Versions: 0.8.2 Reporter: Jun Rao Assignee: Jun Rao Priority: Blocker Fix For: 0.8.2 Attachments: kafka-1717.patch ZK 3.4.x specifies a dependency on netty. However, that dependency is optional (ZOOKEEPER-1681). To avoid potential jar conflict, it's better to exclude netty dependency from Kafka. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (KAFKA-1717) remove netty dependency through ZK 3.4.x
[ https://issues.apache.org/jira/browse/KAFKA-1717?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jun Rao updated KAFKA-1717: --- Attachment: kafka-1717.patch remove netty dependency through ZK 3.4.x Key: KAFKA-1717 URL: https://issues.apache.org/jira/browse/KAFKA-1717 Project: Kafka Issue Type: Bug Components: core Affects Versions: 0.8.2 Reporter: Jun Rao Assignee: Jun Rao Priority: Blocker Fix For: 0.8.2 Attachments: kafka-1717.patch ZK 3.4.x specifies a dependency on netty. However, that dependency is optional (ZOOKEEPER-1681). To avoid potential jar conflict, it's better to exclude netty dependency from Kafka. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-1717) remove netty dependency through ZK 3.4.x
[ https://issues.apache.org/jira/browse/KAFKA-1717?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14176965#comment-14176965 ] Jun Rao commented on KAFKA-1717: Created reviewboard https://reviews.apache.org/r/26936/diff/ against branch origin/trunk remove netty dependency through ZK 3.4.x Key: KAFKA-1717 URL: https://issues.apache.org/jira/browse/KAFKA-1717 Project: Kafka Issue Type: Bug Components: core Affects Versions: 0.8.2 Reporter: Jun Rao Assignee: Jun Rao Priority: Blocker Fix For: 0.8.2 Attachments: kafka-1717.patch ZK 3.4.x specifies a dependency on netty. However, that dependency is optional (ZOOKEEPER-1681). To avoid potential jar conflict, it's better to exclude netty dependency from Kafka. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
Re: Review Request 26936: Patch for kafka-1717
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/26936/#review57336 --- Ship it! +1 - Sriharsha Chintalapani On Oct. 20, 2014, 3:16 p.m., Jun Rao wrote: --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/26936/ --- (Updated Oct. 20, 2014, 3:16 p.m.) Review request for kafka. Bugs: kafka-1717 https://issues.apache.org/jira/browse/kafka-1717 Repository: kafka Description --- remove netty from kafka dependency manually until ZOOKEEPER-1681 is fixed Diffs - build.gradle ee87e0f40ad05ba07763d7fe8a4fe3aaadf524e5 Diff: https://reviews.apache.org/r/26936/diff/ Testing --- Thanks, Jun Rao
[jira] [Created] (KAFKA-1718) Message Size Too Large error when only small messages produced with Snappy
Evan Huus created KAFKA-1718: Summary: Message Size Too Large error when only small messages produced with Snappy Key: KAFKA-1718 URL: https://issues.apache.org/jira/browse/KAFKA-1718 Project: Kafka Issue Type: Bug Components: core Affects Versions: 0.8.1.1 Reporter: Evan Huus Priority: Critical I'm the primary author of the Go bindings, and while I originally received this as a bug against my bindings, I'm coming to the conclusion that it's a bug in the broker somehow. Specifically, take a look at the last two kafka packets in the following packet capture: https://dl.dropboxusercontent.com/u/171647/kafka.pcapng (you will need a trunk build of Wireshark to fully decode the kafka part of the packets). The produce request contains two partitions on one topic. Each partition has one message set (sizes 977205 bytes and 967362 bytes respectively). Each message set is a sequential collection of snappy-compressed messages, each message of size 46899. When uncompressed, each message contains a message set of 999600 bytes, containing a sequence of uncompressed 1024-byte messages. However, the broker responds to this with a MessageSizeTooLarge error, full stacktrace from the broker logs being: kafka.common.MessageSizeTooLargeException: Message size is 1070127 bytes which exceeds the maximum configured message size of 112. at kafka.log.Log$$anonfun$append$1.apply(Log.scala:267) at kafka.log.Log$$anonfun$append$1.apply(Log.scala:265) at scala.collection.Iterator$class.foreach(Iterator.scala:727) at kafka.utils.IteratorTemplate.foreach(IteratorTemplate.scala:32) at kafka.log.Log.append(Log.scala:265) at kafka.cluster.Partition.appendMessagesToLeader(Partition.scala:354) at kafka.server.KafkaApis$$anonfun$appendToLocalLog$2.apply(KafkaApis.scala:376) at kafka.server.KafkaApis$$anonfun$appendToLocalLog$2.apply(KafkaApis.scala:366) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98) at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98) at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:226) at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39) at scala.collection.mutable.HashMap.foreach(HashMap.scala:98) at scala.collection.TraversableLike$class.map(TraversableLike.scala:244) at scala.collection.AbstractTraversable.map(Traversable.scala:105) at kafka.server.KafkaApis.appendToLocalLog(KafkaApis.scala:366) at kafka.server.KafkaApis.handleProducerRequest(KafkaApis.scala:292) at kafka.server.KafkaApis.handle(KafkaApis.scala:185) at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:42) at java.lang.Thread.run(Thread.java:695) Since as far as I can tell none of the sizes in the actual produced packet exceed the defined maximum, I can only assume that the broker is miscalculating something somewhere and throwing the exception improperly. --- This issue can be reliably reproduced using an out-of-the-box binary download of 0.8.1.1 and the following gist: https://gist.github.com/eapache/ce0f15311c605a165ce7 (you will need to use the `producer-ng` branch of the Sarama library). --- I am happy to provide any more information you might need, or to do relevant experiments etc. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
Re: [DISCUSSION] Message Metadata
8. Perhaps I need to see the details of the alternative solution. During the startup of a follower, in general, it's not enough for the follower to just see the latest generation of the current leader, since the follower can be several generations behind. So, if the control message only contains the latest generation, it may not be enough for the follower to resolve the discrepancy with the leader. Another thing is that when a follower starts up, the first thing it needs to do is to figure out how much to truncate. This needs to happen before the follower can start fetching. So, there is also a chicken and egg problem. The follower can't figure out how much to truncate before it can fetch the control message. However, it doesn't know where to start fetching until the truncation is properly done. Thanks, Jun On Sun, Oct 19, 2014 at 11:41 PM, Guozhang Wang wangg...@gmail.com wrote: Thanks for the detailed comments Jun! Some replies inlined. On Sun, Oct 19, 2014 at 7:42 PM, Jun Rao jun...@gmail.com wrote: Hi, Guozhang, Thanks for the writeup. A few high level comments. 1. Associating (versioned) schemas to a topic can be a good thing overall. Yes, this could add a bit more management overhead in Kafka. However, it makes sure that the data format contract between a producer and a consumer is kept and managed in a central place, instead of in the application. The latter is probably easier to start with, but is likely to be brittle in the long run. I am actually not proposing to not support associated versioned schemas for topics, but to not let some core Kafka functionalities like auditing being depend on schemas. I think this alone can separate the schema management from Kafka piping management (i.e. making sure every single message is delivered, and within some latency, etc). Adding additional auditing info into an existing schema will force Kafka to be aware of the schema systems (Avro, JSON, etc). 2. Auditing can be a general feature that's useful for many applications. Such a feature can be implemented by extending the low level message format with a header. However, it can also be added as part of the schema management. For example, you can imagine a type of audited schema that adds additional auditing info to an existing schema automatically. Performance wise, it probably doesn't make a big difference whether the auditing info is added in the message header or the schema header. See replies above. 3. We talked about avoiding the overhead of decompressing in both the broker and the mirror maker. We probably need to think through how this works with auditing. In the more general case where you want to audit every message, one has to do the decompression to get the individual message, independent of how the auditing info is stored. This means that if we want to audit the broker directly or the consumer in mirror maker, we have to pay the decompression cost anyway. Similarly, if we want to extend mirror maker to support some customized filtering/transformation logic, we also have to pay the decompression cost. I see your point. For that I would prefer to have a MM implementation that is able to do de-compress / re-compress ONLY if required, for example by auditing, etc. I agree that we have not thought through whether we should enable auditing on MM, and if yes how to do that, and we could discuss about that in a different thread. 
Overall, this proposal is not just for tackling de-compression on MM but about the feasibility of extending Kafka message header for system properties / app properties. Some low level comments. 4. Broker offset reassignment (kafka-527): This probably can be done with just a format change on the compressed message set. That is true. As I mentioned in the wiki each of the problems may be resolvable separately but I am thinking about a general way to get all of them. 5. MirrorMaker refactoring: We probably can think through how general we want mirror maker to be. If we want to it to be more general, we likely need to decompress every message just like in a normal consumer. There will definitely be overhead. However, as long as mirror maker is made scalable, we can overcome the overhead by just running more instances on more hardware resources. As for the proposed message format change, we probably need to think through it a bit more. The honor-ship flag seems a bit hacky to me. Replied as part of 3). Sure we can discuss more about that, will update the wiki for collected comments. 6. Adding a timestamp in each message can be a useful thing. It (1) allows log segments to be rolled more accurately; (2) allows finding an offset for a particular timestamp more accurately. I am thinking that the timestamp in the message should probably be the time when the leader receives the message. Followers preserve the timestamp
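To make the alternative in point 8 concrete, here is a hedged Java sketch of a leader generation checkpoint: the leader records the first offset of each generation, and a restarting follower asks the leader where the follower's own latest generation ends, then truncates to that offset before it starts fetching. This only illustrates the idea under discussion; the names and the protocol are assumptions, not Kafka's implementation.
{code}
import java.util.Map;
import java.util.TreeMap;

// Hedged sketch of the checkpoint-file alternative described in point 8.
class LeaderGenerationCheckpoint {
    // leader generation -> first offset written in that generation
    private final TreeMap<Integer, Long> startOffsetByGeneration = new TreeMap<>();

    void recordNewGeneration(int generation, long firstOffset) {
        startOffsetByGeneration.put(generation, firstOffset);
        // a real implementation would also flush this map to a checkpoint file here
    }

    // Leader-side answer to a follower that is still on followerGeneration: that generation
    // ends where the next generation begins, or at the leader's log end if it is the latest.
    long endOffsetForGeneration(int followerGeneration, long leaderLogEndOffset) {
        Map.Entry<Integer, Long> next = startOffsetByGeneration.higherEntry(followerGeneration);
        return next != null ? next.getValue() : leaderLogEndOffset;
    }
}

// Follower-side use (sketch): truncate the local log to min(localLogEndOffset, leader's answer),
// then resume fetching; this addresses the chicken-and-egg concern because the follower learns the
// truncation point from a dedicated request rather than from a control message in the log.
{code}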
Re: [DISCUSSION] Message Metadata
I have updated the wiki page incorporating received comments. We can discuss some more details on: 1. How we want to do audit? Whether we want to have in-built auditing on brokers or even MMs or use an audit consumer to fetch all messages from just brokers. 2. How we can avoid de-/re-compression on brokers and MMs with log compaction turned on. 3. How we can resolve unclean leader election resulted data inconsistency with control messages. Guozhang On Sun, Oct 19, 2014 at 11:41 PM, Guozhang Wang wangg...@gmail.com wrote: Thanks for the detailed comments Jun! Some replies inlined. On Sun, Oct 19, 2014 at 7:42 PM, Jun Rao jun...@gmail.com wrote: Hi, Guozhang, Thanks for the writeup. A few high level comments. 1. Associating (versioned) schemas to a topic can be a good thing overall. Yes, this could add a bit more management overhead in Kafka. However, it makes sure that the data format contract between a producer and a consumer is kept and managed in a central place, instead of in the application. The latter is probably easier to start with, but is likely to be brittle in the long run. I am actually not proposing to not support associated versioned schemas for topics, but to not let some core Kafka functionalities like auditing being depend on schemas. I think this alone can separate the schema management from Kafka piping management (i.e. making sure every single message is delivered, and within some latency, etc). Adding additional auditing info into an existing schema will force Kafka to be aware of the schema systems (Avro, JSON, etc). 2. Auditing can be a general feature that's useful for many applications. Such a feature can be implemented by extending the low level message format with a header. However, it can also be added as part of the schema management. For example, you can imagine a type of audited schema that adds additional auditing info to an existing schema automatically. Performance wise, it probably doesn't make a big difference whether the auditing info is added in the message header or the schema header. See replies above. 3. We talked about avoiding the overhead of decompressing in both the broker and the mirror maker. We probably need to think through how this works with auditing. In the more general case where you want to audit every message, one has to do the decompression to get the individual message, independent of how the auditing info is stored. This means that if we want to audit the broker directly or the consumer in mirror maker, we have to pay the decompression cost anyway. Similarly, if we want to extend mirror maker to support some customized filtering/transformation logic, we also have to pay the decompression cost. I see your point. For that I would prefer to have a MM implementation that is able to do de-compress / re-compress ONLY if required, for example by auditing, etc. I agree that we have not thought through whether we should enable auditing on MM, and if yes how to do that, and we could discuss about that in a different thread. Overall, this proposal is not just for tackling de-compression on MM but about the feasibility of extending Kafka message header for system properties / app properties. Some low level comments. 4. Broker offset reassignment (kafka-527): This probably can be done with just a format change on the compressed message set. That is true. As I mentioned in the wiki each of the problems may be resolvable separately but I am thinking about a general way to get all of them. 5. 
MirrorMaker refactoring: We probably need to think through how general we want mirror maker to be. If we want it to be more general, we likely need to decompress every message just like in a normal consumer. There will definitely be overhead. However, as long as mirror maker is made scalable, we can overcome the overhead by just running more instances on more hardware resources. As for the proposed message format change, we probably need to think through it a bit more. The honor-ship flag seems a bit hacky to me. Replied as part of 3). Sure, we can discuss that some more; I will update the wiki with the collected comments. 6. Adding a timestamp to each message can be a useful thing. It (1) allows log segments to be rolled more accurately; (2) allows finding an offset for a particular timestamp more accurately. I am thinking that the timestamp in the message should probably be the time when the leader receives the message. Followers preserve the timestamp set by the leader. To avoid time going backward during a leader change, the leader can probably set the timestamp to be the max of the current time and the timestamp of the last message, if present. That timestamp can potentially be added to the index file to answer offsetBeforeTimestamp queries more efficiently. Agreed. 7. Log compaction: It seems that you are suggesting an improvement to compact the active segment as well. This can
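To make the monotonic timestamp assignment discussed in point 6 concrete, here is a minimal sketch in Scala (a hypothetical helper, not actual broker code): the leader stamps each incoming message with the max of the current wall-clock time and the last assigned timestamp, so followers that preserve the leader's timestamps never see time go backward across leader changes.
{code}
// Hypothetical sketch of leader-side timestamp assignment; not Kafka code.
// The leader remembers the last timestamp it handed out and never assigns a
// smaller value, so the per-partition timestamp sequence is non-decreasing
// even if the wall clock jumps backward or leadership moves.
class LeaderTimestampAssigner {
  private var lastAssignedMs: Long = Long.MinValue

  def assign(): Long = synchronized {
    val now = System.currentTimeMillis()
    lastAssignedMs = math.max(now, lastAssignedMs)
    lastAssignedMs
  }
}
{code}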
Re: Review Request 26936: Patch for kafka-1717
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/26936/#review57357 --- Ship it! Ship It! - Neha Narkhede On Oct. 20, 2014, 3:16 p.m., Jun Rao wrote: --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/26936/ --- (Updated Oct. 20, 2014, 3:16 p.m.) Review request for kafka. Bugs: kafka-1717 https://issues.apache.org/jira/browse/kafka-1717 Repository: kafka Description --- remove netty from kafka dependency manually until ZOOKEEPER-1681 is fixed Diffs - build.gradle ee87e0f40ad05ba07763d7fe8a4fe3aaadf524e5 Diff: https://reviews.apache.org/r/26936/diff/ Testing --- Thanks, Jun Rao
[jira] [Commented] (KAFKA-1718) Message Size Too Large error when only small messages produced with Snappy
[ https://issues.apache.org/jira/browse/KAFKA-1718?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14177107#comment-14177107 ] Sriharsha Chintalapani commented on KAFKA-1718: --- [~eapache] kafka uses ByteBufferMessageSet https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala which adds a LogOverhead to each message https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala#L54 and in Log it compares ByteBufferMessageSet.sizeInBytes with the configured maximum message size, which won't be equal to the actual message size sent. Message Size Too Large error when only small messages produced with Snappy Key: KAFKA-1718 URL: https://issues.apache.org/jira/browse/KAFKA-1718 Project: Kafka Issue Type: Bug Components: core Affects Versions: 0.8.1.1 Reporter: Evan Huus Priority: Critical I'm the primary author of the Go bindings, and while I originally received this as a bug against my bindings, I'm coming to the conclusion that it's a bug in the broker somehow. Specifically, take a look at the last two kafka packets in the following packet capture: https://dl.dropboxusercontent.com/u/171647/kafka.pcapng (you will need a trunk build of Wireshark to fully decode the kafka part of the packets). The produce request contains two partitions on one topic. Each partition has one message set (sizes 977205 bytes and 967362 bytes respectively). Each message set is a sequential collection of snappy-compressed messages, each message of size 46899. When uncompressed, each message contains a message set of 999600 bytes, containing a sequence of uncompressed 1024-byte messages. However, the broker responds to this with a MessageSizeTooLarge error, full stacktrace from the broker logs being: kafka.common.MessageSizeTooLargeException: Message size is 1070127 bytes which exceeds the maximum configured message size of 112.
at kafka.log.Log$$anonfun$append$1.apply(Log.scala:267) at kafka.log.Log$$anonfun$append$1.apply(Log.scala:265) at scala.collection.Iterator$class.foreach(Iterator.scala:727) at kafka.utils.IteratorTemplate.foreach(IteratorTemplate.scala:32) at kafka.log.Log.append(Log.scala:265) at kafka.cluster.Partition.appendMessagesToLeader(Partition.scala:354) at kafka.server.KafkaApis$$anonfun$appendToLocalLog$2.apply(KafkaApis.scala:376) at kafka.server.KafkaApis$$anonfun$appendToLocalLog$2.apply(KafkaApis.scala:366) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98) at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98) at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:226) at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39) at scala.collection.mutable.HashMap.foreach(HashMap.scala:98) at scala.collection.TraversableLike$class.map(TraversableLike.scala:244) at scala.collection.AbstractTraversable.map(Traversable.scala:105) at kafka.server.KafkaApis.appendToLocalLog(KafkaApis.scala:366) at kafka.server.KafkaApis.handleProducerRequest(KafkaApis.scala:292) at kafka.server.KafkaApis.handle(KafkaApis.scala:185) at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:42) at java.lang.Thread.run(Thread.java:695) Since as far as I can tell none of the sizes in the actual produced packet exceed the defined maximum, I can only assume that the broker is miscalculating something somewhere and throwing the exception improperly. --- This issue can be reliably reproduced using an out-of-the-box binary download of 0.8.1.1 and the following gist: https://gist.github.com/eapache/ce0f15311c605a165ce7 (you will need to use the `producer-ng` branch of the Sarama library). --- I am happy to provide any more information you might need, or to do relevant experiments etc. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
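For reference, the size accounting Sriharsha points to can be sketched as follows. The constants are assumptions taken from the linked ByteBufferMessageSet.scala/Message.scala (a 12-byte log overhead per entry plus a 14-byte message header); this is an illustration, not the broker code.
{code}
// Sketch of per-entry on-disk size accounting, assuming the 0.8 message format:
// 12 bytes of log overhead (8-byte offset + 4-byte size) per entry, plus a
// 14-byte message header (crc 4, magic 1, attributes 1, key length 4,
// value length 4). The authoritative constants live in MessageSet.scala and
// Message.scala; the values here are assumptions for illustration.
object MessageSizing {
  val LogOverhead = 8 + 4                    // offset + size fields
  val MessageHeaderSize = 4 + 1 + 1 + 4 + 4  // crc, magic, attributes, key len, value len

  def entrySizeInBytes(keyBytes: Int, valueBytes: Int): Int =
    LogOverhead + MessageHeaderSize + keyBytes + valueBytes

  def main(args: Array[String]): Unit = {
    // A 1024-byte value with no key occupies 1050 bytes in the log,
    // i.e. 26 bytes more than the payload itself.
    println(entrySizeInBytes(keyBytes = 0, valueBytes = 1024))
  }
}
{code}
With these assumed constants the per-message overhead comes to 26 bytes, which matches the figure Evan quotes later in this thread.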
[jira] [Commented] (KAFKA-1718) Message Size Too Large error when only small messages produced with Snappy
[ https://issues.apache.org/jira/browse/KAFKA-1718?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14177112#comment-14177112 ] Evan Huus commented on KAFKA-1718: -- LogOverhead is only 12 bytes; none of the values I produce are within 12 bytes of the limit and nowhere near the 1070127 that the broker is reporting.
[jira] [Created] (KAFKA-1719) Make mirror maker exit when one consumer/producer thread exits.
Jiangjie Qin created KAFKA-1719: --- Summary: Make mirror maker exit when one consumer/producer thread exits. Key: KAFKA-1719 URL: https://issues.apache.org/jira/browse/KAFKA-1719 Project: Kafka Issue Type: Improvement Reporter: Jiangjie Qin Assignee: Jiangjie Qin When one of the consumer/producer threads exits, the entire mirror maker will be blocked. In this case, it is better to make it exit. It seems a single ZookeeperConsumerConnector is sufficient for mirror maker; we probably don't need a list of connectors. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-1481) Stop using dashes AND underscores as separators in MBean names
[ https://issues.apache.org/jira/browse/KAFKA-1481?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14177149#comment-14177149 ] Vladimir Tretyakov commented on KAFKA-1481: --- Hi, phew :), created a new patch (for the 0.8.2 branch). Re 2.1, 2.3 - I didn't change this part because we use it in our monitoring; it is handier for the user to see something like 'localhost:9092' instead of '2' (where 2 is the broker id). For us it is easiest to have this information directly in the MBean. With this patch the MBean names look like: {code} kafka.producer:type=ProducerTopicMetrics,name=MessagesPerSec,topic=spm_alerts_topic kafka.producer:type=ProducerTopicMetrics,name=MessagesPerSec,allTopics=true kafka.log:type=Log,name=LogEndOffset,topic=spm_alerts_topic,partitionId=0 kafka.consumer:type=ZookeeperConsumerConnector,name=FetchQueueSize,timestamp=1413817796508,clientId=af_servers,uuid=9f99df40,groupId=af_servers,topicThreadId=spm_topic,consumerHostName=wawanawna,threadId=0 kafka.consumer:type=FetchRequestAndResponseMetrics,name=FetchRequestRateAndTimeMs,clientId=af_servers,threadName=ConsumerFetcherThread,fetcherId=0,sourceBrokerId=0,groupId=af_servers,consumerHostName=wawanawna,timestamp=1413817796508,uuid=9f99df40,brokerHost=wawanawna,brokerPort=9092 {code} Not everything in the code is clear to me, so I've tried to avoid deep refactoring. All tests passed for me locally. PS: note that if the 'value' part is empty (in the key-value structure) we will not see this part in the attributes. Example: key1-value1,key2-null,key3-value3 will be converted to key1=value1,key3=value3. Stop using dashes AND underscores as separators in MBean names -- Key: KAFKA-1481 URL: https://issues.apache.org/jira/browse/KAFKA-1481 Project: Kafka Issue Type: Bug Components: core Affects Versions: 0.8.1.1 Reporter: Otis Gospodnetic Priority: Critical Labels: patch Fix For: 0.8.2 Attachments: KAFKA-1481_2014-06-06_13-06-35.patch, KAFKA-1481_2014-10-13_18-23-35.patch, KAFKA-1481_2014-10-14_21-53-35.patch, KAFKA-1481_2014-10-15_10-23-35.patch, KAFKA-1481_IDEA_IDE_2014-10-14_21-53-35.patch, KAFKA-1481_IDEA_IDE_2014-10-15_10-23-35.patch MBeans should not use dashes or underscores as separators because these characters are allowed in hostnames, topics, group and consumer IDs, etc., and these are embedded in MBean names, making it impossible to parse out individual bits from MBeans. Perhaps a pipe character should be used to avoid the conflict. This looks like a major blocker because it means nobody can write Kafka 0.8.x monitoring tools unless they are doing it for themselves AND do not use dashes AND do not use underscores. See: http://search-hadoop.com/m/4TaT4lonIW -- This message was sent by Atlassian JIRA (v6.3.4#6332)
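The empty-value behavior described in the PS could look roughly like the following sketch (a hypothetical helper written for illustration, not the code from the attached patch): tags whose value is null or empty are simply skipped when the ObjectName string is assembled.
{code}
// Hypothetical sketch of building an MBean name from key/value tags, skipping
// tags with a null or empty value, as described above: key1->value1, key2->null,
// key3->value3 becomes key1=value1,key3=value3. Not the code from the patch.
object MBeanNameBuilder {
  def build(domain: String, tags: Seq[(String, String)]): String = {
    val attrs = tags.collect {
      case (k, v) if v != null && v.nonEmpty => s"$k=$v"
    }
    s"$domain:${attrs.mkString(",")}"
  }

  def main(args: Array[String]): Unit = {
    println(build("kafka.producer",
      Seq("type" -> "ProducerTopicMetrics", "name" -> "MessagesPerSec",
          "topic" -> "spm_alerts_topic", "clientId" -> null)))
    // prints: kafka.producer:type=ProducerTopicMetrics,name=MessagesPerSec,topic=spm_alerts_topic
  }
}
{code}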
[jira] [Updated] (KAFKA-1481) Stop using dashes AND underscores as separators in MBean names
[ https://issues.apache.org/jira/browse/KAFKA-1481?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Vladimir Tretyakov updated KAFKA-1481: -- Attachment: KAFKA-1481_IDEA_IDE_2014-10-20_20-14-35.patch
[jira] [Updated] (KAFKA-1717) remove netty dependency through ZK 3.4.x
[ https://issues.apache.org/jira/browse/KAFKA-1717?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jun Rao updated KAFKA-1717: --- Resolution: Fixed Status: Resolved (was: Patch Available) Thanks for the reviews. Committed to trunk and 0.8.2. remove netty dependency through ZK 3.4.x Key: KAFKA-1717 URL: https://issues.apache.org/jira/browse/KAFKA-1717 Project: Kafka Issue Type: Bug Components: core Affects Versions: 0.8.2 Reporter: Jun Rao Assignee: Jun Rao Priority: Blocker Fix For: 0.8.2 Attachments: kafka-1717.patch ZK 3.4.x specifies a dependency on netty. However, that dependency is optional (ZOOKEEPER-1681). To avoid potential jar conflict, it's better to exclude netty dependency from Kafka. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-1481) Stop using dashes AND underscores as separators in MBean names
[ https://issues.apache.org/jira/browse/KAFKA-1481?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14177250#comment-14177250 ] Jun Rao commented on KAFKA-1481: Thanks for the patch. It doesn't seem to apply. Could you rebase? git apply -p0 ~/Downloads/KAFKA-1481_IDEA_IDE_2014-10-20_20-14-35.patch error: core/src/main/scala/kafka/common/ClientIdTopic.scala: No such file or directory error: patch failed: core/src/main/scala/kafka/server/KafkaApis.scala:285 error: core/src/main/scala/kafka/server/KafkaApis.scala: patch does not apply error: patch failed: core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala:74 error: core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala: patch does not apply error: patch failed: core/src/main/scala/kafka/consumer/PartitionAssignor.scala:101 error: core/src/main/scala/kafka/consumer/PartitionAssignor.scala: patch does not apply error: patch failed: core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala:17 error: core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala: patch does not apply
[jira] [Commented] (KAFKA-1718) Message Size Too Large error when only small messages produced with Snappy
[ https://issues.apache.org/jira/browse/KAFKA-1718?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14177324#comment-14177324 ] Sriharsha Chintalapani commented on KAFKA-1718: --- [~eapache] there is additional data being added per message too https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/message/Message.scala#L172 and https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/message/Message.scala#L99
[Java New Producer] Snappy NPE Issue
Hi Kafka Dev, I am getting the following issue with the Snappy library. I checked the code for the Snappy lib and it seems to be fine. Have you guys seen this issue? 2014-10-20 18:55:21.841 [kafka-producer-network-thread] ERROR org.apache.kafka.clients.producer.internals.Sender - Uncaught error in kafka producer I/O thread: *java.lang.NullPointerException* at org.xerial.snappy.BufferRecycler.releaseInputBuffer(BufferRecycler.java:153) at org.xerial.snappy.SnappyOutputStream.close(SnappyOutputStream.java:317) at java.io.FilterOutputStream.close(FilterOutputStream.java:160) at org.apache.kafka.common.record.Compressor.close(Compressor.java:94) at org.apache.kafka.common.record.MemoryRecords.close(MemoryRecords.java:119) at org.apache.kafka.clients.producer.internals.RecordAccumulator.drain(RecordAccumulator.java:285) at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:162) at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:115) at java.lang.Thread.run(Thread.java:744) Here is the code for Snappy, line 153 of http://grepcode.com/file/repo1.maven.org/maven2/org.xerial.snappy/snappy-java/1.1.1.3/org/xerial/snappy/BufferRecycler.java#153 : {code} if (inputBuffer == null || (buffer != null && buffer.length > inputBuffer.length)) { {code} Thanks, Bhavesh
[jira] [Comment Edited] (KAFKA-1718) Message Size Too Large error when only small messages produced with Snappy
[ https://issues.apache.org/jira/browse/KAFKA-1718?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14177351#comment-14177351 ] Evan Huus edited comment on KAFKA-1718 at 10/20/14 7:28 PM: That additional data is only 26 bytes, and is already included in the numbers I put in my original report. was (Author: eapache): The numbers I put in my original report do take all of that additional data into consideration already.
[jira] [Commented] (KAFKA-1481) Stop using dashes AND underscores as separators in MBean names
[ https://issues.apache.org/jira/browse/KAFKA-1481?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14177423#comment-14177423 ] Vladimir Tretyakov commented on KAFKA-1481: --- Hi, thx for quick reply, did rebase, added new patches (1 was created by git, 1 by IDEA IDE).
[jira] [Updated] (KAFKA-1555) provide strong consistency with reasonable availability
[ https://issues.apache.org/jira/browse/KAFKA-1555?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Gwen Shapira updated KAFKA-1555: Attachment: KAFKA-1555-DOCS.0.patch Adding a documentation patch. Note that this patch is huge since it adds all-new 0.8.2 documentation. The relevant parts are: 1. Added a sub-section on Availability and Durability Guarantees under the 4.7 Replication section, explaining the different knobs and trade-offs involved. 2. Added min.isr to the topic configs provide strong consistency with reasonable availability --- Key: KAFKA-1555 URL: https://issues.apache.org/jira/browse/KAFKA-1555 Project: Kafka Issue Type: Improvement Components: controller Affects Versions: 0.8.1.1 Reporter: Jiang Wu Assignee: Gwen Shapira Fix For: 0.8.2 Attachments: KAFKA-1555-DOCS.0.patch, KAFKA-1555.0.patch, KAFKA-1555.1.patch, KAFKA-1555.2.patch, KAFKA-1555.3.patch, KAFKA-1555.4.patch, KAFKA-1555.5.patch, KAFKA-1555.5.patch, KAFKA-1555.6.patch, KAFKA-1555.8.patch, KAFKA-1555.9.patch In a mission critical application, we expect that a kafka cluster with 3 brokers can satisfy two requirements: 1. When 1 broker is down, no message loss or service blocking happens. 2. In worse cases, such as when two brokers are down, service can be blocked, but no message loss happens. We found that the current kafka version (0.8.1.1) cannot achieve the requirements due to three of its behaviors: 1. when choosing a new leader from 2 followers in the ISR, the one with fewer messages may be chosen as the leader. 2. even when replica.lag.max.messages=0, a follower can stay in the ISR when it has fewer messages than the leader. 3. the ISR can contain only 1 broker, therefore acknowledged messages may be stored in only 1 broker. The following is an analytical proof. We consider a cluster with 3 brokers and a topic with 3 replicas, and assume that at the beginning, all 3 replicas, leader A, followers B and C, are in sync, i.e., they have the same messages and are all in the ISR. According to the value of request.required.acks (acks for short), there are the following cases. 1. acks=0, 1, 3. Obviously these settings do not satisfy the requirement. 2. acks=2. Producer sends a message m. It's acknowledged by A and B. At this time, although C hasn't received m, C is still in the ISR. If A is killed, C can be elected as the new leader, and consumers will miss m. 3. acks=-1. B and C restart and are removed from the ISR. Producer sends a message m to A, and receives an acknowledgement. Disk failure happens in A before B and C replicate m. Message m is lost. In summary, any existing configuration cannot satisfy the requirements. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
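As a concrete illustration of the knobs the new documentation section describes, a setup that aims for "no acknowledged message lost while one broker is down" combines acks=-1 on the producer with a topic-level minimum ISR size of 2. The property names below (request.required.acks for the 0.8 producer and the min.insync.replicas topic config) are assumptions based on the 0.8.2 configuration and should be checked against the committed docs.
{code}
// Sketch of the durability-oriented settings discussed above; property names
// are assumed from the 0.8.2 configuration, not taken from the attached patch.
import java.util.Properties

object DurableProducerConfig {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.put("metadata.broker.list", "broker1:9092,broker2:9092,broker3:9092")
    // Wait for all replicas in the ISR to acknowledge each message.
    props.put("request.required.acks", "-1")
    // The topic should additionally carry min.insync.replicas=2 so that writes
    // are rejected when fewer than two replicas are in sync, e.g. (assumed CLI):
    //   bin/kafka-topics.sh --alter --zookeeper localhost:2181 \
    //     --topic critical-topic --config min.insync.replicas=2
    props.list(System.out)
  }
}
{code}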
[jira] [Updated] (KAFKA-1481) Stop using dashes AND underscores as separators in MBean names
[ https://issues.apache.org/jira/browse/KAFKA-1481?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Vladimir Tretyakov updated KAFKA-1481: -- Attachment: KAFKA-1481_2014-10-20_23-14-35.patch KAFKA-1481_IDEA_IDE_2014-10-20_23-14-35.patch
[jira] [Commented] (KAFKA-1718) Message Size Too Large error when only small messages produced with Snappy
[ https://issues.apache.org/jira/browse/KAFKA-1718?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14177478#comment-14177478 ] Jun Rao commented on KAFKA-1718: Another thing that we do on the broker is to assign a new offset to each (uncompressed) message and recompress those messages again. It may be possible that because of the newly assigned offsets, the new message set doesn't compress as well as before and thus exceeds the limit.
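Following Jun's point, the broker may see a different compressed size than the client sent, so a client can only stay safely under the broker's limit by leaving headroom. The sketch below is illustrative only; the 10% margin and the default limit of 1000012 bytes are assumptions, not values taken from this report.
{code}
// Illustrative sketch only: a client cannot know exactly how well the broker's
// re-compressed message set (with new offsets assigned) will compress, so it can
// leave a safety margin below the broker's message.max.bytes when batching.
// The 10% margin and the 1000012-byte default are assumptions for illustration.
object CompressedBatchSizer {
  def fitsWithMargin(compressedSetBytes: Int,
                     brokerMessageMaxBytes: Int = 1000012,
                     margin: Double = 0.10): Boolean =
    compressedSetBytes <= (brokerMessageMaxBytes * (1.0 - margin)).toInt

  def main(args: Array[String]): Unit = {
    // A 977205-byte compressed set would not pass with a 10% margin.
    println(fitsWithMargin(977205))
  }
}
{code}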
[jira] [Commented] (KAFKA-1718) Message Size Too Large error when only small messages produced with Snappy
[ https://issues.apache.org/jira/browse/KAFKA-1718?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14177487#comment-14177487 ] Evan Huus commented on KAFKA-1718: -- That sounds plausible. 1. How do I verify if that is/isn't the problem I'm seeing? Is there some piece of backtrace or breakpoint I can check or something? 2. If that is the problem, what is a client supposed to do about it? Leave a few KiB spare and hope that that's enough? Is there no way for a client using compression to be sure that the broker will actually accept the payload (unless presumably the uncompressed payload is already small enough)?
[jira] [Comment Edited] (KAFKA-1718) Message Size Too Large error when only small messages produced with Snappy
[ https://issues.apache.org/jira/browse/KAFKA-1718?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14177487#comment-14177487 ] Evan Huus edited comment on KAFKA-1718 at 10/20/14 9:23 PM: That sounds plausible. 1. How do I verify if that is/isn't the problem I'm seeing? Is there some piece of backtrace or breakpoint I can check or something? 2. If that is the problem, what is a client supposed to do about it? Leave a few KiB spare and hope that that's enough? Is there no way for a client using compression to be sure that the broker will actually accept the payload (unless presumably the uncompressed payload is already small enough)? Edit: actually, that can't be it. From my original report When uncompressed, each message contains a message set of 999600 bytes. So unless the recompression on the broker's end *added* a substantial amount of data (which is improbable; the messages were all 0s)... was (Author: eapache): That sounds plausible. 1. How do I verify if that is/isn't the problem I'm seeing? Is there some piece of backtrace or breakpoint I can check or something? 2. If that is the problem, what is a client supposed to do about it? Leave a few KiB spare and hope that that's enough? Is there no way for a client using compression to be sure that the broker will actually accept the payload (unless presumably the uncompressed payload is already small enough)?
[jira] [Commented] (KAFKA-1712) Excessive storage usage on newly added node
[ https://issues.apache.org/jira/browse/KAFKA-1712?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14177500#comment-14177500 ] Jun Rao commented on KAFKA-1712: This is being discussed in https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Enriched+Message+Metadata Excessive storage usage on newly added node --- Key: KAFKA-1712 URL: https://issues.apache.org/jira/browse/KAFKA-1712 Project: Kafka Issue Type: Bug Reporter: Oleg Golovin When a new node is added to the cluster, data starts replicating into it. The mtime of the created segments will be set by the last message written to them. Though the replication is a prolonged process, let's assume (for simplicity of explanation) that their mtime is very close to the time when the new node was added. After the replication is done, new data will start to flow into this new node. After `log.retention.hours` the amount of data will be 2 * daily_amount_of_data_in_kafka_node (the first part is the data replicated from other nodes when the node was added (let us call that time `t1`), and the second is the data replicated from other nodes between `t1` and `t1 + log.retention.hours`). So by that time the node will have twice as much data as the other nodes. This poses a big problem for us, as our storage is sized to fit the normal amount of data (not twice that amount). In our particular case it poses another problem. We have an emergency segment cleaner which runs in case storage is nearly full (90%). We try to balance the amount of data so that it does not run and we can rely solely on kafka's internal log deletion, but sometimes the emergency cleaner does run. It works this way: - it gets all kafka segments for the volume - it filters out the last segment of each partition (just to avoid unnecessary recreation of the last small-size segments) - it sorts them by segment mtime - it changes the mtime of the first N segments (with the lowest mtime) to 1, so they become really really old. The number N is chosen to free a specified percentage of the volume (3% in our case). Kafka deletes these segments later (as they are very old). The emergency cleaner works very well, except for the case when the data has been replicated to the newly added node. In this case the segment mtime is the time the segment was replicated and does not reflect the real creation time of the original data stored in this segment. So in this case the kafka emergency cleaner will delete the segments with the lowest mtime, which may hold data that is much more recent than the data in other segments. This is not a big problem until we delete data which hasn't been fully consumed. In that case we lose data, and that makes it a big problem. Is it possible to retain the segment mtime during initial replication on a new node? This would help avoid loading the new node with twice as much data as the other nodes have. Or maybe there are other ways to sort segments by data creation time (or close to data creation time)? (For example, if this ticket is implemented https://issues.apache.org/jira/browse/KAFKA-1403, we may take the time of the first message from the .index.) In our case it will help with the kafka emergency cleaner, which will then delete truly the oldest data. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
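The emergency cleaner described in the report could be sketched roughly as follows (a hypothetical reconstruction from the description; the directory layout and byte target are assumptions, and this is not Kafka code).
{code}
// Hypothetical reconstruction of the emergency segment cleaner described above:
// collect segment files, skip the last (active) segment of each partition, sort
// the rest by mtime, and age the oldest ones by setting their mtime to 1 so that
// Kafka's retention deletes them. Not actual Kafka code; layout is assumed.
import java.io.File

object EmergencySegmentCleaner {
  def clean(logDir: File, bytesToFree: Long): Unit = {
    val segmentsByPartition = logDir.listFiles().filter(_.isDirectory).map { partitionDir =>
      partitionDir -> partitionDir.listFiles().filter(_.getName.endsWith(".log")).sortBy(_.getName)
    }
    // Skip the last (most recent) segment of every partition.
    val candidates = segmentsByPartition.flatMap { case (_, segs) => segs.dropRight(1) }
    var freed = 0L
    for (seg <- candidates.sortBy(_.lastModified()) if freed < bytesToFree) {
      seg.setLastModified(1L) // make it "really really old" so retention removes it
      freed += seg.length()
    }
  }
}
{code}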
[jira] [Resolved] (KAFKA-1514) Update Kafka trunk version number
[ https://issues.apache.org/jira/browse/KAFKA-1514?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jun Rao resolved KAFKA-1514. Resolution: Fixed Update Kafka trunk version number - Key: KAFKA-1514 URL: https://issues.apache.org/jira/browse/KAFKA-1514 Project: Kafka Issue Type: Bug Reporter: Jakob Homan Right now the 8.1.1 branch has 8.1.1 as its version, while the trunk has 8.1. Trunk should be 9.0, generally, or at the very least 8.2. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (KAFKA-1720) [Renaming / Comments] Delayed Operations
Guozhang Wang created KAFKA-1720: Summary: [Renaming / Comments] Delayed Operations Key: KAFKA-1720 URL: https://issues.apache.org/jira/browse/KAFKA-1720 Project: Kafka Issue Type: Bug Reporter: Guozhang Wang Assignee: Guozhang Wang Fix For: 0.9.0 Now that KAFKA-1583 is checked in, we had better rename the delayed requests to delayed operations. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-1718) Message Size Too Large error when only small messages produced with Snappy
[ https://issues.apache.org/jira/browse/KAFKA-1718?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14177558#comment-14177558 ] Jun Rao commented on KAFKA-1718: In trunk, we actually do a size check before and after recompression. You can probably set a breakpoint in Log.append() and see where the size limit is violated.
Re: Review Request 26373: Patch for KAFKA-1647
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/26373/#review57490 --- Thanks for the patch. Some comments below. core/src/main/scala/kafka/server/ReplicaManager.scala https://reviews.apache.org/r/26373/#comment98208 This doesn't quite fix the original problem though. The original problem is that if the leader is not alive, we won't call partition.makeFollower(), in which the local replica is created. If a local replica is not created, the partition will be ignored when checkpointing the HW and we lose the last checkpointed HW. So, we have to call partition.makeFollower() for every follower, whether its leader is live or not. After this, we can proceed with the rest of the steps for only those partitions with a live leader. We can log a warning for those partitions w/o a live leader. - Jun Rao On Oct. 18, 2014, 7:26 a.m., Jiangjie Qin wrote: --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/26373/ --- (Updated Oct. 18, 2014, 7:26 a.m.) Review request for kafka. Bugs: KAFKA-1647 https://issues.apache.org/jira/browse/KAFKA-1647 Repository: kafka Description --- Addressed Joel's comments. The version 2 code seems to have been submitted by mistake... This should be the code for review that addresses Joel's comments. Diffs - core/src/main/scala/kafka/server/ReplicaManager.scala 78b7514cc109547c562e635824684fad581af653 Diff: https://reviews.apache.org/r/26373/diff/ Testing --- Thanks, Jiangjie Qin
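The ordering Jun describes might be sketched as follows (the Partition type and its methods here are simplified stand-ins for illustration, not the real ReplicaManager/Partition API).
{code}
// Simplified, hypothetical sketch of the become-follower ordering described in
// the review comment: makeFollower() runs for every partition so the local
// replica (and thus its checkpointed HW) always exists, and only partitions
// with a live leader proceed to truncation and fetcher setup.
case class Partition(name: String, leaderId: Int) {
  def makeFollower(): Unit = ()            // creates the local replica (stub)
  def truncateToHighWatermark(): Unit = () // stub
  def addFetcher(): Unit = ()              // stub
}

object BecomeFollowerSketch {
  def makeFollowers(partitions: Seq[Partition], liveLeaders: Set[Int]): Unit = {
    // 1. Always create the local replica, even when the leader is not alive,
    //    so the partition is not skipped when the HW checkpoint is written.
    partitions.foreach(_.makeFollower())

    val (withLiveLeader, withoutLiveLeader) =
      partitions.partition(p => liveLeaders.contains(p.leaderId))
    withoutLiveLeader.foreach(p =>
      println(s"WARN: leader ${p.leaderId} of ${p.name} is not alive; skipping fetch setup"))

    // 2. Proceed with truncation and fetcher setup only where the leader is live.
    withLiveLeader.foreach { p =>
      p.truncateToHighWatermark()
      p.addFetcher()
    }
  }
}
{code}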
Re: Review Request 26885: Patch for KAFKA-1642
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/26885/ --- (Updated Oct. 21, 2014, 12:34 a.m.) Review request for kafka. Bugs: KAFKA-1642 https://issues.apache.org/jira/browse/KAFKA-1642 Repository: kafka Description (updated) --- Fixes two issues with the computation of ready nodes and poll timeouts in Sender/RecordAccumulator: 1. The timeout was computed incorrectly because it took into account all nodes, even if they had data to send such that their timeout would be 0. However, nodes were then filtered based on whether it was possible to send (i.e. their connection was still good) which could result in nothing to send and a 0 timeout, resulting in busy looping. Instead, the timeout needs to be computed only using data that cannot be immediately sent, i.e. where the timeout will be greater than 0. This timeout is only used if, after filtering by whether connections are ready for sending, there is no data to be sent. Other events can wake the thread up earlier, e.g. a client reconnects and becomes ready again. 2. One of the conditions indicating whether data is sendable is whether a timeout has expired -- either the linger time or the retry backoff. This condition wasn't accounting for both cases properly, always using the linger time. This means the retry backoff was probably not being respected. KAFKA-1642 Compute correct poll timeout when all nodes have sendable data but none can send data because they are in a connection backoff period. Diffs (updated) - clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java d304660f29246e9600efe3ddb28cfcc2b074bed3 clients/src/main/java/org/apache/kafka/clients/KafkaClient.java 29658d4a15f112dc0af5ce517eaab93e6f00134b clients/src/main/java/org/apache/kafka/clients/NetworkClient.java eea270abb16f40c9f3b47c4ea96be412fb4fdc8b clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java c5d470011d334318d5ee801021aadd0c000974a6 clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java 8ebe7ed82c9384b71ce0cc3ddbef2c2325363ab9 clients/src/test/java/org/apache/kafka/clients/MockClient.java aae8d4a1e98279470587d397cc779a9baf6fee6c clients/src/test/java/org/apache/kafka/clients/producer/RecordAccumulatorTest.java 0762b35abba0551f23047348c5893bb8c9acff14 Diff: https://reviews.apache.org/r/26885/diff/ Testing --- Thanks, Ewen Cheslack-Postava
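The timeout logic in the description can be modeled with a small sketch (a simplified, hypothetical batch model, not the real RecordAccumulator/Sender code): a batch waits for the retry backoff when it is in retry and for the linger time otherwise, and the poll timeout is taken only over batches that still have to wait, so already-sendable batches cannot drive the timeout to 0 and cause busy looping.
{code}
// Simplified, hypothetical model of the fix described above; not the actual
// RecordAccumulator/Sender code. Batches that are already sendable contribute
// nothing to the poll timeout; the timeout is the minimum remaining wait over
// batches that are not yet sendable, using retryBackoffMs for batches in retry
// and lingerMs otherwise.
case class Batch(createdMs: Long, lastAttemptMs: Long, inRetry: Boolean)

object PollTimeoutSketch {
  def pollTimeoutMs(batches: Seq[Batch], nowMs: Long,
                    lingerMs: Long, retryBackoffMs: Long): Long = {
    val remainingWaits = batches.flatMap { b =>
      val timeToWait = if (b.inRetry) retryBackoffMs else lingerMs
      val waitedSince = if (b.inRetry) b.lastAttemptMs else b.createdMs
      val remaining = timeToWait - (nowMs - waitedSince)
      if (remaining > 0) Some(remaining) else None // sendable batches are excluded
    }
    if (remainingWaits.isEmpty) 0L else remainingWaits.min
  }
}
{code}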
[jira] [Updated] (KAFKA-1642) [Java New Producer Kafka Trunk] CPU Usage Spike to 100% when network connection is lost
[ https://issues.apache.org/jira/browse/KAFKA-1642?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ewen Cheslack-Postava updated KAFKA-1642: - Attachment: KAFKA-1642_2014-10-20_17:33:57.patch [Java New Producer Kafka Trunk] CPU Usage Spike to 100% when network connection is lost --- Key: KAFKA-1642 URL: https://issues.apache.org/jira/browse/KAFKA-1642 Project: Kafka Issue Type: Bug Components: producer Affects Versions: 0.8.2 Reporter: Bhavesh Mistry Assignee: Ewen Cheslack-Postava Attachments: KAFKA-1642.patch, KAFKA-1642_2014-10-20_17:33:57.patch I see my CPU spike to 100% when network connection is lost for while. It seems network IO thread are very busy logging following error message. Is this expected behavior ? 2014-09-17 14:06:16.830 [kafka-producer-network-thread] ERROR org.apache.kafka.clients.producer.internals.Sender - Uncaught error in kafka producer I/O thread: java.lang.IllegalStateException: No entry found for node -2 at org.apache.kafka.clients.ClusterConnectionStates.nodeState(ClusterConnectionStates.java:110) at org.apache.kafka.clients.ClusterConnectionStates.disconnected(ClusterConnectionStates.java:99) at org.apache.kafka.clients.NetworkClient.initiateConnect(NetworkClient.java:394) at org.apache.kafka.clients.NetworkClient.maybeUpdateMetadata(NetworkClient.java:380) at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:174) at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:175) at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:115) at java.lang.Thread.run(Thread.java:744) Thanks, Bhavesh -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-1481) Stop using dashes AND underscores as separators in MBean names
[ https://issues.apache.org/jira/browse/KAFKA-1481?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14177717#comment-14177717 ] Jun Rao commented on KAFKA-1481: It still doesn't apply. git apply -p0 ~/Downloads/KAFKA-1481_IDEA_IDE_2014-10-20_23-14-35.patch /Users/junrao/Downloads/KAFKA-1481_IDEA_IDE_2014-10-20_23-14-35.patch:311: trailing whitespace. }) error: core/src/main/scala/kafka/common/TopicInfo.scala: No such file or directory Does the patch apply on a fresh checkout for you? Stop using dashes AND underscores as separators in MBean names -- Key: KAFKA-1481 URL: https://issues.apache.org/jira/browse/KAFKA-1481 Project: Kafka Issue Type: Bug Components: core Affects Versions: 0.8.1.1 Reporter: Otis Gospodnetic Priority: Critical Labels: patch Fix For: 0.8.2 Attachments: KAFKA-1481_2014-06-06_13-06-35.patch, KAFKA-1481_2014-10-13_18-23-35.patch, KAFKA-1481_2014-10-14_21-53-35.patch, KAFKA-1481_2014-10-15_10-23-35.patch, KAFKA-1481_2014-10-20_23-14-35.patch, KAFKA-1481_IDEA_IDE_2014-10-14_21-53-35.patch, KAFKA-1481_IDEA_IDE_2014-10-15_10-23-35.patch, KAFKA-1481_IDEA_IDE_2014-10-20_20-14-35.patch, KAFKA-1481_IDEA_IDE_2014-10-20_23-14-35.patch MBeans should not use dashes or underscores as separators because these characters are allowed in hostnames, topics, group and consumer IDs, etc., and these are embedded in MBeans names making it impossible to parse out individual bits from MBeans. Perhaps a pipe character should be used to avoid the conflict. This looks like a major blocker because it means nobody can write Kafka 0.8.x monitoring tools unless they are doing it for themselves AND do not use dashes AND do not use underscores. See: http://search-hadoop.com/m/4TaT4lonIW -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-1642) [Java New Producer Kafka Trunk] CPU Usage Spike to 100% when network connection is lost
[ https://issues.apache.org/jira/browse/KAFKA-1642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14177723#comment-14177723 ] Ewen Cheslack-Postava commented on KAFKA-1642: -- To summarize the issues fixed now: * Fix logic issue with expired in RecordAccumulator.ready * Don't include nodes that can send data when computing the delay until the next check for ready data. Including these doesn't make sense since their delays will change when we send data. * To correctly account for nodes with sendable data, use a timeout of 0 if we send any. This guarantees any necessary delay is computed immediately in the next round after some current data has been removed. * Properly account for nodes with sendable data under connection retry backoff. Since they weren't included in computing the next check delay when looking up ready nodes, we need to account for it later, but only if we conclude the node isn't ready. We need to incorporate the amount of backoff time still required before a retry will be performed (nothing else would wakeup at the right time, unlike other conditions like a full buffer which only change if data is received). It might be possible to break this into smaller commits for each one, but the ordering of applying them needs to be careful because some by themselves result in bad behavior -- the existing client worked because it often ended up with poll timeouts that were much more aggressive (i.e., often 0). [Java New Producer Kafka Trunk] CPU Usage Spike to 100% when network connection is lost --- Key: KAFKA-1642 URL: https://issues.apache.org/jira/browse/KAFKA-1642 Project: Kafka Issue Type: Bug Components: producer Affects Versions: 0.8.2 Reporter: Bhavesh Mistry Assignee: Ewen Cheslack-Postava Attachments: KAFKA-1642.patch, KAFKA-1642_2014-10-20_17:33:57.patch I see my CPU spike to 100% when network connection is lost for while. It seems network IO thread are very busy logging following error message. Is this expected behavior ? 2014-09-17 14:06:16.830 [kafka-producer-network-thread] ERROR org.apache.kafka.clients.producer.internals.Sender - Uncaught error in kafka producer I/O thread: java.lang.IllegalStateException: No entry found for node -2 at org.apache.kafka.clients.ClusterConnectionStates.nodeState(ClusterConnectionStates.java:110) at org.apache.kafka.clients.ClusterConnectionStates.disconnected(ClusterConnectionStates.java:99) at org.apache.kafka.clients.NetworkClient.initiateConnect(NetworkClient.java:394) at org.apache.kafka.clients.NetworkClient.maybeUpdateMetadata(NetworkClient.java:380) at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:174) at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:175) at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:115) at java.lang.Thread.run(Thread.java:744) Thanks, Bhavesh -- This message was sent by Atlassian JIRA (v6.3.4#6332)
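A small sketch of the last point above, with hypothetical names (this is not the real NetworkClient/Sender code): a node that has sendable data but is sitting in connection retry backoff must still bound the poll timeout by the backoff time that remains, since nothing else would wake the sender at the right moment.

{code}
// Hypothetical sketch; names and data structures are illustrative, not the actual client API.
import java.util.Map;
import java.util.Set;

public class BackoffAwarePollTimeout {

    /**
     * readyNodes: nodes that have sendable data according to the accumulator.
     * connected: nodes whose connections are currently ready for sending.
     * reconnectAllowedAtMs: per-node timestamp at which a reconnect may be attempted.
     * accumulatorDelayMs: delay computed from batches that are not yet sendable.
     */
    static long pollTimeoutMs(Set<String> readyNodes,
                              Set<String> connected,
                              Map<String, Long> reconnectAllowedAtMs,
                              long accumulatorDelayMs,
                              long nowMs) {
        long timeout = accumulatorDelayMs;
        for (String node : readyNodes) {
            if (connected.contains(node)) {
                return 0L;                                    // something can be sent immediately
            }
            Long allowedAt = reconnectAllowedAtMs.get(node);
            long backoffLeft = allowedAt == null ? 0L : Math.max(0L, allowedAt - nowMs);
            timeout = Math.min(timeout, backoffLeft);         // wake up when the backoff expires
        }
        return timeout;
    }
}
{code}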
[jira] [Updated] (KAFKA-1698) Validator.ensureValid() only validates default config value
[ https://issues.apache.org/jira/browse/KAFKA-1698?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jun Rao updated KAFKA-1698: --- Resolution: Fixed Status: Resolved (was: Patch Available) Thanks for the patch. +1 and committed to trunk. Validator.ensureValid() only validates default config value --- Key: KAFKA-1698 URL: https://issues.apache.org/jira/browse/KAFKA-1698 Project: Kafka Issue Type: Bug Components: core Reporter: Jun Rao Assignee: Ewen Cheslack-Postava Labels: newbie++ Fix For: 0.8.3 Attachments: KAFKA-1698.patch We should use it to validate the actual configured value. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (KAFKA-1711) WARN Property topic is not valid when running console producer
[ https://issues.apache.org/jira/browse/KAFKA-1711?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Joe Crobak updated KAFKA-1711: -- Attachment: KAFKA-1711.patch Here's a patch that fixes the warning. WARN Property topic is not valid when running console producer -- Key: KAFKA-1711 URL: https://issues.apache.org/jira/browse/KAFKA-1711 Project: Kafka Issue Type: Bug Components: core Affects Versions: 0.8.2 Reporter: Jun Rao Labels: newbie Attachments: KAFKA-1711.patch bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test [2014-10-17 09:54:23,984] WARN Property topic is not valid (kafka.utils.VerifiableProperties) It would be good if we can get rid of the warning. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
Re: [Java New Producer] Snappy NPE Issue
I took a quick look at this since I noticed the same issue when testing your code for the issues you filed. I think the issue is that you're using all your producers across a thread pool and the snappy library uses ThreadLocal BufferRecyclers. When new Snappy streams are allocated, they may be allocated from the same thread (e.g. one of your MyProducer classes calls Producer.send() on multiple producers from the same thread) and therefore use the same BufferRecycler. Eventually you hit the code in the stacktrace, and if two producer send threads hit it concurrently they improperly share the unsynchronized BufferRecycler. This seems like a pain to fix -- it's really a deficiency of the snappy library and as far as I can see there's no external control over BufferRecycler in their API. One possibility is to record the thread ID when we generate a new stream in Compressor and use that to synchronize access to ensure no concurrent BufferRecycler access. That could be made specific to snappy so it wouldn't impact other codecs. Not exactly ideal, but it would work. Unfortunately I can't think of any way for you to protect against this in your own code since the problem arises in the producer send thread, which your code should never know about. Another option would be to setup your producers differently to avoid the possibility of unsynchronized access from multiple threads (i.e. don't use the same thread pool approach), but whether you can do that will depend on your use case. -Ewen On Mon, Oct 20, 2014, at 12:19 PM, Bhavesh Mistry wrote: Hi Kafka Dev, I am getting following issue with Snappy Library. I checked code for Snappy lib it seems to be fine. Have you guys seen this issue ? 2014-10-20 18:55:21.841 [kafka-producer-network-thread] ERROR org.apache.kafka.clients.producer.internals.Sender - Uncaught error in kafka producer I/O thread: *java.lang.NullPointerException* at org.xerial.snappy.BufferRecycler.releaseInputBuffer(BufferRecycler.java:153) at org.xerial.snappy.SnappyOutputStream.close(SnappyOutputStream.java:317) at java.io.FilterOutputStream.close(FilterOutputStream.java:160) at org.apache.kafka.common.record.Compressor.close(Compressor.java:94) at org.apache.kafka.common.record.MemoryRecords.close(MemoryRecords.java:119) at org.apache.kafka.clients.producer.internals.RecordAccumulator.drain(RecordAccumulator.java:285) at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:162) at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:115) at java.lang.Thread.run(Thread.java:744) Here is the code for Snappy (BufferRecycler.java line 153, http://grepcode.com/file/repo1.maven.org/maven2/org.xerial.snappy/snappy-java/1.1.1.3/org/xerial/snappy/BufferRecycler.java#153): if (inputBuffer == null || (buffer != null && buffer.length > inputBuffer.length)) { Thanks, Bhavesh
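One rough sketch of the Compressor-side workaround suggested above (hypothetical class and method names, not actual Kafka or snappy-java API): since snappy streams created by the same thread share that thread's BufferRecycler, guard each stream with a lock keyed by the id of the thread that created it.

{code}
// Sketch only; SnappyStreamGuard and its locking scheme are hypothetical, not real Kafka code.
import java.io.IOException;
import java.io.OutputStream;
import java.util.concurrent.ConcurrentHashMap;

public class SnappyStreamGuard {
    // One lock per creating thread id: streams allocated by that thread share a BufferRecycler.
    private static final ConcurrentHashMap<Long, Object> LOCKS = new ConcurrentHashMap<Long, Object>();

    private final OutputStream snappyStream;   // e.g. a SnappyOutputStream
    private final Object lock;

    public SnappyStreamGuard(OutputStream snappyStream) {
        this.snappyStream = snappyStream;
        Long creator = Thread.currentThread().getId();
        Object existing = LOCKS.get(creator);
        if (existing == null) {
            Object fresh = new Object();
            existing = LOCKS.putIfAbsent(creator, fresh);
            if (existing == null) {
                existing = fresh;
            }
        }
        this.lock = existing;
    }

    public void write(byte[] bytes) throws IOException {
        synchronized (lock) {                  // serialize access to the shared BufferRecycler
            snappyStream.write(bytes);
        }
    }

    public void close() throws IOException {
        synchronized (lock) {
            snappyStream.close();              // BufferRecycler.releaseInputBuffer() happens here
        }
    }
}
{code}

Whether something like this belongs in the producer, or the fix should come from snappy-java upstream, is exactly the open question tracked in KAFKA-1721.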
[jira] [Commented] (KAFKA-1716) hang during shutdown of ZookeeperConsumerConnector
[ https://issues.apache.org/jira/browse/KAFKA-1716?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14177822#comment-14177822 ] Sean Fay commented on KAFKA-1716: - This happens infrequently at system shutdown. I don't have steps to reliably reproduce it. I can't attach the full thread dump, but I can answer questions about it. There are two other threads that have the exact same stack trace as the ConsumerFetcherThread above. There are no other threads that have kafka classes on the stack. hang during shutdown of ZookeeperConsumerConnector -- Key: KAFKA-1716 URL: https://issues.apache.org/jira/browse/KAFKA-1716 Project: Kafka Issue Type: Bug Components: consumer Affects Versions: 0.8.1.1 Reporter: Sean Fay Assignee: Neha Narkhede It appears to be possible for {{ZookeeperConsumerConnector.shutdown()}} to wedge in the case that some consumer fetcher threads receive messages during the shutdown process. Shutdown thread: {code}-- Parking to wait for: java/util/concurrent/CountDownLatch$Sync@0x2aaaf3ef06d0 at jrockit/vm/Locks.park0(J)V(Native Method) at jrockit/vm/Locks.park(Locks.java:2230) at sun/misc/Unsafe.park(ZJ)V(Native Method) at java/util/concurrent/locks/LockSupport.park(LockSupport.java:156) at java/util/concurrent/locks/AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:811) at java/util/concurrent/locks/AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:969) at java/util/concurrent/locks/AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1281) at java/util/concurrent/CountDownLatch.await(CountDownLatch.java:207) at kafka/utils/ShutdownableThread.shutdown(ShutdownableThread.scala:36) at kafka/server/AbstractFetcherThread.shutdown(AbstractFetcherThread.scala:71) at kafka/server/AbstractFetcherManager$$anonfun$closeAllFetchers$2.apply(AbstractFetcherManager.scala:121) at kafka/server/AbstractFetcherManager$$anonfun$closeAllFetchers$2.apply(AbstractFetcherManager.scala:120) at scala/collection/TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772) at scala/collection/mutable/HashMap$$anonfun$foreach$1.apply(HashMap.scala:98) at scala/collection/mutable/HashMap$$anonfun$foreach$1.apply(HashMap.scala:98) at scala/collection/mutable/HashTable$class.foreachEntry(HashTable.scala:226) at scala/collection/mutable/HashMap.foreachEntry(HashMap.scala:39) at scala/collection/mutable/HashMap.foreach(HashMap.scala:98) at scala/collection/TraversableLike$WithFilter.foreach(TraversableLike.scala:771) at kafka/server/AbstractFetcherManager.closeAllFetchers(AbstractFetcherManager.scala:120) ^-- Holding lock: java/lang/Object@0x2aaaebcc7318[thin lock] at kafka/consumer/ConsumerFetcherManager.stopConnections(ConsumerFetcherManager.scala:148) at kafka/consumer/ZookeeperConsumerConnector.liftedTree1$1(ZookeeperConsumerConnector.scala:171) at kafka/consumer/ZookeeperConsumerConnector.shutdown(ZookeeperConsumerConnector.scala:167){code} ConsumerFetcherThread: {code}-- Parking to wait for: java/util/concurrent/locks/AbstractQueuedSynchronizer$ConditionObject@0x2aaaebcc7568 at jrockit/vm/Locks.park0(J)V(Native Method) at jrockit/vm/Locks.park(Locks.java:2230) at sun/misc/Unsafe.park(ZJ)V(Native Method) at java/util/concurrent/locks/LockSupport.park(LockSupport.java:156) at java/util/concurrent/locks/AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:1987) at 
java/util/concurrent/LinkedBlockingQueue.put(LinkedBlockingQueue.java:306) at kafka/consumer/PartitionTopicInfo.enqueue(PartitionTopicInfo.scala:60) at kafka/consumer/ConsumerFetcherThread.processPartitionData(ConsumerFetcherThread.scala:49) at kafka/server/AbstractFetcherThread$$anonfun$processFetchRequest$1$$anonfun$apply$mcV$sp$2.apply(AbstractFetcherThread.scala:130) at kafka/server/AbstractFetcherThread$$anonfun$processFetchRequest$1$$anonfun$apply$mcV$sp$2.apply(AbstractFetcherThread.scala:111) at scala/collection/immutable/HashMap$HashMap1.foreach(HashMap.scala:224) at scala/collection/immutable/HashMap$HashTrieMap.foreach(HashMap.scala:403) at kafka/server/AbstractFetcherThread$$anonfun$processFetchRequest$1.apply$mcV$sp(AbstractFetcherThread.scala:111) at kafka/server/AbstractFetcherThread$$anonfun$processFetchRequest$1.apply(AbstractFetcherThread.scala:111) at kafka/server/AbstractFetcherThread$$anonfun$processFetchRequest$1.apply(AbstractFetcherThread.scala:111) at kafka/utils/Utils$.inLock(Utils.scala:538) at
[jira] [Created] (KAFKA-1721) Snappy compressor is not thread safe
Ewen Cheslack-Postava created KAFKA-1721: Summary: Snappy compressor is not thread safe Key: KAFKA-1721 URL: https://issues.apache.org/jira/browse/KAFKA-1721 Project: Kafka Issue Type: Bug Reporter: Ewen Cheslack-Postava Assignee: Ewen Cheslack-Postava From the mailing list, it can generate this exception: 2014-10-20 18:55:21.841 [kafka-producer-network-thread] ERROR org.apache.kafka.clients.producer.internals.Sender - Uncaught error in kafka producer I/O thread: *java.lang.NullPointerException* at org.xerial.snappy.BufferRecycler.releaseInputBuffer(BufferRecycler.java:153) at org.xerial.snappy.SnappyOutputStream.close(SnappyOutputStream.java:317) at java.io.FilterOutputStream.close(FilterOutputStream.java:160) at org.apache.kafka.common.record.Compressor.close(Compressor.java:94) at org.apache.kafka.common.record.MemoryRecords.close(MemoryRecords.java:119) at org.apache.kafka.clients.producer.internals.RecordAccumulator.drain(RecordAccumulator.java:285) at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:162) at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:115) at java.lang.Thread.run(Thread.java:744) This appears to be an issue with the snappy-java library using ThreadLocal for an internal buffer recycling object which results in that object being shared unsafely across threads if one thread sends to multiple producers: {quote} I think the issue is that you're using all your producers across a thread pool and the snappy library uses ThreadLocal BufferRecyclers. When new Snappy streams are allocated, they may be allocated from the same thread (e.g. one of your MyProducer classes calls Producer.send() on multiple producers from the same thread) and therefore use the same BufferRecycler. Eventually you hit the code in the stacktrace, and if two producer send threads hit it concurrently they improperly share the unsynchronized BufferRecycler. This seems like a pain to fix -- it's really a deficiency of the snappy library and as far as I can see there's no external control over BufferRecycler in their API. One possibility is to record the thread ID when we generate a new stream in Compressor and use that to synchronize access to ensure no concurrent BufferRecycler access. That could be made specific to snappy so it wouldn't impact other codecs. Not exactly ideal, but it would work. Unfortunately I can't think of any way for you to protect against this in your own code since the problem arises in the producer send thread, which your code should never know about. Another option would be to setup your producers differently to avoid the possibility of unsynchronized access from multiple threads (i.e. don't use the same thread pool approach), but whether you can do that will depend on your use case. {quote} -- This message was sent by Atlassian JIRA (v6.3.4#6332)
Re: [Java New Producer] Snappy NPE Issue
Also, filed https://issues.apache.org/jira/browse/KAFKA-1721 for this since it either requires an updated version of the upstream library, a workaround by us, or at a bare minimum clear documentation of the issue. On Mon, Oct 20, 2014, at 06:23 PM, Ewen Cheslack-Postava wrote: I took a quick look at this since I noticed the same issue when testing your code for the issues you filed. I think the issue is that you're using all your producers across a thread pool and the snappy library uses ThreadLocal BufferRecyclers. When new Snappy streams are allocated, they may be allocated from the same thread (e.g. one of your MyProducer classes calls Producer.send() on multiple producers from the same thread) and therefore use the same BufferRecycler. Eventually you hit the code in the stacktrace, and if two producer send threads hit it concurrently they improperly share the unsynchronized BufferRecycler. This seems like a pain to fix -- it's really a deficiency of the snappy library and as far as I can see there's no external control over BufferRecycler in their API. One possibility is to record the thread ID when we generate a new stream in Compressor and use that to synchronize access to ensure no concurrent BufferRecycler access. That could be made specific to snappy so it wouldn't impact other codecs. Not exactly ideal, but it would work. Unfortunately I can't think of any way for you to protect against this in your own code since the problem arises in the producer send thread, which your code should never know about. Another option would be to setup your producers differently to avoid the possibility of unsynchronized access from multiple threads (i.e. don't use the same thread pool approach), but whether you can do that will depend on your use case. -Ewen On Mon, Oct 20, 2014, at 12:19 PM, Bhavesh Mistry wrote: Hi Kafka Dev, I am getting following issue with Snappy Library. I checked code for Snappy lib it seems to be fine. Have you guys seen this issue ? 
2014-10-20 18:55:21.841 [kafka-producer-network-thread] ERROR org.apache.kafka.clients.producer.internals.Sender - Uncaught error in kafka producer I/O thread: *java.lang.NullPointerException* at org.xerial.snappy.BufferRecycler.releaseInputBuffer(BufferRecycler.java:153) at org.xerial.snappy.SnappyOutputStream.close(SnappyOutputStream.java:317) at java.io.FilterOutputStream.close(FilterOutputStream.java:160) at org.apache.kafka.common.record.Compressor.close(Compressor.java:94) at org.apache.kafka.common.record.MemoryRecords.close(MemoryRecords.java:119) at org.apache.kafka.clients.producer.internals.RecordAccumulator.drain(RecordAccumulator.java:285) at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:162) at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:115) at java.lang.Thread.run(Thread.java:744) Here is the code for Snappy (BufferRecycler.java line 153, http://grepcode.com/file/repo1.maven.org/maven2/org.xerial.snappy/snappy-java/1.1.1.3/org/xerial/snappy/BufferRecycler.java#153): if (inputBuffer == null || (buffer != null && buffer.length > inputBuffer.length)) { Thanks, Bhavesh
[jira] [Updated] (KAFKA-1642) [Java New Producer Kafka Trunk] CPU Usage Spike to 100% when network connection is lost
[ https://issues.apache.org/jira/browse/KAFKA-1642?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Neha Narkhede updated KAFKA-1642: - Reviewer: Jay Kreps [Java New Producer Kafka Trunk] CPU Usage Spike to 100% when network connection is lost --- Key: KAFKA-1642 URL: https://issues.apache.org/jira/browse/KAFKA-1642 Project: Kafka Issue Type: Bug Components: producer Affects Versions: 0.8.2 Reporter: Bhavesh Mistry Assignee: Ewen Cheslack-Postava Attachments: KAFKA-1642.patch, KAFKA-1642_2014-10-20_17:33:57.patch I see my CPU spike to 100% when network connection is lost for while. It seems network IO thread are very busy logging following error message. Is this expected behavior ? 2014-09-17 14:06:16.830 [kafka-producer-network-thread] ERROR org.apache.kafka.clients.producer.internals.Sender - Uncaught error in kafka producer I/O thread: java.lang.IllegalStateException: No entry found for node -2 at org.apache.kafka.clients.ClusterConnectionStates.nodeState(ClusterConnectionStates.java:110) at org.apache.kafka.clients.ClusterConnectionStates.disconnected(ClusterConnectionStates.java:99) at org.apache.kafka.clients.NetworkClient.initiateConnect(NetworkClient.java:394) at org.apache.kafka.clients.NetworkClient.maybeUpdateMetadata(NetworkClient.java:380) at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:174) at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:175) at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:115) at java.lang.Thread.run(Thread.java:744) Thanks, Bhavesh -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (KAFKA-1711) WARN Property topic is not valid when running console producer
[ https://issues.apache.org/jira/browse/KAFKA-1711?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Neha Narkhede updated KAFKA-1711: - Reviewer: Jun Rao WARN Property topic is not valid when running console producer -- Key: KAFKA-1711 URL: https://issues.apache.org/jira/browse/KAFKA-1711 Project: Kafka Issue Type: Bug Components: core Affects Versions: 0.8.2 Reporter: Jun Rao Labels: newbie Attachments: KAFKA-1711.patch bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test [2014-10-17 09:54:23,984] WARN Property topic is not valid (kafka.utils.VerifiableProperties) It would be good if we can get rid of the warning. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
Sending Same Message to Two Topics on Same Broker Cluster
Hi Kafka Team, I would like to send a single message to multiple topics (two for now) without re-transmitting the message from the producer to the brokers. Is this possible? Neither the Scala nor the Java producer seems to allow this. I do not have to do this all the time, only when an application condition calls for it. Thanks in advance for your help! Thanks, Bhavesh
[jira] [Updated] (KAFKA-1634) Improve semantics of timestamp in OffsetCommitRequests and update documentation
[ https://issues.apache.org/jira/browse/KAFKA-1634?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jun Rao updated KAFKA-1634: --- Fix Version/s: (was: 0.8.2) 0.8.3 Moving this to 0.8.3 since it's easier to fix after KAFKA-1583 is done. Improve semantics of timestamp in OffsetCommitRequests and update documentation --- Key: KAFKA-1634 URL: https://issues.apache.org/jira/browse/KAFKA-1634 Project: Kafka Issue Type: Bug Reporter: Neha Narkhede Assignee: Guozhang Wang Priority: Blocker Fix For: 0.8.3 From the mailing list - following up on this -- I think the online API docs for OffsetCommitRequest still incorrectly refer to client-side timestamps: https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-OffsetCommitRequest Wasn't that removed, and isn't it now always handled server-side? Would one of the devs mind updating the API spec wiki? -- This message was sent by Atlassian JIRA (v6.3.4#6332)
Re: Sending Same Message to Two Topics on Same Broker Cluster
Not really. The producer is what sends data to Kafka, so the message has to be sent separately for each topic. On Mon, Oct 20, 2014 at 9:05 PM, Bhavesh Mistry mistry.p.bhav...@gmail.com wrote: Hi Kafka Team, I would like to send a single message to multiple topics (two for now) without re-transmitting the message from the producer to the brokers. Is this possible? Neither the Scala nor the Java producer seems to allow this. I do not have to do this all the time, only when an application condition calls for it. Thanks in advance for your help! Thanks, Bhavesh
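To make that concrete, the usual approach is one producer and one send() call per topic, so the payload does go over the wire once per topic. A minimal sketch using the new Java producer is below; the config keys and generic signatures shown follow the released new-producer API and may differ slightly on trunk at the time.

{code}
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class DualTopicSend {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");

        KafkaProducer<byte[], byte[]> producer = new KafkaProducer<byte[], byte[]>(props);
        byte[] payload = "same message".getBytes();

        // The same payload is sent once per topic; the broker does not fan a single write out to two topics.
        producer.send(new ProducerRecord<byte[], byte[]>("topicA", payload));
        producer.send(new ProducerRecord<byte[], byte[]>("topicB", payload));
        producer.close();
    }
}
{code}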