Re: Review Request 26373: Patch for KAFKA-1647
On Oct. 21, 2014, 12:18 a.m., Jun Rao wrote:

> core/src/main/scala/kafka/server/ReplicaManager.scala, lines 481-506
> https://reviews.apache.org/r/26373/diff/3/?file=725140#file725140line481
>
> This doesn't quite fix the original problem, though. The original problem is that if the leader is not alive, we won't call partition.makeFollower(), in which the local replica is created. If a local replica is not created, the partition is ignored when checkpointing the HW and we lose the last checkpointed HW. So we have to call partition.makeFollower() for every follower, whether its leader is live or not. After this, we can proceed with the rest of the steps for only those partitions with a live leader. We can log a warning for those partitions without a live leader.

Hi Jun, thanks for the review. I think the high watermark is actually read in partition.getOrCreateReplica(). As you said, that means that to preserve the high watermark, the local replica must be created. The original code did not create the local replica when the remote leader was not up, so it lost the high watermark. partition.getOrCreateReplica() is actually called in the following two places:

1. partition.makeFollower()
2. ReplicaManager line 515, when we truncate the log for the partitions in partitionsToMakeFollower.

Both of them preserve the high watermark. The difference between them is that in (1) every replica for the partition, including the not-yet-existing remote replica, is created, whereas in (2) only the local replica is created. Because I'm not 100% sure whether creating a remote replica before it is actually up could cause some other issue, I chose to preserve the high watermark in (2) instead of in (1), which is a smaller change relative to the original code.

- Jiangjie

---
This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/26373/#review57490
---

On Oct. 18, 2014, 7:26 a.m., Jiangjie Qin wrote:

(Updated Oct. 18, 2014, 7:26 a.m.)

Review request for kafka.

Bugs: KAFKA-1647
https://issues.apache.org/jira/browse/KAFKA-1647

Repository: kafka

Description
---
Addressed Joel's comments. The version 2 code seems to have been submitted by mistake; this should be the code for review that addresses Joel's comments.

Diffs
---
core/src/main/scala/kafka/server/ReplicaManager.scala 78b7514cc109547c562e635824684fad581af653

Diff: https://reviews.apache.org/r/26373/diff/

Testing
---

Thanks,
Jiangjie Qin
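The failure mode discussed in this review can be sketched in a few lines. This is a simplified, hypothetical model (the Partition class, write_checkpoint, and the dict-based checkpoint below are illustrative, not Kafka's actual API): the high-watermark checkpoint only covers partitions whose local replica object exists, and the checkpointed value is read back when the local replica is created.

```python
# Simplified, hypothetical model of the behavior under discussion: the
# high-watermark checkpoint covers only partitions that have a local
# replica, so skipping replica creation for a partition whose leader is
# down would drop its last checkpointed HW on the next checkpoint write.

class Partition:
    def __init__(self, name, checkpoint):
        self.name = name
        self._checkpoint = checkpoint  # last checkpoint file, as a dict
        self.local_replica = None

    def get_or_create_replica(self):
        # Mirrors the role of Partition.getOrCreateReplica(): the
        # checkpointed HW is read back only when the replica is created.
        if self.local_replica is None:
            hw = self._checkpoint.get(self.name, 0)
            self.local_replica = {"partition": self.name, "high_watermark": hw}
        return self.local_replica

def write_checkpoint(partitions):
    # Partitions without a local replica are ignored, losing their old HW.
    return {p.name: p.local_replica["high_watermark"]
            for p in partitions if p.local_replica is not None}
```

With a checkpoint of {"t-0": 42}, writing a new checkpoint before get_or_create_replica() is ever called yields an empty map (the old HW is lost); calling it first preserves the 42.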
[jira] [Updated] (KAFKA-1481) Stop using dashes AND underscores as separators in MBean names
[ https://issues.apache.org/jira/browse/KAFKA-1481?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Vladimir Tretyakov updated KAFKA-1481:
--------------------------------------
Attachment: KAFKA-1481_2014-10-21_09-14-35.patch

Stop using dashes AND underscores as separators in MBean names
--------------------------------------------------------------
Key: KAFKA-1481
URL: https://issues.apache.org/jira/browse/KAFKA-1481
Project: Kafka
Issue Type: Bug
Components: core
Affects Versions: 0.8.1.1
Reporter: Otis Gospodnetic
Priority: Critical
Labels: patch
Fix For: 0.8.2
Attachments: KAFKA-1481_2014-06-06_13-06-35.patch, KAFKA-1481_2014-10-13_18-23-35.patch, KAFKA-1481_2014-10-14_21-53-35.patch, KAFKA-1481_2014-10-15_10-23-35.patch, KAFKA-1481_2014-10-20_23-14-35.patch, KAFKA-1481_2014-10-21_09-14-35.patch, KAFKA-1481_IDEA_IDE_2014-10-14_21-53-35.patch, KAFKA-1481_IDEA_IDE_2014-10-15_10-23-35.patch, KAFKA-1481_IDEA_IDE_2014-10-20_20-14-35.patch, KAFKA-1481_IDEA_IDE_2014-10-20_23-14-35.patch

MBeans should not use dashes or underscores as separators, because these characters are allowed in hostnames, topics, group and consumer IDs, etc., and these are embedded in MBean names, making it impossible to parse out the individual bits from the MBean names. Perhaps a pipe character should be used to avoid the conflict. This looks like a major blocker because it means nobody can write Kafka 0.8.x monitoring tools unless they are doing it for themselves AND do not use dashes AND do not use underscores. See: http://search-hadoop.com/m/4TaT4lonIW

--
This message was sent by Atlassian JIRA (v6.3.4#6332)
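The parsing ambiguity described above can be shown with a toy example (mbean_components is an illustrative helper, not Kafka code): when the separator character can also appear inside the components themselves, the split cannot be inverted, whereas a character that is not legal inside the components (the pipe suggested above) keeps them intact.

```python
def mbean_components(name, sep):
    # Splitting on the separator is only reliable if the separator cannot
    # appear inside the components themselves.
    return name.split(sep)

# Hypothetical MBean name built from host "kafka-host-1" and topic
# "my-topic": with a dash separator, the component boundaries are lost.
ambiguous = mbean_components("kafka-host-1-my-topic", "-")

# With a pipe separator the original components survive the round trip.
clean = mbean_components("kafka-host-1|my-topic", "|")
```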
[jira] [Commented] (KAFKA-1481) Stop using dashes AND underscores as separators in MBean names
[ https://issues.apache.org/jira/browse/KAFKA-1481?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14178039#comment-14178039 ]

Vladimir Tretyakov commented on KAFKA-1481:
-------------------------------------------
Hi, I've added the latest patch (KAFKA-1481_2014-10-21_09-14-35.patch). It worked for me locally (the previous one worked too :( ). These are the commands I used:

{code}
PATCH CREATION

wawanawna@wawanawna:/home/storage/sematext/src/kfk2/kafka$ git status
On branch 0.8.2
Your branch is up-to-date with 'origin/0.8.2'.
Changes to be committed:
  (use "git reset HEAD <file>..." to unstage)

        new file:   core/src/main/scala/kafka/common/BrokerInfo.scala
        new file:   core/src/main/scala/kafka/common/ClientIdTopic.scala
        new file:   core/src/main/scala/kafka/common/Taggable.scala
        renamed:    core/src/main/scala/kafka/common/ClientIdAndTopic.scala -> core/src/main/scala/kafka/common/TopicInfo.scala
        new file:   core/src/main/scala/kafka/consumer/ConsumerFetcherThreadId.scala
        new file:   core/src/main/scala/kafka/consumer/ConsumerId.scala

Changes not staged for commit:
  (use "git add <file>..." to update what will be committed)
  (use "git checkout -- <file>..." to discard changes in working directory)

        modified:   core/src/main/scala/kafka/admin/AdminUtils.scala
        modified:   core/src/main/scala/kafka/api/HeartbeatRequestAndHeader.scala
        modified:   core/src/main/scala/kafka/api/HeartbeatResponseAndHeader.scala
        modified:   core/src/main/scala/kafka/api/JoinGroupRequestAndHeader.scala
        modified:   core/src/main/scala/kafka/api/JoinGroupResponseAndHeader.scala
        modified:   core/src/main/scala/kafka/api/RequestKeys.scala
        modified:   core/src/main/scala/kafka/cluster/Partition.scala
        ...

wawanawna@wawanawna:/home/storage/sematext/src/kfk2/kafka$ git commit -a -m 'kafka-1481; JMX renaming'
[0.8.2 a232af6] kafka-1481; JMX renaming
 73 files changed, 666 insertions(+), 435 deletions(-)
 create mode 100644 core/src/main/scala/kafka/common/BrokerInfo.scala
 create mode 100644 core/src/main/scala/kafka/common/ClientIdTopic.scala
 create mode 100644 core/src/main/scala/kafka/common/Taggable.scala
 rename core/src/main/scala/kafka/common/{ClientIdAndTopic.scala => TopicInfo.scala} (68%)
 create mode 100644 core/src/main/scala/kafka/consumer/ConsumerFetcherThreadId.scala
 create mode 100644 core/src/main/scala/kafka/consumer/ConsumerId.scala

wawanawna@wawanawna:/home/storage/sematext/src/kfk2/kafka$ git log | head -n 20
commit a232af63b8a1f43054994a74520b31ef7b9b347c
Author: wawanawna sema...@mail.com
Date:   Tue Oct 21 09:29:31 2014 +0300

    kafka-1481; JMX renaming

commit eb7ac9eb7eebc4e0655b65e07cae594e61a6c05e
Author: Jun Rao jun...@gmail.com
Date:   Mon Oct 20 11:09:31 2014 -0700

    kafka-1717; remove netty dependency through ZK 3.4.x; patched by Jun Rao; reviewed by Sriharsha Chintalapani and Neha Narkhede

wawanawna@wawanawna:/home/storage/sematext/src/kfk2/kafka$ git diff eb7ac9eb7eebc4e0655b65e07cae594e61a6c05e a232af63b8a1f43054994a74520b31ef7b9b347c > /tmp/KAFKA-1481_2014-10-21_09-14-35.patch

PATCH APPLYING

wawanawna@wawanawna$ git clone https://github.com/apache/kafka.git
wawanawna@wawanawna:/home/storage/sematext/src/kfk3/kafka$ git checkout -b 0.8.2 origin/0.8.2
Branch 0.8.2 set up to track remote branch 0.8.2 from origin.
Switched to a new branch '0.8.2'
wawanawna@wawanawna:/home/storage/sematext/src/kfk3/kafka$ git apply /tmp/KAFKA-1481_2014-10-21_09-14-35.patch
/tmp/KAFKA-1481_2014-10-21_09-14-35.patch:1410: trailing whitespace.
})
warning: 1 line adds whitespace errors.
{code}

There is a warning, but everything works (compilation/tests pass without errors).

PS: please tell me what I am doing wrong.

{code}
wawanawna@wawanawna:/home/storage/sematext/src/kfk3/kafka$ git --version
git version 1.9.1
{code}
[jira] [Commented] (KAFKA-1718) Message Size Too Large error when only small messages produced with Snappy
[ https://issues.apache.org/jira/browse/KAFKA-1718?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14178396#comment-14178396 ]

Wojciech Kuranowski commented on KAFKA-1718:
--------------------------------------------
I have compiled kafka 0.8.2 with different messages for validation and revalidation, and it seems that this issue is triggered during revalidation after recompression. In my case: "kafka.common.MessageSizeTooLargeException: revalidate - Message size is 3382345 bytes which exceeds the maximum configured message size of 100." It's strange that the message after recompression is 3 times bigger than the limit. Is the broker miscalculating something?

Message Size Too Large error when only small messages produced with Snappy
--------------------------------------------------------------------------
Key: KAFKA-1718
URL: https://issues.apache.org/jira/browse/KAFKA-1718
Project: Kafka
Issue Type: Bug
Components: core
Affects Versions: 0.8.1.1
Reporter: Evan Huus
Priority: Critical

I'm the primary author of the Go bindings, and while I originally received this as a bug against my bindings, I'm coming to the conclusion that it's a bug in the broker somehow. Specifically, take a look at the last two kafka packets in the following packet capture: https://dl.dropboxusercontent.com/u/171647/kafka.pcapng (you will need a trunk build of Wireshark to fully decode the kafka part of the packets).

The produce request contains two partitions on one topic. Each partition has one message set (977205 bytes and 967362 bytes respectively). Each message set is a sequential collection of snappy-compressed messages, each message of size 46899. When uncompressed, each message contains a message set of 999600 bytes, containing a sequence of uncompressed 1024-byte messages.

However, the broker responds to this with a MessageSizeTooLarge error, the full stacktrace from the broker logs being:

kafka.common.MessageSizeTooLargeException: Message size is 1070127 bytes which exceeds the maximum configured message size of 112.
        at kafka.log.Log$$anonfun$append$1.apply(Log.scala:267)
        at kafka.log.Log$$anonfun$append$1.apply(Log.scala:265)
        at scala.collection.Iterator$class.foreach(Iterator.scala:727)
        at kafka.utils.IteratorTemplate.foreach(IteratorTemplate.scala:32)
        at kafka.log.Log.append(Log.scala:265)
        at kafka.cluster.Partition.appendMessagesToLeader(Partition.scala:354)
        at kafka.server.KafkaApis$$anonfun$appendToLocalLog$2.apply(KafkaApis.scala:376)
        at kafka.server.KafkaApis$$anonfun$appendToLocalLog$2.apply(KafkaApis.scala:366)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
        at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
        at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
        at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:226)
        at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
        at scala.collection.mutable.HashMap.foreach(HashMap.scala:98)
        at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
        at scala.collection.AbstractTraversable.map(Traversable.scala:105)
        at kafka.server.KafkaApis.appendToLocalLog(KafkaApis.scala:366)
        at kafka.server.KafkaApis.handleProducerRequest(KafkaApis.scala:292)
        at kafka.server.KafkaApis.handle(KafkaApis.scala:185)
        at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:42)
        at java.lang.Thread.run(Thread.java:695)

Since as far as I can tell none of the sizes in the actual produced packet exceed the defined maximum, I can only assume that the broker is miscalculating something somewhere and throwing the exception improperly.

This issue can be reliably reproduced using an out-of-the-box binary download of 0.8.1.1 and the following gist: https://gist.github.com/eapache/ce0f15311c605a165ce7 (you will need to use the `producer-ng` branch of the Sarama library).
I am happy to provide any more information you might need, or to do relevant experiments, etc.

--
This message was sent by Atlassian JIRA (v6.3.4#6332)
Re: [DISCUSS] Release 0.8.2-beta before 0.8.2?
KAFKA-1634 is orthogonal to 1583, right? (BTW sorry about the delay in reviewing 1583 - I'm on it.) KAFKA-1634 shouldn't be too difficult to address, so I'm hoping we can get that into 0.8.2 if not the beta release.

Joel

On Mon, Oct 20, 2014 at 09:14:17PM -0700, Jun Rao wrote:

I moved KAFKA-1634 to 0.8.3 since it's easier to fix after KAFKA-1583 is committed. With this, we have resolved all known blocker issues for 0.8.2. Thanks, everyone.

Joe, do you want to help start rolling an 0.8.2 beta release? We can decide if we need to include any other fixes (e.g., KAFKA-1481) in the 0.8.2 final release.

Thanks,
Jun

On Thu, Oct 16, 2014 at 3:53 PM, Joel Koshy jjkosh...@gmail.com wrote:

+1 on the beta. I think KAFKA-1583 should only be on trunk, not 0.8.2, so that will only go out with 0.8.3.

Joel

On Thu, Oct 16, 2014 at 03:25:39PM -0700, Guozhang Wang wrote:

Agree.

On Oct 16, 2014 3:16 PM, Jun Rao jun...@gmail.com wrote:

+1 on doing an 0.8.2 beta.

Guozhang, kafka-1583 is relatively large. Given that we are getting close to releasing 0.8.2 beta, my feeling is that we probably shouldn't include it in 0.8.2 beta even if we can commit it in a few days.

Thanks,
Jun

On Thu, Oct 16, 2014 at 3:01 PM, Guozhang Wang wangg...@gmail.com wrote:

Regarding 1634, I intended to work on that after 1583, since it will change the commit offset request handling logic a lot. If people think 1583 is only a few days away from check-in, we can leave it in 0.8.2-beta; otherwise we can push it to 0.8.3.

Guozhang

On Thu, Oct 16, 2014 at 2:19 PM, Joe Stein joe.st...@stealth.ly wrote:

+1 for a 0.8.2-beta next week and 0.8.2.0 final 4-5 weeks later. I agree to the tickets you brought up to have in 0.8.2-beta and also https://issues.apache.org/jira/browse/KAFKA-1493 for lz4 compression.

/***
Joe Stein
Founder, Principal Consultant
Big Data Open Source Security LLC
http://www.stealth.ly
Twitter: @allthingshadoop
***/

On Oct 16, 2014 12:55 PM, Neha Narkhede neha.narkh...@gmail.com wrote:

Another JIRA that would be nice to include as part of 0.8.2-beta is https://issues.apache.org/jira/browse/KAFKA-1481, which fixes the mbean naming. Looking for people's thoughts on 2 things here:

1. How do folks feel about doing a 0.8.2-beta release right now and 0.8.2 final 4-5 weeks later?
2. Do people want to include any JIRAs (other than the ones mentioned above) in 0.8.2-beta? If so, it will be great to know now so it will allow us to move forward with the beta release quickly.

Thanks,
Neha

On Wed, Oct 15, 2014 at 4:46 PM, Neha Narkhede neha.narkh...@gmail.com wrote:

Hi,

We have accumulated an impressive list of pretty major features in 0.8.2:
- Delete topic
- Automated leader rebalancing
- Controlled shutdown
- Offset management
- Parallel recovery
- min.isr and clean leader election

In the past, what has worked for major feature releases is a beta release prior to a final release. I'm proposing we do the same for 0.8.2. The only blockers for 0.8.2-beta that I know of are:

https://issues.apache.org/jira/browse/KAFKA-1493 (a major change that requires some thinking about the new dependency; since it is not fully ready and there are things to think about, I suggest we take it out, think it through end to end, and then include it in 0.8.3)
https://issues.apache.org/jira/browse/KAFKA-1634 (this has an owner: Guozhang Wang)
https://issues.apache.org/jira/browse/KAFKA-1671 (has a patch and is waiting on a review by Joe Stein)

It seems that 1634 and 1671 can get wrapped up in a week. Do people think we should cut 0.8.2-beta by next week?

Thanks,
Neha

--
-- Guozhang
[jira] [Commented] (KAFKA-1718) Message Size Too Large error when only small messages produced with Snappy
[ https://issues.apache.org/jira/browse/KAFKA-1718?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14178519#comment-14178519 ]

Sriharsha Chintalapani commented on KAFKA-1718:
-----------------------------------------------
I ran sizeInBytes for "test".getBytes:

"test".getBytes size: 4
"test" message (Message.scala) size: 18
"test" ByteBufferMessageSet size: 30

For each message there is additional data being added:
1. 4 byte CRC32 of the message
2. 1 byte "magic" identifier to allow format changes, value is 2 currently
3. 1 byte "attributes" identifier to allow annotations on the message independent of the version (e.g. compression enabled, type of codec used)
4. 4 byte key length, containing length K
5. K byte key
6. 4 byte payload length, containing length V
7. V byte payload for the message

So for "test" with the key being null, the size comes to 18, and if you add this message to a ByteBufferMessageSet it will be 30 (12 being the LogOverHead).

--
This message was sent by Atlassian JIRA (v6.3.4#6332)
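The size accounting enumerated above can be reproduced with a short sketch (field widths taken from the breakdown in the comment; the helper names are illustrative, not Kafka code):

```python
# Per-message layout from the breakdown above: 4-byte CRC, 1-byte magic,
# 1-byte attributes, 4-byte key length, key bytes, 4-byte value length,
# value bytes. Each entry in a message set additionally carries a 12-byte
# log overhead (8-byte offset + 4-byte size).
LOG_OVERHEAD = 8 + 4

def message_size(key_bytes, value_bytes):
    return 4 + 1 + 1 + 4 + key_bytes + 4 + value_bytes

def message_set_size(message_sizes):
    return sum(s + LOG_OVERHEAD for s in message_sizes)
```

For the 4-byte payload "test" with a null (0-byte) key this gives 18 bytes for the message and 30 bytes once it is placed in a message set, matching the numbers quoted above.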
[jira] [Updated] (KAFKA-1721) Snappy compressor is not thread safe
[ https://issues.apache.org/jira/browse/KAFKA-1721?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Neha Narkhede updated KAFKA-1721:
---------------------------------
Component/s: compression

Snappy compressor is not thread safe
------------------------------------
Key: KAFKA-1721
URL: https://issues.apache.org/jira/browse/KAFKA-1721
Project: Kafka
Issue Type: Bug
Components: compression
Reporter: Ewen Cheslack-Postava
Assignee: Ewen Cheslack-Postava

From the mailing list, it can generate this exception:

2014-10-20 18:55:21.841 [kafka-producer-network-thread] ERROR org.apache.kafka.clients.producer.internals.Sender - Uncaught error in kafka producer I/O thread:
java.lang.NullPointerException
        at org.xerial.snappy.BufferRecycler.releaseInputBuffer(BufferRecycler.java:153)
        at org.xerial.snappy.SnappyOutputStream.close(SnappyOutputStream.java:317)
        at java.io.FilterOutputStream.close(FilterOutputStream.java:160)
        at org.apache.kafka.common.record.Compressor.close(Compressor.java:94)
        at org.apache.kafka.common.record.MemoryRecords.close(MemoryRecords.java:119)
        at org.apache.kafka.clients.producer.internals.RecordAccumulator.drain(RecordAccumulator.java:285)
        at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:162)
        at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:115)
        at java.lang.Thread.run(Thread.java:744)

This appears to be an issue with the snappy-java library using ThreadLocal for an internal buffer recycling object, which results in that object being shared unsafely across threads if one thread sends to multiple producers:

{quote}
I think the issue is that you're using all your producers across a thread pool and the snappy library uses ThreadLocal BufferRecyclers. When new Snappy streams are allocated, they may be allocated from the same thread (e.g. one of your MyProducer classes calls Producer.send() on multiple producers from the same thread) and therefore use the same BufferRecycler.
Eventually you hit the code in the stacktrace, and if two producer send threads hit it concurrently they improperly share the unsynchronized BufferRecycler.

This seems like a pain to fix -- it's really a deficiency of the snappy library, and as far as I can see there's no external control over BufferRecycler in their API. One possibility is to record the thread ID when we generate a new stream in Compressor and use that to synchronize access to ensure no concurrent BufferRecycler access. That could be made specific to snappy so it wouldn't impact other codecs. Not exactly ideal, but it would work.

Unfortunately I can't think of any way for you to protect against this in your own code, since the problem arises in the producer send thread, which your code should never know about. Another option would be to set up your producers differently to avoid the possibility of unsynchronized access from multiple threads (i.e. don't use the same thread pool approach), but whether you can do that will depend on your use case.
{quote}

--
This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (KAFKA-1678) add new options for reassign partition to better manage dead brokers
[ https://issues.apache.org/jira/browse/KAFKA-1678?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Joe Stein updated KAFKA-1678:
-----------------------------
Hey [~gwenshap], I have a freed-up engineer this week and next to work on this ticket -- is it ok if I assign it over to him?

add new options for reassign partition to better manage dead brokers
--------------------------------------------------------------------
Key: KAFKA-1678
URL: https://issues.apache.org/jira/browse/KAFKA-1678
Project: Kafka
Issue Type: Bug
Reporter: Joe Stein
Assignee: Gwen Shapira
Labels: operations
Fix For: 0.8.3

This comes in two forms: --replace-replica, which goes from broker.id to broker.id, and --remove-replica, which is just a single broker.id.

--
This message was sent by Atlassian JIRA (v6.3.4#6332)
Re: [DISCUSS] Release 0.8.2-beta before 0.8.2?
Can KAFKA-1583 go into 0.8.3 on trunk once it's ready? It is marked now for 0.9.0. I am about to move the JIRAs that are open or patch-resolved from 0.8.2 to 0.8.3.

On Tue, Oct 21, 2014 at 11:00 AM, Joel Koshy jjkosh...@gmail.com wrote:

KAFKA-1634 is orthogonal to 1583, right? (BTW sorry about the delay in reviewing 1583 - I'm on it.) KAFKA-1634 shouldn't be too difficult to address, so I'm hoping we can get that into 0.8.2 if not the beta release.

Joel
[jira] [Updated] (KAFKA-1566) Kafka environment configuration (kafka-env.sh)
[ https://issues.apache.org/jira/browse/KAFKA-1566?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Joe Stein updated KAFKA-1566:
-----------------------------
Fix Version/s: 0.8.3
               (was: 0.8.2)
               (was: 0.9.0)

Kafka environment configuration (kafka-env.sh)
----------------------------------------------
Key: KAFKA-1566
URL: https://issues.apache.org/jira/browse/KAFKA-1566
Project: Kafka
Issue Type: Improvement
Components: tools
Reporter: Cosmin Lehene
Assignee: Cosmin Lehene
Labels: newbie
Fix For: 0.8.3

It would be useful (especially for automated deployments) to have an environment configuration file that could be sourced from the launcher files (e.g. kafka-run-server.sh). This is how it could look:

kafka-env.sh
{code}
export KAFKA_JVM_PERFORMANCE_OPTS='-XX:+UseCompressedOops -XX:+DisableExplicitGC -Djava.awt.headless=true \
    -XX:+UseG1GC -XX:PermSize=48m -XX:MaxPermSize=48m -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35'
export KAFKA_HEAP_OPTS='-Xmx1G -Xms1G'
export KAFKA_LOG4J_OPTS=-Dkafka.logs.dir=/var/log/kafka
{code}

kafka-server-start.sh
{code}
...
source $base_dir/config/kafka-env.sh
...
{code}

This approach is consistent with Hadoop and HBase. However, the idea here is to be able to set these values in a single place without having to edit the startup scripts.

--
This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (KAFKA-1700) examples directory - README and shell scripts are out of date
[ https://issues.apache.org/jira/browse/KAFKA-1700?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Joe Stein updated KAFKA-1700:
-----------------------------
Fix Version/s: 0.8.3
               (was: 0.8.2)

examples directory - README and shell scripts are out of date
-------------------------------------------------------------
Key: KAFKA-1700
URL: https://issues.apache.org/jira/browse/KAFKA-1700
Project: Kafka
Issue Type: Bug
Affects Versions: 0.8.1
Reporter: Geoffrey Anderson
Priority: Minor
Labels: newbie
Fix For: 0.8.3
Attachments: KAFKA-1700.patch

sbt build files were removed during resolution of KAFKA-1254, so the README under the examples directory should no longer make reference to sbt. Also, the paths added to the CLASSPATH variable in the example shell script are no longer correct.

--
This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (KAFKA-686) 0.8 Kafka broker should give a better error message when running against 0.7 zookeeper
[ https://issues.apache.org/jira/browse/KAFKA-686?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Joe Stein updated KAFKA-686: Fix Version/s: (was: 0.8.2) 0.8.3 0.8 Kafka broker should give a better error message when running against 0.7 zookeeper -- Key: KAFKA-686 URL: https://issues.apache.org/jira/browse/KAFKA-686 Project: Kafka Issue Type: Bug Affects Versions: 0.8.0 Reporter: Jay Kreps Labels: newbie, patch Fix For: 0.8.3 Attachments: KAFAK-686-null-pointer-fix.patch, KAFKA-686-null-pointer-fix-2.patch People will not know that the zookeeper paths are not compatible. When you try to start the 0.8 broker pointed at a 0.7 zookeeper you get a NullPointerException. We should detect this and give a more sane error. Error:
kafka.common.KafkaException: Can't parse json string: null
	at kafka.utils.Json$.liftedTree1$1(Json.scala:20)
	at kafka.utils.Json$.parseFull(Json.scala:16)
	at kafka.utils.ZkUtils$$anonfun$getReplicaAssignmentForTopics$2.apply(ZkUtils.scala:498)
	at kafka.utils.ZkUtils$$anonfun$getReplicaAssignmentForTopics$2.apply(ZkUtils.scala:494)
	at scala.collection.LinearSeqOptimized$class.foreach(LinearSeqOptimized.scala:61)
	at scala.collection.immutable.List.foreach(List.scala:45)
	at kafka.utils.ZkUtils$.getReplicaAssignmentForTopics(ZkUtils.scala:494)
	at kafka.controller.KafkaController.initializeControllerContext(KafkaController.scala:446)
	at kafka.controller.KafkaController.onControllerFailover(KafkaController.scala:220)
	at kafka.controller.KafkaController$$anonfun$1.apply$mcV$sp(KafkaController.scala:85)
	at kafka.server.ZookeeperLeaderElector.elect(ZookeeperLeaderElector.scala:53)
	at kafka.server.ZookeeperLeaderElector.startup(ZookeeperLeaderElector.scala:43)
	at kafka.controller.KafkaController.startup(KafkaController.scala:381)
	at kafka.server.KafkaServer.startup(KafkaServer.scala:90)
	at kafka.server.KafkaServerStartable.startup(KafkaServerStartable.scala:34)
	at kafka.Kafka$.main(Kafka.scala:46)
	at kafka.Kafka.main(Kafka.scala)
Caused by: java.lang.NullPointerException
	at scala.util.parsing.combinator.lexical.Scanners$Scanner.init(Scanners.scala:52)
	at scala.util.parsing.json.JSON$.parseRaw(JSON.scala:71)
	at scala.util.parsing.json.JSON$.parseFull(JSON.scala:85)
	at kafka.utils.Json$.liftedTree1$1(Json.scala:17)
	... 16 more
-- This message was sent by Atlassian JIRA (v6.3.4#6332)
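For illustration, here is a minimal sketch of the kind of guard the ticket asks for; the class name, method, and error message below are hypothetical, not the actual kafka.utils.Json code:

```java
// Hypothetical sketch: validate the raw ZooKeeper payload before JSON
// parsing, so a null value (as seen with a 0.7-era ZooKeeper layout)
// produces a sane error instead of a NullPointerException deep in the parser.
public final class ZkJsonGuard {
    public static String requireJson(String zkPath, String raw) {
        if (raw == null || raw.trim().isEmpty()) {
            throw new IllegalStateException(
                "No JSON data found at ZooKeeper path " + zkPath
                + "; the registry may have been written by an incompatible (0.7) broker");
        }
        return raw; // safe to hand to the real JSON parser now
    }

    public static void main(String[] args) {
        try {
            requireJson("/brokers/topics/foo/partitions", null);
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```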
[jira] [Updated] (KAFKA-438) Code cleanup in MessageTest
[ https://issues.apache.org/jira/browse/KAFKA-438?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Joe Stein updated KAFKA-438: Fix Version/s: (was: 0.8.2) 0.8.3 Code cleanup in MessageTest --- Key: KAFKA-438 URL: https://issues.apache.org/jira/browse/KAFKA-438 Project: Kafka Issue Type: Improvement Components: core Affects Versions: 0.7.1 Reporter: Jim Plush Priority: Trivial Fix For: 0.8.3 Attachments: KAFKA-438 While exploring the Unit Tests this class had an unused import statement, some ambiguity on which HashMap implementation was being used and assignments of function returns when not required. Trivial stuff -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (KAFKA-1351) String.format is very expensive in Scala
[ https://issues.apache.org/jira/browse/KAFKA-1351?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Joe Stein updated KAFKA-1351: - Fix Version/s: (was: 0.8.2) 0.8.3 String.format is very expensive in Scala Key: KAFKA-1351 URL: https://issues.apache.org/jira/browse/KAFKA-1351 Project: Kafka Issue Type: Bug Components: core Affects Versions: 0.7.2, 0.8.0, 0.8.1 Reporter: Neha Narkhede Labels: newbie Fix For: 0.8.3 Attachments: KAFKA-1351.patch, KAFKA-1351_2014-04-07_18:02:18.patch, KAFKA-1351_2014-04-09_15:40:11.patch As found in KAFKA-1350, logging is causing significant overhead in the performance of a Kafka server. There are several info statements that use String.format which is particularly expensive. We should investigate adding our own version of String.format that merely uses string concatenation under the covers. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
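To make the suggestion concrete, here is a hedged sketch of a concatenation-based formatter; FastFormat is a made-up name, and it handles only %s, which is what the hot log statements mostly use:

```java
// Illustrative sketch: a format helper that supports only %s and builds the
// string with plain StringBuilder appends, avoiding the regex/locale
// machinery that makes String.format expensive on hot logging paths.
public final class FastFormat {
    public static String format(String template, Object... args) {
        StringBuilder sb = new StringBuilder(template.length() + 16 * args.length);
        int argIdx = 0;
        int i = 0;
        while (i < template.length()) {
            char c = template.charAt(i);
            if (c == '%' && i + 1 < template.length() && template.charAt(i + 1) == 's') {
                sb.append(args[argIdx++]); // substitute the next argument
                i += 2;
            } else {
                sb.append(c);
                i++;
            }
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        System.out.println(format("Partition [%s,%s] has leader %s", "topic", 13, 0));
        // -> Partition [topic,13] has leader 0
    }
}
```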
[jira] [Updated] (KAFKA-1420) Replace AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK with TestUtils.createTopic in unit tests
[ https://issues.apache.org/jira/browse/KAFKA-1420?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Joe Stein updated KAFKA-1420: - Fix Version/s: (was: 0.8.2) 0.8.3 Replace AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK with TestUtils.createTopic in unit tests -- Key: KAFKA-1420 URL: https://issues.apache.org/jira/browse/KAFKA-1420 Project: Kafka Issue Type: Bug Reporter: Guozhang Wang Assignee: Jonathan Natkins Labels: newbie Fix For: 0.8.3 Attachments: KAFKA-1420.patch, KAFKA-1420_2014-07-30_11:18:26.patch, KAFKA-1420_2014-07-30_11:24:55.patch, KAFKA-1420_2014-08-02_11:04:15.patch, KAFKA-1420_2014-08-10_14:12:05.patch, KAFKA-1420_2014-08-10_23:03:46.patch This is a follow-up JIRA from KAFKA-1389. There are a bunch of places in the unit tests where we misuse AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK to create topics, where TestUtils.createTopic needs to be used instead. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (KAFKA-1561) Data Loss for Incremented Replica Factor and Leader Election
[ https://issues.apache.org/jira/browse/KAFKA-1561?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Joe Stein updated KAFKA-1561: - Fix Version/s: (was: 0.8.2) 0.8.3 Data Loss for Incremented Replica Factor and Leader Election Key: KAFKA-1561 URL: https://issues.apache.org/jira/browse/KAFKA-1561 Project: Kafka Issue Type: Bug Reporter: Guozhang Wang Assignee: Guozhang Wang Fix For: 0.8.3 Attachments: broker0.log, broker2.log, consumer.log, producer.log This is reported on the mailing list (thanks to Jad). {quote} Hi, I have a test that continuously sends messages to one broker, brings up another broker, and adds it as a replica for all partitions, with it being the preferred replica for some. I have auto.leader.rebalance.enable=true, so replica election gets triggered. Data is being pumped to the old broker all the while. It seems that some data gets lost while switching over to the new leader. Is this a bug, or do I have something misconfigured? I also have request.required.acks=-1 on the producer. Here's what I think is happening:
1. Producer writes message to broker 0, [EventServiceUpsertTopic,13], w/ broker 0 currently leader, with ISR=(0), so write returns successfully, even when acks = -1. Correlation id 35836
Producer log:
[2014-07-24 14:44:26,991] [DEBUG] [dw-97 - PATCH /v1/events/type_for_test_bringupNewBroker_shouldRebalance_shouldNotLoseData/event?_idPath=idField_mergeFields=field1] [kafka.producer.BrokerPartitionInfo] Partition [EventServiceUpsertTopic,13] has leader 0
[2014-07-24 14:44:26,993] [DEBUG] [dw-97 - PATCH /v1/events/type_for_test_bringupNewBroker_shouldRebalance_shouldNotLoseData/event?_idPath=idField_mergeFields=field1] [k.producer.async.DefaultEventHandler] Producer sent messages with correlation id 35836 for topics [EventServiceUpsertTopic,13] to broker 0 on localhost:56821
2. Broker 1 is still catching up
Broker 0 Log:
[2014-07-24 14:44:26,992] [DEBUG] [kafka-request-handler-3] [kafka.cluster.Partition] Partition [EventServiceUpsertTopic,13] on broker 0: Old hw for partition [EventServiceUpsertTopic,13] is 971. New hw is 971. All leo's are 975,971
[2014-07-24 14:44:26,992] [DEBUG] [kafka-request-handler-3] [kafka.server.KafkaApis] [KafkaApi-0] Produce to local log in 0 ms
[2014-07-24 14:44:26,992] [DEBUG] [kafka-processor-56821-0] [kafka.request.logger] Completed request:Name: ProducerRequest; Version: 0; CorrelationId: 35836; ClientId: ; RequiredAcks: -1; AckTimeoutMs: 1 ms from client /127.0.0.1:57086 ;totalTime:0,requestQueueTime:0,localTime:0,remoteTime:0,responseQueueTime:0,sendTime:0
3. Leader election is triggered by the scheduler:
Broker 0 Log:
[2014-07-24 14:44:26,991] [INFO ] [kafka-scheduler-0] [k.c.PreferredReplicaPartitionLeaderSelector] [PreferredReplicaPartitionLeaderSelector]: Current leader 0 for partition [ EventServiceUpsertTopic,13] is not the preferred replica. Trigerring preferred replica leader election
[2014-07-24 14:44:26,993] [DEBUG] [kafka-scheduler-0] [kafka.utils.ZkUtils$] Conditional update of path /brokers/topics/EventServiceUpsertTopic/partitions/13/state with value {controller_epoch:1,leader:1,version:1,leader_epoch:3,isr:[0,1]} and expected version 3 succeeded, returning the new version: 4
[2014-07-24 14:44:26,994] [DEBUG] [kafka-scheduler-0] [k.controller.PartitionStateMachine] [Partition state machine on Controller 0]: After leader election, leader cache is updated to Map(Snipped(Leader:1,ISR:0,1,LeaderEpoch:3,ControllerEpoch:1),EndSnip)
[2014-07-24 14:44:26,994] [INFO ] [kafka-scheduler-0] [kafka.controller.KafkaController] [Controller 0]: Partition [ EventServiceUpsertTopic,13] completed preferred replica leader election. New leader is 1
4. Broker 1 is still behind, but it sets the high water mark to 971!!!
Broker 1 Log:
[2014-07-24 14:44:26,999] [INFO ] [kafka-request-handler-6] [kafka.server.ReplicaFetcherManager] [ReplicaFetcherManager on broker 1] Removed fetcher for partitions [EventServiceUpsertTopic,13]
[2014-07-24 14:44:27,000] [DEBUG] [kafka-request-handler-6] [kafka.cluster.Partition] Partition [EventServiceUpsertTopic,13] on broker 1: Old hw for partition [EventServiceUpsertTopic,13] is 970. New hw is -1. All leo's are -1,971
[2014-07-24 14:44:27,098] [DEBUG] [kafka-request-handler-3] [kafka.server.KafkaApis] [KafkaApi-1] Maybe update partition HW due to fetch request: Name: FetchRequest; Version: 0; CorrelationId: 1; ClientId: ReplicaFetcherThread-0-1; ReplicaId: 0; MaxWait: 500 ms; MinBytes: 1 bytes; RequestInfo: [EventServiceUpsertTopic,13] - PartitionFetchInfo(971,1048576), Snipped
[2014-07-24 14:44:27,098] [DEBUG] [kafka-request-handler-3]
[jira] [Updated] (KAFKA-1374) LogCleaner (compaction) does not support compressed topics
[ https://issues.apache.org/jira/browse/KAFKA-1374?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Joe Stein updated KAFKA-1374: - Fix Version/s: (was: 0.8.2) 0.8.3 LogCleaner (compaction) does not support compressed topics -- Key: KAFKA-1374 URL: https://issues.apache.org/jira/browse/KAFKA-1374 Project: Kafka Issue Type: Bug Reporter: Joel Koshy Assignee: Manikumar Reddy Labels: newbie++ Fix For: 0.8.3 Attachments: KAFKA-1374.patch, KAFKA-1374_2014-08-09_16:18:55.patch, KAFKA-1374_2014-08-12_22:23:06.patch, KAFKA-1374_2014-09-23_21:47:12.patch, KAFKA-1374_2014-10-03_18:49:16.patch, KAFKA-1374_2014-10-03_19:17:17.patch This is a known issue, but opening a ticket to track. If you try to compact a topic that has compressed messages you will run into various exceptions - typically because during iteration we advance the position based on the decompressed size of the message. I have a bunch of stack traces, but it should be straightforward to reproduce. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (KAFKA-795) Improvements to PreferredReplicaLeaderElection tool
[ https://issues.apache.org/jira/browse/KAFKA-795?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Joe Stein updated KAFKA-795: Fix Version/s: (was: 0.8.2) 0.8.3 Improvements to PreferredReplicaLeaderElection tool --- Key: KAFKA-795 URL: https://issues.apache.org/jira/browse/KAFKA-795 Project: Kafka Issue Type: Bug Affects Versions: 0.8.0 Reporter: Swapnil Ghike Assignee: Swapnil Ghike Fix For: 0.8.3 We can make some improvements to the PreferredReplicaLeaderElection tool:
1. Terminate the tool if a controller is not up and running. Currently we can run the tool without having any broker running, which is kind of confusing.
2. Should we delete the /admin zookeeper path in the PreferredReplicaLeaderElection (and ReassignPartition) tool at the end? Otherwise the next run of the tool complains that a replica election is already in progress.
3. If there is an error, we can see it in controller.log. Should the tool also throw an error?
-- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (KAFKA-1660) Ability to call close() with a timeout on the Java Kafka Producer.
[ https://issues.apache.org/jira/browse/KAFKA-1660?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Joe Stein updated KAFKA-1660: - Fix Version/s: (was: 0.8.2) 0.8.3 Ability to call close() with a timeout on the Java Kafka Producer. --- Key: KAFKA-1660 URL: https://issues.apache.org/jira/browse/KAFKA-1660 Project: Kafka Issue Type: Improvement Components: clients, producer Affects Versions: 0.8.2 Reporter: Andrew Stein Assignee: Jun Rao Fix For: 0.8.3 I would like the ability to call {{close}} with a timeout on the Java Client's KafkaProducer. h6. Workaround Currently, it is possible to ensure that {{close}} will return quickly by first doing a {{future.get(timeout)}} on the last future produced on each partition, but this means that the user has to define the partitions up front at the time of {{send}} and track the returned {{future}}s. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
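The workaround described in the ticket can be sketched as follows; the per-partition bookkeeping is our own illustrative code, and plain java.util.concurrent futures stand in for the producer's RecordMetadata futures so the sketch is self-contained:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Sketch of the workaround: remember the last Future returned by send() for
// each partition, then bound shutdown time with get(timeout) before close().
public final class BoundedClose {
    private final Map<Integer, Future<?>> lastFuturePerPartition = new HashMap<>();

    public void recordSend(int partition, Future<?> f) {
        lastFuturePerPartition.put(partition, f); // keep only the latest future
    }

    /** Returns true if every partition's last send completed within the budget. */
    public boolean awaitAll(long timeoutMs) {
        long deadline = System.currentTimeMillis() + timeoutMs;
        for (Future<?> f : lastFuturePerPartition.values()) {
            long remaining = deadline - System.currentTimeMillis();
            try {
                f.get(Math.max(remaining, 0), TimeUnit.MILLISECONDS);
            } catch (TimeoutException | InterruptedException | ExecutionException e) {
                return false; // budget exhausted or send failed; caller may still close()
            }
        }
        return true;
    }

    public static void main(String[] args) {
        BoundedClose bc = new BoundedClose();
        bc.recordSend(0, CompletableFuture.completedFuture(null));
        System.out.println(bc.awaitAll(100)); // true: the last send already completed
    }
}
```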
[jira] [Updated] (KAFKA-1659) Ability to cleanly abort the KafkaProducer
[ https://issues.apache.org/jira/browse/KAFKA-1659?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Joe Stein updated KAFKA-1659: - Fix Version/s: (was: 0.8.2) 0.8.3 Ability to cleanly abort the KafkaProducer -- Key: KAFKA-1659 URL: https://issues.apache.org/jira/browse/KAFKA-1659 Project: Kafka Issue Type: Improvement Components: clients, producer Affects Versions: 0.8.2 Reporter: Andrew Stein Assignee: Jun Rao Fix For: 0.8.3 I would like the ability to abort the Java Client's KafkaProducer. This includes stopping the writing of buffered records. The motivation for this is described [here|http://mail-archives.apache.org/mod_mbox/kafka-dev/201409.mbox/%3CCAOk4UxB7BJm6HSgLXrR01sksB2dOC3zdt0NHaKHz1EALR6%3DCTQ%40mail.gmail.com%3E]. A sketch of this method is:
{code}
public void abort() {
    try {
        ioThread.interrupt();
        ioThread.stop(new ThreadDeath());
    } catch (IllegalAccessException e) {
    }
}
{code}
but of course it is preferable to stop the {{ioThread}} by cooperation, rather than use the deprecated {{Thread.stop(new ThreadDeath())}}. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
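A cooperative alternative to the Thread.stop sketch above might look like this; all names here are illustrative stand-ins, not the actual producer internals:

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Sketch of a cooperative abort: the I/O loop checks an "aborted" flag and
// drops buffered records on exit, instead of being killed with Thread.stop.
public final class AbortableSender implements Runnable {
    private final AtomicBoolean aborted = new AtomicBoolean(false);
    private final BlockingQueue<String> buffered = new LinkedBlockingQueue<>();
    private volatile int sent = 0;
    private volatile Thread ioThread;

    public void enqueue(String record) { buffered.add(record); }
    public int sentCount() { return sent; }

    public void abort() {
        aborted.set(true);            // ask the loop to stop
        Thread t = ioThread;
        if (t != null) t.interrupt(); // wake it if blocked on poll()
    }

    @Override public void run() {
        ioThread = Thread.currentThread();
        while (!aborted.get()) {
            try {
                String r = buffered.poll(100, TimeUnit.MILLISECONDS);
                if (r != null) sent++; // stand-in for a network send
            } catch (InterruptedException e) {
                // fall through; the loop condition decides whether to exit
            }
        }
        buffered.clear(); // drop anything still buffered, per the abort semantics
    }
}
```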
[jira] [Updated] (KAFKA-1499) Broker-side compression configuration
[ https://issues.apache.org/jira/browse/KAFKA-1499?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Joe Stein updated KAFKA-1499: - Fix Version/s: (was: 0.8.2) 0.8.3 Broker-side compression configuration - Key: KAFKA-1499 URL: https://issues.apache.org/jira/browse/KAFKA-1499 Project: Kafka Issue Type: New Feature Reporter: Joel Koshy Assignee: Manikumar Reddy Labels: newbie++ Fix For: 0.8.3 Attachments: KAFKA-1499.patch, KAFKA-1499.patch, KAFKA-1499_2014-08-15_14:20:27.patch, KAFKA-1499_2014-08-21_21:44:27.patch, KAFKA-1499_2014-09-21_15:57:23.patch, KAFKA-1499_2014-09-23_14:45:38.patch, KAFKA-1499_2014-09-24_14:20:33.patch, KAFKA-1499_2014-09-24_14:24:54.patch, KAFKA-1499_2014-09-25_11:05:57.patch Original Estimate: 72h Remaining Estimate: 72h A given topic can have messages in mixed compression codecs. i.e., it can also have a mix of uncompressed/compressed messages. It will be useful to support a broker-side configuration to recompress messages to a specific compression codec. i.e., all messages (for all topics) on the broker will be compressed to this codec. We could have per-topic overrides as well. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (KAFKA-934) kafka hadoop consumer and producer use older 0.19.2 hadoop api's
[ https://issues.apache.org/jira/browse/KAFKA-934?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Joe Stein updated KAFKA-934: Fix Version/s: (was: 0.8.2) 0.8.3 kafka hadoop consumer and producer use older 0.19.2 hadoop api's Key: KAFKA-934 URL: https://issues.apache.org/jira/browse/KAFKA-934 Project: Kafka Issue Type: Bug Components: contrib Affects Versions: 0.8.0 Environment: [amilkowski@localhost impl]$ uname -a Linux localhost.localdomain 3.9.4-200.fc18.x86_64 #1 SMP Fri May 24 20:10:49 UTC 2013 x86_64 x86_64 x86_64 GNU/Linux Reporter: Andrew Milkowski Assignee: Sriharsha Chintalapani Labels: hadoop, hadoop-2.0, newbie Fix For: 0.8.3 The new Hadoop API present in 0.20.1, especially the package org.apache.hadoop.mapreduce.lib, is not used. The affected code is both the consumer and the producer in Kafka's contrib package:
[amilkowski@localhost contrib]$ pwd
/opt/local/git/kafka/contrib
[amilkowski@localhost contrib]$ ls -lt
total 12
drwxrwxr-x 8 amilkowski amilkowski 4096 May 30 11:14 hadoop-consumer
drwxrwxr-x 6 amilkowski amilkowski 4096 May 29 19:31 hadoop-producer
drwxrwxr-x 6 amilkowski amilkowski 4096 May 29 16:43 target
[amilkowski@localhost contrib]$
For example, the imports
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RunningJob;
import org.apache.hadoop.mapred.TextOutputFormat;
use the 0.19.2 Hadoop API format. This prevents merging the Hadoop features into more modern Hadoop implementations, instead of drawing from the 0.20.1 API set in org.apache.hadoop.mapreduce. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (KAFKA-313) Add JSON output and looping options to ConsumerOffsetChecker
[ https://issues.apache.org/jira/browse/KAFKA-313?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Joe Stein updated KAFKA-313: Fix Version/s: (was: 0.8.2) 0.8.3 Add JSON output and looping options to ConsumerOffsetChecker Key: KAFKA-313 URL: https://issues.apache.org/jira/browse/KAFKA-313 Project: Kafka Issue Type: Improvement Reporter: Dave DeMaagd Priority: Minor Labels: newbie, patch Fix For: 0.8.3 Attachments: KAFKA-313-2012032200.diff Adds: * '--loop N' - causes the program to loop forever, sleeping for up to N seconds between loops (loop time minus collection time, unless that's less than 0, at which point it will just run again immediately) * '--asjson' - display as a JSON string instead of the more human readable output format. Neither of the above depend on each other (you can loop in the human readable output, or do a single shot execution with JSON output). Existing behavior/output maintained if neither of the above are used. Diff Attached. Impacted files: core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala -- This message was sent by Atlassian JIRA (v6.3.4#6332)
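The loop timing described above (loop time minus collection time, floored at zero) amounts to a one-line computation; a small sketch for clarity, with made-up names:

```java
// Illustrative sketch of the '--loop N' sleep computation: sleep for the
// loop interval minus however long the collection pass took, never negative.
public final class LoopTimer {
    public static long sleepMs(long loopIntervalMs, long collectionMs) {
        return Math.max(0, loopIntervalMs - collectionMs);
    }

    public static void main(String[] args) {
        System.out.println(sleepMs(5000, 1200)); // 3800: sleep the remainder
        System.out.println(sleepMs(5000, 7000)); // 0: run again immediately
    }
}
```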
[jira] [Updated] (KAFKA-1230) shell script files under bin don't work with cygwin (bash on windows)
[ https://issues.apache.org/jira/browse/KAFKA-1230?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Joe Stein updated KAFKA-1230: - Fix Version/s: (was: 0.8.2) 0.8.3 shell script files under bin don't work with cygwin (bash on windows) - Key: KAFKA-1230 URL: https://issues.apache.org/jira/browse/KAFKA-1230 Project: Kafka Issue Type: Bug Components: tools Affects Versions: 0.8.0 Environment: The changes have been tested under GNU bash, version 4.1.11(2)-release (x86_64-unknown-cygwin) running on Windows 7 Enterprise. Reporter: Alok Lal Fix For: 0.8.3 Attachments: 0001-Added-changes-so-that-bin-.sh-files-can-work-with-CY.patch Original Estimate: 24h Remaining Estimate: 24h h3. Introduction This bug is being created for a pull request that I had submitted earlier for these. Per Jun this is so the changes conform to the Apache license. h3. Background The script files to run Kafka under Windows don't work as is. One needs to hand tweak them since their location is not bin but bin/windows. Further, the script files under bin/windows are not a complete replica of those under bin. To be sure, this isn't a complaint. To the contrary, most projects nowadays don't bother to support running on Windows or do so very late. Just that because of these limitations it might be more prudent to make the script files under bin itself run under Windows rather than trying to make the files under bin/windows work or to make them complete. h3. Change Summary The most common unix-like shell on Windows is the bash shell, which is part of the cygwin project. Out of the box the scripts don't work, mostly due to peculiarities of the directory paths and class path separators. This change set makes a focused change to a single file under bin so that all of the script files under bin would work as is on the Windows platform when using the bash shell of the Cygwin distribution. h3. Motivation Acceptance of this change would enable a vast body of developers who use (or have to use) Windows as their development/testing/production platform to use Kafka with ease. More importantly, by making the examples run smoothly on Windows+Cygwin-bash, it would make the process of evaluating Kafka simpler and smoother, and potentially make for a favorable evaluation, for it would show the commitment of the Kafka team to espouse deployments on Windows (albeit only under cygwin). Further, as the number of people who use Kafka on Windows increases, one would attract people who can eventually fix the script files under bin/windows itself, so that the need to run under Cygwin would also go away. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (KAFKA-1675) bootstrapping tidy-up
[ https://issues.apache.org/jira/browse/KAFKA-1675?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Joe Stein updated KAFKA-1675: - Fix Version/s: (was: 0.8.2) 0.8.3 bootstrapping tidy-up - Key: KAFKA-1675 URL: https://issues.apache.org/jira/browse/KAFKA-1675 Project: Kafka Issue Type: Bug Reporter: Szczepan Faber Assignee: Ivan Lyutov Fix For: 0.8.3 Attachments: KAFKA-1675.patch I'd like to suggest the following changes:
1. Remove the 'gradlew' and 'gradlew.bat' scripts from the source tree. Those scripts don't work, e.g. they fail with an exception when invoked. I just got a user report where those scripts were invoked by the user and it led to an exception that was not easy to grasp. The bootstrapping step will generate those files anyway.
2. Move the 'gradleVersion' extra property from 'build.gradle' into 'gradle.properties'. Otherwise it is hard to automate the bootstrapping process - in order to find out the gradle version, I need to evaluate the build script, and for that I need gradle with the correct version (kind of a vicious circle). Project properties declared in the gradle.properties file can be accessed exactly the same way as the 'ext' properties, for example: 'project.gradleVersion'.
-- This message was sent by Atlassian JIRA (v6.3.4#6332)
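The suggested move (point 2) could look roughly like this; a sketch only, and the version number shown is a placeholder, not Kafka's actual pinned Gradle version:

```properties
# gradle.properties -- readable without evaluating the build script
gradleVersion=1.6
```

```groovy
// build.gradle -- project properties declared in gradle.properties are
// visible the same way 'ext' properties are:
task showGradleVersion {
    doLast { println project.gradleVersion }
}
```

With this, automation can read the required Gradle version by parsing gradle.properties directly, with no working Gradle install needed first.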
[jira] [Updated] (KAFKA-1651) Removed some extra whitespace in KafkaServer.scala
[ https://issues.apache.org/jira/browse/KAFKA-1651?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Joe Stein updated KAFKA-1651: - Fix Version/s: (was: 0.8.2) 0.8.3 Removed some extra whitespace in KafkaServer.scala -- Key: KAFKA-1651 URL: https://issues.apache.org/jira/browse/KAFKA-1651 Project: Kafka Issue Type: Improvement Components: core Affects Versions: 0.8.2 Reporter: Jonathan Creasy Priority: Trivial Labels: newbie Fix For: 0.8.3 Attachments: KAFKA-1651.patch, KAFKA-1651_2014-09-25_00:49:36.patch, KAFKA-1651_2014-09-25_00:50:11.patch -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (KAFKA-1481) Stop using dashes AND underscores as separators in MBean names
[ https://issues.apache.org/jira/browse/KAFKA-1481?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Joe Stein updated KAFKA-1481: - Fix Version/s: (was: 0.8.2) 0.8.3 Stop using dashes AND underscores as separators in MBean names -- Key: KAFKA-1481 URL: https://issues.apache.org/jira/browse/KAFKA-1481 Project: Kafka Issue Type: Bug Components: core Affects Versions: 0.8.1.1 Reporter: Otis Gospodnetic Priority: Critical Labels: patch Fix For: 0.8.3 Attachments: KAFKA-1481_2014-06-06_13-06-35.patch, KAFKA-1481_2014-10-13_18-23-35.patch, KAFKA-1481_2014-10-14_21-53-35.patch, KAFKA-1481_2014-10-15_10-23-35.patch, KAFKA-1481_2014-10-20_23-14-35.patch, KAFKA-1481_2014-10-21_09-14-35.patch, KAFKA-1481_IDEA_IDE_2014-10-14_21-53-35.patch, KAFKA-1481_IDEA_IDE_2014-10-15_10-23-35.patch, KAFKA-1481_IDEA_IDE_2014-10-20_20-14-35.patch, KAFKA-1481_IDEA_IDE_2014-10-20_23-14-35.patch MBeans should not use dashes or underscores as separators because these characters are allowed in hostnames, topics, group and consumer IDs, etc., and these are embedded in MBeans names making it impossible to parse out individual bits from MBeans. Perhaps a pipe character should be used to avoid the conflict. This looks like a major blocker because it means nobody can write Kafka 0.8.x monitoring tools unless they are doing it for themselves AND do not use dashes AND do not use underscores. See: http://search-hadoop.com/m/4TaT4lonIW -- This message was sent by Atlassian JIRA (v6.3.4#6332)
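To see why mixed separators make the names unparseable, consider this small example; the metric name below is made up for illustration:

```java
// Illustration of the ambiguity the ticket describes: when '-' is both a
// legal character inside client IDs/hosts/topics AND the field separator in
// the MBean name, the same string splits multiple valid ways.
public final class MBeanSplit {
    public static void main(String[] args) {
        String mbean = "my-client-app1-host-mytopic-BytesPerSec";
        // Was the client id "my-client" on host "app1-host", or
        // "my-client-app1" on host "host"? Splitting on '-' cannot tell:
        String[] parts = mbean.split("-");
        System.out.println(parts.length); // 6 tokens for what are really 4 fields
    }
}
```

A separator character that is forbidden inside the embedded values (the pipe suggested in the ticket, for example) removes the ambiguity entirely.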
Re: [Java New Producer] Snappy NPE Issue
Hi Ewen, It seems Leo has fixed the snappy lib for this issue. Here are the changes: https://github.com/xerial/snappy-java/commit/7b86642f75c280debf3c1983053ea7f8635b48a5 Here is a jar with the fix: https://oss.sonatype.org/content/repositories/snapshots/org/xerial/snappy/snappy-java/1.1.1.4-SNAPSHOT/ I will try this today afternoon. If it works, would you be able to upgrade Kafka trunk to this version? Thanks, Bhavesh On Mon, Oct 20, 2014 at 9:53 PM, Bhavesh Mistry mistry.p.bhav...@gmail.com wrote: Hi Ewen, Thanks for doing the deep analysis on this issue. I have filed this issue with the Snappy project and linked the Kafka issue. Here are the details of the GitHub issue: https://github.com/xerial/snappy-java/issues/88 I will follow up with the snappy guys to figure out how to solve this problem. For us, this is a typical use case of running a web app in a J2EE container with a thread pool and recycled threads. Thanks, Bhavesh On Mon, Oct 20, 2014 at 6:56 PM, Ewen Cheslack-Postava m...@ewencp.org wrote: Also, filed https://issues.apache.org/jira/browse/KAFKA-1721 for this since it either requires an updated version of the upstream library, a workaround by us, or at a bare minimum clear documentation of the issue. On Mon, Oct 20, 2014, at 06:23 PM, Ewen Cheslack-Postava wrote: I took a quick look at this since I noticed the same issue when testing your code for the issues you filed. I think the issue is that you're using all your producers across a thread pool and the snappy library uses ThreadLocal BufferRecyclers. When new Snappy streams are allocated, they may be allocated from the same thread (e.g. one of your MyProducer classes calls Producer.send() on multiple producers from the same thread) and therefore use the same BufferRecycler. Eventually you hit the code in the stacktrace, and if two producer send threads hit it concurrently they improperly share the unsynchronized BufferRecycler.
This seems like a pain to fix -- it's really a deficiency of the snappy library and as far as I can see there's no external control over BufferRecycler in their API. One possibility is to record the thread ID when we generate a new stream in Compressor and use that to synchronize access to ensure no concurrent BufferRecycler access. That could be made specific to snappy so it wouldn't impact other codecs. Not exactly ideal, but it would work. Unfortunately I can't think of any way for you to protect against this in your own code since the problem arises in the producer send thread, which your code should never know about. Another option would be to setup your producers differently to avoid the possibility of unsynchronized access from multiple threads (i.e. don't use the same thread pool approach), but whether you can do that will depend on your use case. -Ewen On Mon, Oct 20, 2014, at 12:19 PM, Bhavesh Mistry wrote: Hi Kafka Dev, I am getting following issue with Snappy Library. I checked code for Snappy lib it seems to be fine. Have you guys seen this issue ? 
2014-10-20 18:55:21.841 [kafka-producer-network-thread] ERROR org.apache.kafka.clients.producer.internals.Sender - Uncaught error in kafka producer I/O thread: *java.lang.NullPointerException*
	at org.xerial.snappy.BufferRecycler.releaseInputBuffer(BufferRecycler.java:153)
	at org.xerial.snappy.SnappyOutputStream.close(SnappyOutputStream.java:317)
	at java.io.FilterOutputStream.close(FilterOutputStream.java:160)
	at org.apache.kafka.common.record.Compressor.close(Compressor.java:94)
	at org.apache.kafka.common.record.MemoryRecords.close(MemoryRecords.java:119)
	at org.apache.kafka.clients.producer.internals.RecordAccumulator.drain(RecordAccumulator.java:285)
	at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:162)
	at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:115)
	at java.lang.Thread.run(Thread.java:744)
Here is the Snappy code in question, BufferRecycler.java line 153 (http://grepcode.com/file/repo1.maven.org/maven2/org.xerial.snappy/snappy-java/1.1.1.3/org/xerial/snappy/BufferRecycler.java#153):
if (inputBuffer == null || (buffer != null && buffer.length > inputBuffer.length)) {
Thanks, Bhavesh
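The synchronization idea Ewen sketches above (key the lock by the thread that created the stream, since that thread owns the ThreadLocal BufferRecycler) could look roughly like this; RecyclerGuard is a hypothetical illustration, not Kafka's actual Compressor code:

```java
import java.io.Closeable;
import java.io.IOException;
import java.util.concurrent.ConcurrentHashMap;

// Sketch of the mitigation: streams created on the same thread share that
// thread's BufferRecycler, so serialize their close() calls under a lock
// keyed by the creator thread id. Two streams born on different threads use
// different recyclers and therefore different locks.
public final class RecyclerGuard {
    private static final ConcurrentHashMap<Long, Object> LOCKS = new ConcurrentHashMap<>();
    private final Object lock;

    public RecyclerGuard() {
        long creator = Thread.currentThread().getId(); // record at stream creation
        lock = LOCKS.computeIfAbsent(creator, id -> new Object());
    }

    public void close(Closeable stream) throws IOException {
        synchronized (lock) { // serializes all closes sharing this recycler
            stream.close();
        }
    }
}
```

As the thread notes, this would be snappy-specific and would not impact other codecs; the real fix belongs upstream in snappy-java.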
Re: [DISCUSS] Release 0.8.2-beta before 0.8.2?
KAFKA-1583 will only be in trunk. It would be nice if we fix KAFKA-1634 in 0.8.2 too (may need a separate patch). If we can get a patch ready, we can include it in 0.8.2 final. Thanks, Jun On Tue, Oct 21, 2014 at 8:00 AM, Joel Koshy jjkosh...@gmail.com wrote: KAFKA-1634 is orthogonal to 1583, right? (BTW sorry about the delay in reviewing 1583 - I'm on it.) KAFKA-1634 shouldn't be too difficult to address so I'm hoping we can get that in 0.8.2 if not the beta release. Joel On Mon, Oct 20, 2014 at 09:14:17PM -0700, Jun Rao wrote: I moved KAFKA-1634 to 0.8.3 since it's easier to fix after KAFKA-1583 is committed. With this, we have resolved all known blocker issues for 0.8.2. Thanks everyone. Joe, Do you want to help start rolling an 0.8.2 beta release? We can decide if we need to include any other fixes (e.g., KAFKA-1481) in the 0.8.2 final release. Thanks, Jun On Thu, Oct 16, 2014 at 3:53 PM, Joel Koshy jjkosh...@gmail.com wrote: +1 on the beta. I think KAFKA-1583 should only be on trunk, not 0.8.2, so that will only go out with 0.8.3. Joel On Thu, Oct 16, 2014 at 03:25:39PM -0700, Guozhang Wang wrote: Agree. On Oct 16, 2014 3:16 PM, Jun Rao jun...@gmail.com wrote: +1 on doing an 0.8.2 beta. Guozhang, kafka-1583 is relatively large. Given that we are getting close to releasing 0.8.2 beta, my feeling is that we probably shouldn't include it in 0.8.2 beta even if we can commit it in a few days. Thanks, Jun On Thu, Oct 16, 2014 at 3:01 PM, Guozhang Wang wangg...@gmail.com wrote: Regarding 1634, I intended to work on that after 1583 since it will change the commit offset request handling logic a lot. If people think 1583 is only a few days away from check-in, we can leave it in 0.8.2-beta; otherwise we can push it to 0.8.3. Guozhang On Thu, Oct 16, 2014 at 2:19 PM, Joe Stein joe.st...@stealth.ly wrote: +1 for a 0.8.2-beta next week and 0.8.2.0 final 4-5 weeks later.
I agree with the tickets you brought up to have in 0.8.2-beta and also https://issues.apache.org/jira/browse/KAFKA-1493 for lz4 compression. /*** Joe Stein Founder, Principal Consultant Big Data Open Source Security LLC http://www.stealth.ly Twitter: @allthingshadoop / On Oct 16, 2014 12:55 PM, Neha Narkhede neha.narkh...@gmail.com wrote: Another JIRA that will be nice to include as part of 0.8.2-beta is https://issues.apache.org/jira/browse/KAFKA-1481 that fixes the mbean naming. Looking for people's thoughts on 2 things here -
1. How do folks feel about doing a 0.8.2-beta release right now and 0.8.2 final 4-5 weeks later?
2. Do people want to include any JIRAs (other than the ones mentioned above) in 0.8.2-beta? If so, it will be great to know now so it will allow us to move forward with the beta release quickly.
Thanks, Neha On Wed, Oct 15, 2014 at 4:46 PM, Neha Narkhede neha.narkh...@gmail.com wrote: Hi, We have accumulated an impressive list of pretty major features in 0.8.2 -
- Delete topic
- Automated leader rebalancing
- Controlled shutdown
- Offset management
- Parallel recovery
- min.isr and clean leader election
In the past, what has worked for major feature releases is a beta release prior to a final release. I'm proposing we do the same for 0.8.2. The only blockers for 0.8.2-beta that I know of are -
- https://issues.apache.org/jira/browse/KAFKA-1493 (Is a major change and requires some thinking about the new dependency. Since it is not fully ready and there are things to think about, I suggest we take it out, think it end to end and then include it in 0.8.3.)
- https://issues.apache.org/jira/browse/KAFKA-1634 (This has an owner: Guozhang Wang)
- https://issues.apache.org/jira/browse/KAFKA-1671 (Has a patch and is waiting on a review by Joe Stein)
It seems that 1634 and 1671 can get wrapped up in a week. Do people think we should cut 0.8.2-beta by next week? Thanks, Neha -- -- Guozhang
Re: Review Request 26373: Patch for KAFKA-1647
On Oct. 21, 2014, 12:18 a.m., Jun Rao wrote: core/src/main/scala/kafka/server/ReplicaManager.scala, lines 481-506 https://reviews.apache.org/r/26373/diff/3/?file=725140#file725140line481 This doesn't quite fix the original problem though. The original problem is that if the leader is not alive, we won't call partition.makeFollower(), in which the local replica is created. If a local replica is not created, the partition will be ignored when checkpointing the HW and we lose the last checkpointed HW. So, we have to call partition.makeFollower() for every follower, whether its leader is live or not. After this, we can proceed with the rest of the steps for only those partitions with a live leader. We can log a warning for those partitions w/o a live leader. Jiangjie Qin wrote: Hi Jun, thanks for the review. I think the high watermark is actually read in partition.getOrCreateReplica(). As you said, that means in order to preserve the high watermark, the local replica is supposed to be created. The original code did not create the local replica when the remote leader is not up, so it lost the high watermark. partition.getOrCreateReplica() is actually called in the following 2 places: 1. partition.makeFollower() 2. ReplicaManager line 515, when we truncate the log for the partitions in partitionsToMakeFollower. Both of them could preserve the high watermark. The difference between them is that in (1) all the replicas for the partition, including the not-existing-yet remote replicas, will be created. However, in (2) only the local replica will be created. Since I'm not 100% sure whether creating a remote replica before it is actually up could cause some other issue, I chose to preserve the high watermark in (2) instead of in (1), which involves less change compared with the original code. Neha Narkhede wrote: Did you get a chance to repeat the kind of testing that was done to find this bug?
I'd be more comfortable accepting this patch if we did that sort of testing, since this change is tricky. Thanks for the explanation. Got it. On the other hand, if the leader is not ready, perhaps it's better not to truncate the data in the follower, since this could cause some unnecessary data loss if the follower is later selected as the new leader. - Jun --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/26373/#review57490 --- On Oct. 18, 2014, 7:26 a.m., Jiangjie Qin wrote: --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/26373/ --- (Updated Oct. 18, 2014, 7:26 a.m.) Review request for kafka. Bugs: KAFKA-1647 https://issues.apache.org/jira/browse/KAFKA-1647 Repository: kafka Description --- Addressed Joel's comments. The version 2 code seems to have been submitted by mistake... This should be the code for review that addressed Joel's comments. Diffs - core/src/main/scala/kafka/server/ReplicaManager.scala 78b7514cc109547c562e635824684fad581af653 Diff: https://reviews.apache.org/r/26373/diff/ Testing --- Thanks, Jiangjie Qin
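The bug being discussed can be illustrated with a minimal sketch. This is hypothetical Python, not Kafka's actual Scala code: it only models the claim that the high-watermark checkpoint writer skips any partition without a local replica, so a partition whose leader was down at startup silently loses its last checkpointed HW.

```python
# Hypothetical model of HW checkpointing (names are illustrative, not Kafka's).
# A partition maps to its local replica's HW, or None when makeFollower()/
# getOrCreateReplica() was never called and no local replica exists.

def checkpoint_high_watermarks(partitions):
    """Write a new checkpoint from partitions that have a local replica.

    Partitions without a local replica are ignored, which is exactly how
    a previously checkpointed HW gets dropped on the next rewrite.
    """
    new_checkpoint = {}
    for name, local_hw in partitions.items():
        if local_hw is None:
            continue  # no local replica: partition skipped, old HW is lost
        new_checkpoint[name] = local_hw
    return new_checkpoint

# Broker restarts while the leader of "t-1" is down, so no local replica was
# created for it; its previously checkpointed HW disappears from the file.
parts = {"t-0": 105, "t-1": None}
print(checkpoint_high_watermarks(parts))  # {'t-0': 105}
```

Calling getOrCreateReplica() for the local replica (whether via makeFollower() or via the truncation path) keeps the entry in the map, which is why both options in the discussion preserve the HW.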
Re: Review Request 26666: Patch for KAFKA-1653
On Oct. 21, 2014, 4:44 p.m., Neha Narkhede wrote: core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala, line 111 https://reviews.apache.org/r/26666/diff/2/?file=723448#file723448line111 I ran a quick test on the following reassignment file and it didn't warn me about the duplicates in 1) the replica list and 2) the partition itself. While running this test, I think it may also be worth de-duping the topic list. Actually, ignore the first part of the comment above. My kafka jar was stale. It did warn me about the duplicate replica list, but not about the partition. It would be helpful to get an error if there are duplicate partitions as well as topics in the reassignment file. I realized this only while running this test example. - Neha --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/26666/#review57602 --- On Oct. 16, 2014, 9:54 p.m., Ewen Cheslack-Postava wrote: --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/26666/ --- (Updated Oct. 16, 2014, 9:54 p.m.) Review request for kafka. Bugs: KAFKA-1653 https://issues.apache.org/jira/browse/KAFKA-1653 Repository: kafka Description --- Generate an error for duplicates in PreferredLeaderElectionCommand instead of just swallowing duplicates. Report which entries are duplicated for ReassignPartitionsCommand since they may be difficult to find in large reassignments. Diffs - core/src/main/scala/kafka/admin/PreferredReplicaLeaderElectionCommand.scala c7918483c02040a7cc18d6e9edbd20a3025a3a55 core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala 691d69a49a240f38883d2025afaec26fd61281b5 core/src/main/scala/kafka/admin/TopicCommand.scala 7672c5aab4fba8c23b1bb5cd4785c332d300a3fa core/src/main/scala/kafka/tools/StateChangeLogMerger.scala d298e7e81acc7427c6cf4796b445966267ca54eb Diff: https://reviews.apache.org/r/26666/diff/ Testing --- Thanks, Ewen Cheslack-Postava
[jira] [Comment Edited] (KAFKA-1718) Message Size Too Large error when only small messages produced with Snappy
[ https://issues.apache.org/jira/browse/KAFKA-1718?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14178396#comment-14178396 ] Wojciech Kuranowski edited comment on KAFKA-1718 at 10/21/14 4:47 PM: -- I have compiled Kafka 0.8.2 with different error messages for validation and revalidation and it seems that this issue is triggered in revalidation after recompression. In my case: kafka.common.MessageSizeTooLargeException: revalidate - Message size is 3382345 bytes which exceeds the maximum configured message size of 100. It's strange that message after recompression is 3 times bigger than the limit. Is broker miscalculating something? was (Author: noxis): I have compiled kafka 0.8.2 with different messages for validation and revalidation, and it seems that this issue is triggered in revalidation after recompression. In my case: kafka.common.MessageSizeTooLargeException: revalidate - Message size is 3382345 bytes which exceeds the maximum configured message size of 100. It's strange that message after recompression is 3 times bigger than the limit. Is broker miscalculating something? Message Size Too Large error when only small messages produced with Snappy Key: KAFKA-1718 URL: https://issues.apache.org/jira/browse/KAFKA-1718 Project: Kafka Issue Type: Bug Components: core Affects Versions: 0.8.1.1 Reporter: Evan Huus Priority: Critical I'm the primary author of the Go bindings, and while I originally received this as a bug against my bindings, I'm coming to the conclusion that it's a bug in the broker somehow. Specifically, take a look at the last two kafka packets in the following packet capture: https://dl.dropboxusercontent.com/u/171647/kafka.pcapng (you will need a trunk build of Wireshark to fully decode the kafka part of the packets). The produce request contains two partitions on one topic. Each partition has one message set (sizes 977205 bytes and 967362 bytes respectively). 
Each message set is a sequential collection of snappy-compressed messages, each message of size 46899. When uncompressed, each message contains a message set of 999600 bytes, containing a sequence of uncompressed 1024-byte messages. However, the broker responds to this with a MessageSizeTooLarge error, full stacktrace from the broker logs being: kafka.common.MessageSizeTooLargeException: Message size is 1070127 bytes which exceeds the maximum configured message size of 112. at kafka.log.Log$$anonfun$append$1.apply(Log.scala:267) at kafka.log.Log$$anonfun$append$1.apply(Log.scala:265) at scala.collection.Iterator$class.foreach(Iterator.scala:727) at kafka.utils.IteratorTemplate.foreach(IteratorTemplate.scala:32) at kafka.log.Log.append(Log.scala:265) at kafka.cluster.Partition.appendMessagesToLeader(Partition.scala:354) at kafka.server.KafkaApis$$anonfun$appendToLocalLog$2.apply(KafkaApis.scala:376) at kafka.server.KafkaApis$$anonfun$appendToLocalLog$2.apply(KafkaApis.scala:366) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98) at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98) at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:226) at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39) at scala.collection.mutable.HashMap.foreach(HashMap.scala:98) at scala.collection.TraversableLike$class.map(TraversableLike.scala:244) at scala.collection.AbstractTraversable.map(Traversable.scala:105) at kafka.server.KafkaApis.appendToLocalLog(KafkaApis.scala:366) at kafka.server.KafkaApis.handleProducerRequest(KafkaApis.scala:292) at kafka.server.KafkaApis.handle(KafkaApis.scala:185) at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:42) at java.lang.Thread.run(Thread.java:695) Since as far as I can tell none of 
the sizes in the actual produced packet exceed the defined maximum, I can only assume that the broker is miscalculating something somewhere and throwing the exception improperly. --- This issue can be reliably reproduced using an out-of-the-box binary download of 0.8.1.1 and the following gist: https://gist.github.com/eapache/ce0f15311c605a165ce7 (you will need to use the `producer-ng` branch of the Sarama library). --- I am happy to provide any more information you might need, or to do relevant experiments etc. -- This message was sent by
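The size mismatch described above is consistent with the broker recompressing the produced message sets on the leader: all messages end up in one compressed wrapper message, and that single wrapper is what the size check sees, so it can exceed the limit even when every produced message is small. The sketch below is a Python illustration of that effect, using zlib as a stand-in for snappy (names and the limit value are assumptions, not Kafka's code).

```python
import os
import zlib

MAX_MESSAGE_BYTES = 1_000_000  # stand-in for the broker's message.max.bytes


def revalidate_after_recompression(messages, max_bytes=MAX_MESSAGE_BYTES):
    """Re-wrap ALL messages into one compressed wrapper, as a leader-side
    recompression would, and size-check the single wrapper message."""
    wrapper = zlib.compress(b"".join(messages))
    return len(wrapper), len(wrapper) <= max_bytes


# 2000 random (hence incompressible) 1 KiB messages: each is tiny relative
# to the limit, but the combined wrapper message is still rejected.
msgs = [os.urandom(1024) for _ in range(2000)]
size, ok = revalidate_after_recompression(msgs)
print(size, ok)  # roughly 2 MB, False
```

Highly compressible payloads (like the 1024-byte repeated messages in the report) pass or fail depending on how well the whole batch compresses, which would explain why the rejected wrapper can be several times larger than the limit while no individual message is.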
[jira] [Commented] (KAFKA-1678) add new options for reassign partition to better manager dead brokers
[ https://issues.apache.org/jira/browse/KAFKA-1678?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14178620#comment-14178620 ] Neha Narkhede commented on KAFKA-1678: -- There are a few more usability issues with the reassignment tool. For example, it allows the user to start reassignment for a topic that doesn't exist and also reassign to brokers that don't exist. I think it is important to fix some usability issues with this tool before adding more options to it. add new options for reassign partition to better manager dead brokers - Key: KAFKA-1678 URL: https://issues.apache.org/jira/browse/KAFKA-1678 Project: Kafka Issue Type: Bug Reporter: Joe Stein Assignee: Gwen Shapira Labels: operations Fix For: 0.8.3 this is in two forms --replace-replica which is from broker.id to broker.id and --remove-replica which is just a single broker.id -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-1678) add new options for reassign partition to better manager dead brokers
[ https://issues.apache.org/jira/browse/KAFKA-1678?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14178630#comment-14178630 ] Joe Stein commented on KAFKA-1678: -- Agreed, can this ticket be an update for those two items also? I figure the work can all be done in one patch? add new options for reassign partition to better manager dead brokers - Key: KAFKA-1678 URL: https://issues.apache.org/jira/browse/KAFKA-1678 Project: Kafka Issue Type: Bug Reporter: Joe Stein Assignee: Gwen Shapira Labels: operations Fix For: 0.8.3 this is in two forms --replace-replica which is from broker.id to broker.id and --remove-replica which is just a single broker.id -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-1678) add new options for reassign partition to better manager dead brokers
[ https://issues.apache.org/jira/browse/KAFKA-1678?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14178637#comment-14178637 ] Neha Narkhede commented on KAFKA-1678: -- Yes, I don't mind if it is part of the same patch. Just wanted to point out that we need to simplify this tool :) add new options for reassign partition to better manager dead brokers - Key: KAFKA-1678 URL: https://issues.apache.org/jira/browse/KAFKA-1678 Project: Kafka Issue Type: Bug Reporter: Joe Stein Assignee: Gwen Shapira Labels: operations Fix For: 0.8.3 this is in two forms --replace-replica which is from broker.id to broker.id and --remove-replica which is just a single broker.id -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-1678) add new options for reassign partition to better manager dead brokers
[ https://issues.apache.org/jira/browse/KAFKA-1678?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14178646#comment-14178646 ] Gwen Shapira commented on KAFKA-1678: - All yours, [~joestein]. add new options for reassign partition to better manager dead brokers - Key: KAFKA-1678 URL: https://issues.apache.org/jira/browse/KAFKA-1678 Project: Kafka Issue Type: Bug Reporter: Joe Stein Assignee: Gwen Shapira Labels: operations Fix For: 0.8.3 this is in two forms --replace-replica which is from broker.id to broker.id and --remove-replica which is just a single broker.id -- This message was sent by Atlassian JIRA (v6.3.4#6332)
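The proposed --replace-replica option (from broker.id to broker.id) can be sketched as a pure transformation over the assignment map. This is a hypothetical Python illustration, not the tool's actual implementation; all names are assumptions:

```python
def replace_replica(assignment, old_broker, new_broker):
    """Return a new assignment with old_broker swapped for new_broker in
    every replica list, e.g. to move replicas off a dead broker.

    Refuses swaps that would leave a duplicate broker in a replica list
    (the usability problem tracked in KAFKA-1653).
    """
    new_assignment = {}
    for partition, replicas in assignment.items():
        updated = [new_broker if r == old_broker else r for r in replicas]
        if len(set(updated)) != len(updated):
            raise ValueError(f"duplicate replica for {partition}: {updated}")
        new_assignment[partition] = updated
    return new_assignment


assignment = {("t", 0): [1, 2, 3], ("t", 1): [2, 3, 4]}
print(replace_replica(assignment, 2, 5))
# {('t', 0): [1, 5, 3], ('t', 1): [5, 3, 4]}
```

A --remove-replica option would be the analogous filter that drops a single broker.id from each list. Validating broker and topic existence first, as Neha suggests, would sit naturally in front of this step.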
Re: [DISCUSS] Release 0.8.2-beta before 0.8.2?
Note that I added a patch with 0.8.2 documentation to KAFKA-1555 (in order to document the new min.isr behavior). We need to get this committed in SVN before the release, and probably document a few more things (the new producer, for example). Gwen On Tue, Oct 21, 2014 at 12:07 PM, Jun Rao jun...@gmail.com wrote: KAFKA-1583 will only be in trunk. It would be nice if we fix KAFKA-1634 in 0.8.2 too (may need a separate patch). If we can get a patch ready, we can include it in 0.8.2 final. Thanks, Jun On Tue, Oct 21, 2014 at 8:00 AM, Joel Koshy jjkosh...@gmail.com wrote: KAFKA-1634 is orthogonal to 1583, right? (BTW sorry about the delay in reviewing 1583 - I'm on it.) KAFKA-1634 shouldn't be too difficult to address, so I'm hoping we can get that into 0.8.2 if not the beta release. Joel On Mon, Oct 20, 2014 at 09:14:17PM -0700, Jun Rao wrote: I moved KAFKA-1634 to 0.8.3 since it's easier to fix after KAFKA-1583 is committed. With this, we have resolved all known blocker issues for 0.8.2. Thanks everyone. Joe, Do you want to help start rolling an 0.8.2 beta release? We can decide if we need to include any other fixes (e.g., KAFKA-1481) in the 0.8.2 final release. Thanks, Jun On Thu, Oct 16, 2014 at 3:53 PM, Joel Koshy jjkosh...@gmail.com wrote: +1 on the beta. I think KAFKA-1583 should only be on trunk, not 0.8.2, so that will only go out with 0.8.3. Joel On Thu, Oct 16, 2014 at 03:25:39PM -0700, Guozhang Wang wrote: Agree. On Oct 16, 2014 3:16 PM, Jun Rao jun...@gmail.com wrote: +1 on doing an 0.8.2 beta. Guozhang, kafka-1583 is relatively large. Given that we are getting close to releasing 0.8.2 beta, my feeling is that we probably shouldn't include it in 0.8.2 beta even if we can commit it in a few days. Thanks, Jun On Thu, Oct 16, 2014 at 3:01 PM, Guozhang Wang wangg...@gmail.com wrote: Regarding 1634, I intended to work on that after 1583 since it will change the commit offset request handling logic a lot.
If people think 1583 is only a few days away from check-in, we can leave it in 0.8.2-beta; otherwise we can push it to 0.8.3. Guozhang On Thu, Oct 16, 2014 at 2:19 PM, Joe Stein joe.st...@stealth.ly wrote: +1 for a 0.8.2-beta next week and 0.8.2.0 final 4-5 weeks later. I agree to the tickets you brought up to have in 0.8.2-beta and also https://issues.apache.org/jira/browse/KAFKA-1493 for lz4 compression. /*** Joe Stein Founder, Principal Consultant Big Data Open Source Security LLC http://www.stealth.ly Twitter: @allthingshadoop / On Oct 16, 2014 12:55 PM, Neha Narkhede neha.narkh...@gmail.com wrote: Another JIRA that would be nice to include as part of 0.8.2-beta is https://issues.apache.org/jira/browse/KAFKA-1481, which fixes the mbean naming. Looking for people's thoughts on 2 things here - 1. How do folks feel about doing a 0.8.2-beta release right now and 0.8.2 final 4-5 weeks later? 2. Do people want to include any JIRAs (other than the ones mentioned above) in 0.8.2-beta? If so, it will be great to know now so we can move forward with the beta release quickly. Thanks, Neha On Wed, Oct 15, 2014 at 4:46 PM, Neha Narkhede neha.narkh...@gmail.com wrote: Hi, We have accumulated an impressive list of pretty major features in 0.8.2 - Delete topic Automated leader rebalancing Controlled shutdown Offset management Parallel recovery min.isr and clean leader election In the past, what has worked for major feature releases is a beta release prior to a final release. I'm proposing we do the same for 0.8.2. The only blockers for 0.8.2-beta that I know of are - https://issues.apache.org/jira/browse/KAFKA-1493 (Is a major change and requires some thinking about the new dependency. Since it is not fully ready and there are things to think about, I suggest we take it out, think it through end to end, and then include it in 0.8.3.)
https://issues.apache.org/jira/browse/KAFKA-1634 (This has an owner: Guozhang Wang) https://issues.apache.org/jira/browse/KAFKA-1671 (Has a patch and is waiting on a review by Joe Stein) It seems that 1634 and 1671 can get wrapped up in a week. Do people think we should cut 0.8.2-beta by next week? Thanks, Neha
[jira] [Commented] (KAFKA-1481) Stop using dashes AND underscores as separators in MBean names
[ https://issues.apache.org/jira/browse/KAFKA-1481?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14178739#comment-14178739 ] Jun Rao commented on KAFKA-1481: Otis, Yes, we can include this in the 0.8.2 final release. Does that sound good? Thanks, Stop using dashes AND underscores as separators in MBean names -- Key: KAFKA-1481 URL: https://issues.apache.org/jira/browse/KAFKA-1481 Project: Kafka Issue Type: Bug Components: core Affects Versions: 0.8.1.1 Reporter: Otis Gospodnetic Priority: Critical Labels: patch Fix For: 0.8.3 Attachments: KAFKA-1481_2014-06-06_13-06-35.patch, KAFKA-1481_2014-10-13_18-23-35.patch, KAFKA-1481_2014-10-14_21-53-35.patch, KAFKA-1481_2014-10-15_10-23-35.patch, KAFKA-1481_2014-10-20_23-14-35.patch, KAFKA-1481_2014-10-21_09-14-35.patch, KAFKA-1481_IDEA_IDE_2014-10-14_21-53-35.patch, KAFKA-1481_IDEA_IDE_2014-10-15_10-23-35.patch, KAFKA-1481_IDEA_IDE_2014-10-20_20-14-35.patch, KAFKA-1481_IDEA_IDE_2014-10-20_23-14-35.patch MBeans should not use dashes or underscores as separators because these characters are allowed in hostnames, topics, group and consumer IDs, etc., and these are embedded in MBean names, making it impossible to parse out individual bits from MBeans. Perhaps a pipe character should be used to avoid the conflict. This looks like a major blocker because it means nobody can write Kafka 0.8.x monitoring tools unless they are doing it for themselves AND do not use dashes AND do not use underscores. See: http://search-hadoop.com/m/4TaT4lonIW -- This message was sent by Atlassian JIRA (v6.3.4#6332)
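The parsing problem described in the issue can be shown in a few lines. This is an illustrative Python sketch (the metric name layout here is a hypothetical clientId-host pair, not Kafka's exact MBean scheme): a separator that can also appear inside the values is ambiguous, while a character illegal in hostnames and client ids parses cleanly.

```python
def split_mbean_value(value, sep):
    """Split a hypothetical 'clientId<sep>host' metric value into its parts."""
    return value.split(sep)


# With '-' as the separator, a client id containing dashes is ambiguous:
# there is no way to tell where the client id ends and the host begins.
ambiguous = split_mbean_value("my-app-host-01.example.com", "-")
print(ambiguous)  # ['my', 'app', 'host', '01.example.com'] -- 4 parts, not 2

# With '|' (not legal in hostnames or client ids), parsing is unambiguous:
clean = split_mbean_value("my-app|host-01.example.com", "|")
print(clean)  # ['my-app', 'host-01.example.com']
```

This is why the reporter argues that monitoring tools cannot reliably extract the host, topic, or client id from 0.8.x MBean names unless those values happen to avoid dashes and underscores entirely.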
[jira] [Assigned] (KAFKA-1683) Implement a session concept in the socket server
[ https://issues.apache.org/jira/browse/KAFKA-1683?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Gwen Shapira reassigned KAFKA-1683: --- Assignee: Gwen Shapira Implement a session concept in the socket server -- Key: KAFKA-1683 URL: https://issues.apache.org/jira/browse/KAFKA-1683 Project: Kafka Issue Type: Sub-task Components: security Affects Versions: 0.9.0 Reporter: Jay Kreps Assignee: Gwen Shapira To implement authentication we need a way to keep track of some things between requests. The initial use for this would be remembering the authenticated user/principal info, but likely more uses would come up (for example, we will also need to remember whether and which encryption or integrity measures are in place on the socket so we can wrap and unwrap writes and reads). I was thinking we could just add a Session object that might have a user field. The session object would need to get added to RequestChannel.Request so it is passed down to the API layer with each request. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
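The Session idea sketched in the issue is small enough to illustrate directly. This is a hedged Python sketch of the shape being proposed, not Kafka's actual classes: a session carries the authenticated principal (and later, other security state), and it rides along on every request into the API layer.

```python
from dataclasses import dataclass, field


@dataclass
class Session:
    # Authenticated user/principal; more security state (cipher suite,
    # integrity mode) could be added later without touching the request path.
    principal: str
    attributes: dict = field(default_factory=dict)


@dataclass
class Request:
    api_key: str
    payload: bytes
    session: Session  # attached once, passed down with each request


def handle(request):
    # The API layer can now make per-user decisions (authorization, quotas).
    return f"{request.api_key} from {request.session.principal}"


req = Request("FetchRequest", b"...", Session(principal="alice"))
print(handle(req))  # FetchRequest from alice
```

The design choice worth noting is that the session is created at connection time and reused for all requests on that socket, so per-request handlers never re-authenticate; they just read the session.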
[jira] [Commented] (KAFKA-1687) SASL tests
[ https://issues.apache.org/jira/browse/KAFKA-1687?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14178791#comment-14178791 ] Gwen Shapira commented on KAFKA-1687: - MiniKDC can be used to test with Kerberos SASL tests -- Key: KAFKA-1687 URL: https://issues.apache.org/jira/browse/KAFKA-1687 Project: Kafka Issue Type: Sub-task Components: security Reporter: Jay Kreps We need tests for our SASL/Kerberos setup. This is not that easy to do with Kerberos because of the dependency on the KDC. However possibly we can test with another SASL mechanism that doesn't have that dependency? -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (KAFKA-1722) static analysis code coverage for pci audit needs
Joe Stein created KAFKA-1722: Summary: static analysis code coverage for pci audit needs Key: KAFKA-1722 URL: https://issues.apache.org/jira/browse/KAFKA-1722 Project: Kafka Issue Type: Bug Reporter: Joe Stein Fix For: 0.9.0 -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (KAFKA-1722) static analysis code coverage for pci audit needs
[ https://issues.apache.org/jira/browse/KAFKA-1722?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Joe Stein updated KAFKA-1722: - Component/s: security static analysis code coverage for pci audit needs - Key: KAFKA-1722 URL: https://issues.apache.org/jira/browse/KAFKA-1722 Project: Kafka Issue Type: Bug Components: security Reporter: Joe Stein Fix For: 0.9.0 -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Comment Edited] (KAFKA-1688) Add authorization interface and naive implementation
[ https://issues.apache.org/jira/browse/KAFKA-1688?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14178799#comment-14178799 ] Don Bosco Durai edited comment on KAFKA-1688 at 10/21/14 6:33 PM: -- Joe/Jay, I can look into this. Can you assign this to me? Thanks was (Author: bosco): Joe/Jay, I can look into this. Thanks Add authorization interface and naive implementation Key: KAFKA-1688 URL: https://issues.apache.org/jira/browse/KAFKA-1688 Project: Kafka Issue Type: Sub-task Components: security Reporter: Jay Kreps Add a PermissionManager interface as described here: https://cwiki.apache.org/confluence/display/KAFKA/Security (possibly there is a better name?) Implement calls to the PermissionsManager in KafkaApis for the main requests (FetchRequest, ProduceRequest, etc). We will need to add a new error code and exception to the protocol to indicate permission denied. Add a server configuration to give the class you want to instantiate that implements that interface. That class can define its own configuration properties from the main config file. Provide a simple implementation of this interface which just takes a user and ip whitelist and permits those in either of the whitelists to do anything, and denies all others. Rather than writing an integration test for this class we can probably just use this class for the TLS and SASL authentication testing. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (KAFKA-1555) provide strong consistency with reasonable availability
[ https://issues.apache.org/jira/browse/KAFKA-1555?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Joel Koshy updated KAFKA-1555: -- Reviewer: Joel Koshy (was: Jun Rao) provide strong consistency with reasonable availability --- Key: KAFKA-1555 URL: https://issues.apache.org/jira/browse/KAFKA-1555 Project: Kafka Issue Type: Improvement Components: controller Affects Versions: 0.8.1.1 Reporter: Jiang Wu Assignee: Gwen Shapira Fix For: 0.8.2 Attachments: KAFKA-1555-DOCS.0.patch, KAFKA-1555.0.patch, KAFKA-1555.1.patch, KAFKA-1555.2.patch, KAFKA-1555.3.patch, KAFKA-1555.4.patch, KAFKA-1555.5.patch, KAFKA-1555.5.patch, KAFKA-1555.6.patch, KAFKA-1555.8.patch, KAFKA-1555.9.patch In a mission critical application, we expect that a Kafka cluster with 3 brokers can satisfy two requirements: 1. When 1 broker is down, no message loss or service blocking happens. 2. In worse cases, such as when two brokers are down, service can be blocked, but no message loss happens. We found that the current Kafka version (0.8.1.1) cannot achieve the requirements due to three of its behaviors: 1. When choosing a new leader from 2 followers in ISR, the one with fewer messages may be chosen as the leader. 2. Even when replica.lag.max.messages=0, a follower can stay in ISR when it has fewer messages than the leader. 3. ISR can contain only 1 broker, therefore acknowledged messages may be stored in only 1 broker. The following is an analytical proof. We consider a cluster with 3 brokers and a topic with 3 replicas, and assume that at the beginning all 3 replicas, leader A and followers B and C, are in sync, i.e., they have the same messages and are all in ISR. According to the value of request.required.acks (acks for short), there are the following cases. 1. acks=0, 1, 3. Obviously these settings do not satisfy the requirement. 2. acks=2. Producer sends a message m. It's acknowledged by A and B. At this time, although C hasn't received m, C is still in ISR.
If A is killed, C can be elected as the new leader, and consumers will miss m. 3. acks=-1. B and C restart and are removed from ISR. Producer sends a message m to A, and receives an acknowledgement. Disk failure happens in A before B and C replicate m. Message m is lost. In summary, no existing configuration can satisfy the requirements. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
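The case analysis above can be captured in a toy acknowledgement model. This is a deliberate simplification in Python, not Kafka's replication code: acks is a fixed count or -1 ("all current ISR members"), and the min.isr behavior adds a floor on ISR size below which the write is refused instead of silently acknowledged.

```python
def acknowledge(isr_with_message, isr_size, acks, min_isr=1):
    """Toy acknowledgement rule for one write.

    isr_with_message: how many ISR members have the message.
    acks == -1 means 'all current ISR members'; with min_isr > 1 the write
    is refused outright when the ISR has shrunk too far (the min.isr
    behavior), so no message can be acked while stored on too few brokers.
    """
    if acks == -1:
        if isr_size < min_isr:
            return "refused"  # NotEnoughReplicas: blocked, but no loss
        return "acked" if isr_with_message == isr_size else "pending"
    return "acked" if isr_with_message >= acks else "pending"


# Case acks=2, ISR={A,B,C}: A and B have m, C does not -- yet m is acked.
# If A then dies and C is elected leader, consumers miss m.
print(acknowledge(isr_with_message=2, isr_size=3, acks=2))  # acked

# Case acks=-1 with min_isr=2 and ISR shrunk to {A}: the write is refused,
# trading availability for the guarantee that m is never stored on 1 broker.
print(acknowledge(isr_with_message=1, isr_size=1, acks=-1, min_isr=2))  # refused
```

This matches the issue's conclusion: a fixed acks count leaves the acks=2 gap open, while acks=-1 plus a minimum ISR size blocks service in the bad cases rather than losing acknowledged messages.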
[jira] [Commented] (KAFKA-1481) Stop using dashes AND underscores as separators in MBean names
[ https://issues.apache.org/jira/browse/KAFKA-1481?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14178815#comment-14178815 ] Vladimir Tretyakov commented on KAFKA-1481: --- Hi Jun, that will be perfect! Stop using dashes AND underscores as separators in MBean names -- Key: KAFKA-1481 URL: https://issues.apache.org/jira/browse/KAFKA-1481 Project: Kafka Issue Type: Bug Components: core Affects Versions: 0.8.1.1 Reporter: Otis Gospodnetic Priority: Critical Labels: patch Fix For: 0.8.3 Attachments: KAFKA-1481_2014-06-06_13-06-35.patch, KAFKA-1481_2014-10-13_18-23-35.patch, KAFKA-1481_2014-10-14_21-53-35.patch, KAFKA-1481_2014-10-15_10-23-35.patch, KAFKA-1481_2014-10-20_23-14-35.patch, KAFKA-1481_2014-10-21_09-14-35.patch, KAFKA-1481_IDEA_IDE_2014-10-14_21-53-35.patch, KAFKA-1481_IDEA_IDE_2014-10-15_10-23-35.patch, KAFKA-1481_IDEA_IDE_2014-10-20_20-14-35.patch, KAFKA-1481_IDEA_IDE_2014-10-20_23-14-35.patch MBeans should not use dashes or underscores as separators because these characters are allowed in hostnames, topics, group and consumer IDs, etc., and these are embedded in MBean names, making it impossible to parse out individual bits from MBeans. Perhaps a pipe character should be used to avoid the conflict. This looks like a major blocker because it means nobody can write Kafka 0.8.x monitoring tools unless they are doing it for themselves AND do not use dashes AND do not use underscores. See: http://search-hadoop.com/m/4TaT4lonIW -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Assigned] (KAFKA-1684) Implement TLS/SSL authentication
[ https://issues.apache.org/jira/browse/KAFKA-1684?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Joe Stein reassigned KAFKA-1684: Assignee: Ivan Lyutov Implement TLS/SSL authentication Key: KAFKA-1684 URL: https://issues.apache.org/jira/browse/KAFKA-1684 Project: Kafka Issue Type: Sub-task Components: security Affects Versions: 0.9.0 Reporter: Jay Kreps Assignee: Ivan Lyutov Add an SSL port to the configuration and advertise this as part of the metadata request. If the SSL port is configured the socket server will need to add a second Acceptor thread to listen on it. Connections accepted on this port will need to go through the SSL handshake prior to being registered with a Processor for request processing. SSL requests and responses may need to be wrapped or unwrapped using the SSLEngine that was initialized by the acceptor. This wrapping and unwrapping is very similar to what will need to be done for SASL-based authentication schemes. We should have a uniform interface that covers both of these and we will need to store the instance in the session with the request. The socket server will have to use this object when reading and writing requests. We will need to take care with the FetchRequests as the current FileChannel.transferTo mechanism will be incompatible with wrap/unwrap so we can only use this optimization for unencrypted sockets that don't require userspace translation (wrapping). -- This message was sent by Atlassian JIRA (v6.3.4#6332)
Re: Review Request 26666: Patch for KAFKA-1653
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/26666/ --- (Updated Oct. 21, 2014, 6:58 p.m.) Review request for kafka. Bugs: KAFKA-1653 https://issues.apache.org/jira/browse/KAFKA-1653 Repository: kafka Description (updated) --- Generate an error for duplicates in PreferredLeaderElectionCommand instead of just swallowing duplicates. Report which entries are duplicated for ReassignPartitionsCommand since they may be difficult to find in large reassignments. Report duplicate topics and duplicate topic partitions in ReassignPartitionsCommand. Make all duplication error messages include details about what item was duplicated. Diffs (updated) - core/src/main/scala/kafka/admin/PreferredReplicaLeaderElectionCommand.scala c7918483c02040a7cc18d6e9edbd20a3025a3a55 core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala 691d69a49a240f38883d2025afaec26fd61281b5 core/src/main/scala/kafka/admin/TopicCommand.scala 7672c5aab4fba8c23b1bb5cd4785c332d300a3fa core/src/main/scala/kafka/tools/StateChangeLogMerger.scala d298e7e81acc7427c6cf4796b445966267ca54eb core/src/main/scala/kafka/utils/Utils.scala 29d5a17d4a03cfd3f3cdd2994cbd783a4be2732e core/src/main/scala/kafka/utils/ZkUtils.scala a7b1fdcb50d5cf930352d37e39cb4fc9a080cb12 Diff: https://reviews.apache.org/r/26666/diff/ Testing --- Thanks, Ewen Cheslack-Postava
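The behavior the patch description calls for, reporting which entries are duplicated rather than silently de-duping, can be sketched as a small helper. This is an illustrative Python version, not the Scala utility in the patch:

```python
from collections import Counter


def find_duplicates(items):
    """Return items that occur more than once, preserving first-seen order,
    so the error message can name exactly what was duplicated."""
    counts = Counter(items)
    seen = set()
    dups = []
    for item in items:
        if counts[item] > 1 and item not in seen:
            seen.add(item)
            dups.append(item)
    return dups


# A duplicate broker id in a replica list, and a duplicate partition entry
# in a reassignment file -- both get reported, not swallowed:
print(find_duplicates([1, 2, 2]))  # [2]
partitions = [("t", 0), ("t", 1), ("t", 0)]
dups = find_duplicates(partitions)
if dups:
    print(f"duplicate partitions in reassignment file: {dups}")
```

Naming the duplicated entries matters here because, as the description notes, they can be very hard to spot by eye in a large reassignment file.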
[jira] [Commented] (KAFKA-1653) Duplicate broker ids allowed in replica assignment
[ https://issues.apache.org/jira/browse/KAFKA-1653?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14178845#comment-14178845 ] Ewen Cheslack-Postava commented on KAFKA-1653: -- Updated reviewboard https://reviews.apache.org/r/26666/diff/ against branch origin/trunk Duplicate broker ids allowed in replica assignment -- Key: KAFKA-1653 URL: https://issues.apache.org/jira/browse/KAFKA-1653 Project: Kafka Issue Type: Bug Components: tools Affects Versions: 0.8.1.1 Reporter: Ryan Berdeen Assignee: Ewen Cheslack-Postava Labels: newbie Attachments: KAFKA-1653.patch, KAFKA-1653_2014-10-16_14:54:07.patch, KAFKA-1653_2014-10-21_11:57:50.patch The reassign partitions command and the controller do not ensure that all replicas for a partition are on different brokers. For example, you could set 1,2,2 as the list of brokers for the replicas. kafka-topics.sh --describe --under-replicated will list these partitions as under-replicated, but I can't see a reason why the controller should allow this state. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (KAFKA-1653) Duplicate broker ids allowed in replica assignment
[ https://issues.apache.org/jira/browse/KAFKA-1653?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ewen Cheslack-Postava updated KAFKA-1653: - Attachment: KAFKA-1653_2014-10-21_11:57:50.patch Duplicate broker ids allowed in replica assignment -- Key: KAFKA-1653 URL: https://issues.apache.org/jira/browse/KAFKA-1653 Project: Kafka Issue Type: Bug Components: tools Affects Versions: 0.8.1.1 Reporter: Ryan Berdeen Assignee: Ewen Cheslack-Postava Labels: newbie Attachments: KAFKA-1653.patch, KAFKA-1653_2014-10-16_14:54:07.patch, KAFKA-1653_2014-10-21_11:57:50.patch The reassign partitions command and the controller do not ensure that all replicas for a partition are on different brokers. For example, you could set 1,2,2 as the list of brokers for the replicas. kafka-topics.sh --describe --under-replicated will list these partitions as under-replicated, but I can't see a reason why the controller should allow this state. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-1721) Snappy compressor is not thread safe
[ https://issues.apache.org/jira/browse/KAFKA-1721?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14178856#comment-14178856 ] Bhavesh Mistry commented on KAFKA-1721: --- I have filed https://github.com/xerial/snappy-java/issues/88 to track this for Snappy. There is a patch provided, and thanks to [~ewencp] for testing the patch. Please see the above link for more details. Thanks, Bhavesh Snappy compressor is not thread safe Key: KAFKA-1721 URL: https://issues.apache.org/jira/browse/KAFKA-1721 Project: Kafka Issue Type: Bug Components: compression Reporter: Ewen Cheslack-Postava Assignee: Ewen Cheslack-Postava From the mailing list, it can generate this exception: 2014-10-20 18:55:21.841 [kafka-producer-network-thread] ERROR org.apache.kafka.clients.producer.internals.Sender - Uncaught error in kafka producer I/O thread: java.lang.NullPointerException at org.xerial.snappy.BufferRecycler.releaseInputBuffer(BufferRecycler.java:153) at org.xerial.snappy.SnappyOutputStream.close(SnappyOutputStream.java:317) at java.io.FilterOutputStream.close(FilterOutputStream.java:160) at org.apache.kafka.common.record.Compressor.close(Compressor.java:94) at org.apache.kafka.common.record.MemoryRecords.close(MemoryRecords.java:119) at org.apache.kafka.clients.producer.internals.RecordAccumulator.drain(RecordAccumulator.java:285) at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:162) at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:115) at java.lang.Thread.run(Thread.java:744) This appears to be an issue with the snappy-java library using ThreadLocal for an internal buffer recycling object, which results in that object being shared unsafely across threads if one thread sends to multiple producers: {quote} I think the issue is that you're using all your producers across a thread pool and the snappy library uses ThreadLocal BufferRecyclers.
When new Snappy streams are allocated, they may be allocated from the same thread (e.g. one of your MyProducer classes calls Producer.send() on multiple producers from the same thread) and therefore use the same BufferRecycler. Eventually you hit the code in the stacktrace, and if two producer send threads hit it concurrently they improperly share the unsynchronized BufferRecycler. This seems like a pain to fix -- it's really a deficiency of the snappy library and as far as I can see there's no external control over BufferRecycler in their API. One possibility is to record the thread ID when we generate a new stream in Compressor and use that to synchronize access to ensure no concurrent BufferRecycler access. That could be made specific to snappy so it wouldn't impact other codecs. Not exactly ideal, but it would work. Unfortunately I can't think of any way for you to protect against this in your own code since the problem arises in the producer send thread, which your code should never know about. Another option would be to setup your producers differently to avoid the possibility of unsynchronized access from multiple threads (i.e. don't use the same thread pool approach), but whether you can do that will depend on your use case. {quote} -- This message was sent by Atlassian JIRA (v6.3.4#6332)
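The workaround floated in the quote above (record the creating thread and synchronize on it) could be sketched roughly as follows. This is a hypothetical shape, not Kafka's Compressor or snappy-java's API; the class and method names are illustrative. The idea is that every stream created by the same thread shares that thread's BufferRecycler, so all of those streams must share one lock.

```java
import java.util.concurrent.ConcurrentHashMap;

// One lock per *creating* thread: snappy-java's BufferRecycler is ThreadLocal to the
// thread that allocated the stream, so streams allocated by the same thread must never
// be driven concurrently.
public class StreamGuard {
    private static final ConcurrentHashMap<Long, Object> LOCKS = new ConcurrentHashMap<>();
    private final Object lock;

    public StreamGuard() {
        // Remember the creating thread's id; all guards created on that thread share a lock.
        lock = LOCKS.computeIfAbsent(Thread.currentThread().getId(), id -> new Object());
    }

    // Route every operation that may touch the shared BufferRecycler through the lock,
    // e.g. SnappyOutputStream.write(...) or close().
    public void withLock(Runnable op) {
        synchronized (lock) {
            op.run();
        }
    }
}
```

This keeps the serialization specific to snappy streams, so other codecs would not pay for the extra synchronization.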
Re: Kafka Security
Hi, a quick circle back around so folks are aware. The security tickets are tracked here: https://issues.apache.org/jira/browse/KAFKA/component/12324383/?selectedTab=com.atlassian.jira.jira-projects-plugin:component-issues-panel If you want to jump in on a ticket, please do! If no one is assigned, then assign it to yourself, ask questions, start coding and give it a go! If you can't assign it to yourself, send an email to the list with the ticket number and your user name. Someone can then assign it to you and also add perms in JIRA so that moving forward you can do that too. If someone is already assigned, they may be working on it already. If it has been 3 weeks (let's say) with no patches/comments, etc., then a ping is a good idea in any case. A patch should be going up tomorrow for https://issues.apache.org/jira/browse/KAFKA-1684 that includes the MetaDataResponse changes required for channel (plaintext and ssl) port advertisement. /*** Joe Stein Founder, Principal Consultant Big Data Open Source Security LLC http://www.stealth.ly Twitter: @allthingshadoop http://www.twitter.com/allthingshadoop / On Mon, Sep 22, 2014 at 8:38 PM, Joe Stein joe.st...@stealth.ly wrote: At the request of the folks that were at the first meeting and can't attend tomorrow, I am moving tomorrow's meeting to next Tuesday (same bat time, same bat channel). /*** Joe Stein Founder, Principal Consultant Big Data Open Source Security LLC http://www.stealth.ly Twitter: @allthingshadoop http://www.twitter.com/allthingshadoop / On Tue, Sep 16, 2014 at 4:59 PM, Joe Stein joe.st...@stealth.ly wrote: yup, yup, yup | done, done, done On Tue, Sep 16, 2014 at 1:54 PM, Jay Kreps jay.kr...@gmail.com wrote: Hey Joe, Can you add me, Jun, and Neha too. -Jay On Tue, Sep 16, 2014 at 10:37 AM, Joe Stein joe.st...@stealth.ly wrote: Hi Andrew, yes the meeting took place and we plan to do it every two weeks (same bat time, same bat channel) moving forward. 
In attendance were Michael Herstine (LinkedIn), Arvind Mani (LinkedIn), Gwen Shapira (Cloudera) and myself. Gwen updated the wiki after our discussion. Basically we are thinking of using 3 ports: one for plain text (as it is now), one for SASL (implementing at least username/password and Kerberos), and one for SSL, and they will all be configurable on/off. Some investigation is going on now to see how we can do this without making any wire protocol changes (ideal), or with minimal changes at least. Let me know if you would like to contribute and I can add you to the invite; the more help and input the better, for sure. Also, in regards to KAFKA-1477, I just asked Ivan to update the patch to the latest trunk, and we could (if demand requires) make a patch that works with 0.8.1.X too for folks to use... This doesn't work yet with the new producer (TBD) or any other clients, so be aware it is not yet baked in, and from a release project perspective I don't know what in that patch will survive (hopefully all of it). /*** Joe Stein Founder, Principal Consultant Big Data Open Source Security LLC http://www.stealth.ly Twitter: @allthingshadoop http://www.twitter.com/allthingshadoop / On Tue, Sep 16, 2014 at 10:17 AM, Andrew Psaltis psaltis.and...@gmail.com wrote: Hi, I was just reading the recent changes to: https://cwiki.apache.org/confluence/display/KAFKA/Security after getting off a call about Kafka security and how we are jumping through hoops -- like having PGP keys on the consumers and producers to get around the lack of SSL support. Did the meeting that Joe proposed for Sept 9th happen? If not, is there a plan to have it? I was also looking at: https://issues.apache.org/jira/browse/KAFKA-1477 and it seems like there have been no comments since 11/08/2014. I would be interested in helping with the TLS/SSL support as we have a need for it now. Thanks, Andrew
[jira] [Commented] (KAFKA-1710) [New Java Producer Potential Deadlock] Producer Deadlock when all messages is being sent to single partition
[ https://issues.apache.org/jira/browse/KAFKA-1710?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14179014#comment-14179014 ] Bhavesh Mistry commented on KAFKA-1710: --- [~jkreps], I am sorry I did not get back to you sooner. The cost of enqueuing a message into a single partition only is ~54% compared to round-robin (tested with 32 partitions on a single topic and a 3-broker cluster). The throughput measures only the cost of putting data into the buffer, not the cost of sending it to the brokers. Here is the test I have done:
To *single* partition: Throughput per Thread=2666.5 byte(s)/microsecond All done...!
To *all* partitions: Throughput per Thread=5818.181818181818 byte(s)/microsecond All done...!
{code}
package org.kafka.test;

import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class TestNetworkDownProducer {

    static int numberTh = 75;
    static CountDownLatch latch = new CountDownLatch(numberTh);

    public static void main(String[] args) throws IOException, InterruptedException {
        //Thread.sleep(6);
        Properties prop = new Properties();
        InputStream propFile = Thread.currentThread().getContextClassLoader()
                .getResourceAsStream("kafkaproducer.properties");
        String topic = "logmon.test";
        prop.load(propFile);
        System.out.println("Property: " + prop.toString());

        StringBuilder builder = new StringBuilder(1024);
        int msgLenth = 256;
        int numberOfLoop = 5000;
        for (int i = 0; i < msgLenth; i++)
            builder.append('a');

        int numberOfProducer = 1;
        Producer[] producer = new Producer[numberOfProducer];
        for (int i = 0; i < producer.length; i++) {
            producer[i] = new KafkaProducer(prop);
        }

        ExecutorService service = new ThreadPoolExecutor(numberTh, numberTh, 0L,
                TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(numberTh * 2));
        MyProducer[] producerThResult = new MyProducer[numberTh];
        for (int i = 0; i < numberTh; i++) {
            producerThResult[i] = new MyProducer(producer, numberOfLoop, builder.toString(), topic);
            service.execute(producerThResult[i]);
        }
        latch.await();
        for (int i = 0; i < producer.length; i++) {
            producer[i].close();
        }
        service.shutdownNow();
        System.out.println("All Producers done...!");

        // now interpret the result... of this...
        long lowestTime = 0;
        for (int i = 0; i < producerThResult.length; i++) {
            if (i == 1) {
                lowestTime = producerThResult[i].totalTimeinNano;
            } else if (producerThResult[i].totalTimeinNano < lowestTime) {
                lowestTime = producerThResult[i].totalTimeinNano;
            }
        }
        long bytesSend = msgLenth * numberOfLoop;
        long durationInMs = TimeUnit.MILLISECONDS.convert(lowestTime, TimeUnit.NANOSECONDS);
        double throughput = (bytesSend * 1.0) / (durationInMs);
        System.out.println("Throughput per Thread=" + throughput + " byte(s)/microsecond");
        System.out.println("All done...!");
    }

    static class MyProducer implements Callable<Long>, Runnable {
        Producer[] producer;
        long maxloops;
        String msg;
        String topic;
        long totalTimeinNano = 0;

        MyProducer(Producer[] list, long maxloops, String msg, String topic) {
            this.producer = list;
            this.maxloops = maxloops;
            this.msg = msg;
            this.topic = topic;
        }

        public void run() {
            // ALWAYS SEND DATA TO PARTITION 1 only...
            //ProducerRecord record = new ProducerRecord(topic, 1, null, msg.toString().getBytes());
            ProducerRecord record = new
Review Request 26994: Patch for KAFKA-1719
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/26994/ --- Review request for kafka. Bugs: KAFKA-1719 https://issues.apache.org/jira/browse/KAFKA-1719 Repository: kafka Description --- make mirror maker exit when one consumer/producer thread exits. Diffs - core/src/main/scala/kafka/tools/MirrorMaker.scala b8698ee1469c8fbc92ccc176d916eb3e28b87867 Diff: https://reviews.apache.org/r/26994/diff/ Testing --- Thanks, Jiangjie Qin
[jira] [Updated] (KAFKA-1719) Make mirror maker exit when one consumer/producer thread exits.
[ https://issues.apache.org/jira/browse/KAFKA-1719?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jiangjie Qin updated KAFKA-1719: Attachment: KAFKA-1719.patch Make mirror maker exit when one consumer/producer thread exits. --- Key: KAFKA-1719 URL: https://issues.apache.org/jira/browse/KAFKA-1719 Project: Kafka Issue Type: Improvement Reporter: Jiangjie Qin Assignee: Jiangjie Qin Attachments: KAFKA-1719.patch When one of the consumer/producer thread exits, the entire mirror maker will be blocked. In this case, it is better to make it exit. It seems a single ZookeeperConsumerConnector is sufficient for mirror maker, probably we don't need a list for the connectors. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-1719) Make mirror maker exit when one consumer/producer thread exits.
[ https://issues.apache.org/jira/browse/KAFKA-1719?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14179047#comment-14179047 ] Jiangjie Qin commented on KAFKA-1719: - Created reviewboard https://reviews.apache.org/r/26994/diff/ against branch origin/trunk Make mirror maker exit when one consumer/producer thread exits. --- Key: KAFKA-1719 URL: https://issues.apache.org/jira/browse/KAFKA-1719 Project: Kafka Issue Type: Improvement Reporter: Jiangjie Qin Assignee: Jiangjie Qin Attachments: KAFKA-1719.patch When one of the consumer/producer thread exits, the entire mirror maker will be blocked. In this case, it is better to make it exit. It seems a single ZookeeperConsumerConnector is sufficient for mirror maker, probably we don't need a list for the connectors. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
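The behavior the ticket asks for (exit instead of blocking when any worker thread dies) can be sketched with a shared latch; the names below are illustrative and this is not the actual MirrorMaker.scala change, just the general pattern.

```java
import java.util.concurrent.CountDownLatch;

public class ExitOnFirstDeath {
    public static void main(String[] args) throws InterruptedException {
        // One permit: the first consumer/producer thread to exit, normally or
        // via an exception, releases it.
        CountDownLatch anyDied = new CountDownLatch(1);

        Runnable worker = () -> {
            try {
                throw new RuntimeException("simulated consumer failure");
            } finally {
                anyDied.countDown(); // always signal on the way out
            }
        };
        Thread t = new Thread(worker);
        t.setUncaughtExceptionHandler((th, e) -> { /* log, then fall through to shutdown */ });
        t.start();

        // Returns as soon as any worker dies, instead of the whole process hanging.
        anyDied.await();
        System.out.println("worker exited; shutting down mirror maker");
    }
}
```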
[jira] [Commented] (KAFKA-1647) Replication offset checkpoints (high water marks) can be lost on hard kills and restarts
[ https://issues.apache.org/jira/browse/KAFKA-1647?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14179077#comment-14179077 ] Andrew Olson commented on KAFKA-1647: - [~nehanarkhede] Will this correction be included in 0.8.2-beta? It sounds like a blocker for the 0.8.2 release, in any case. Replication offset checkpoints (high water marks) can be lost on hard kills and restarts Key: KAFKA-1647 URL: https://issues.apache.org/jira/browse/KAFKA-1647 Project: Kafka Issue Type: Bug Affects Versions: 0.8.2 Reporter: Joel Koshy Assignee: Jiangjie Qin Priority: Critical Labels: newbie++ Attachments: KAFKA-1647.patch, KAFKA-1647_2014-10-13_16:38:39.patch, KAFKA-1647_2014-10-18_00:26:51.patch We ran into this scenario recently in a production environment. This can happen when enough brokers in a cluster are taken down, i.e., a rolling bounce done properly should not cause this issue. It can occur if all replicas for any partition are taken down. Here is a sample scenario:
* Cluster of three brokers: b0, b1, b2
* Two partitions (of some topic) with replication factor two: p0, p1
* Initial state: p0: leader = b0, ISR = {b0, b1}; p1: leader = b1, ISR = {b0, b1}
* Do a parallel hard-kill of all brokers
* Bring up b2, so it is the new controller
* b2 initializes its controller context and populates its leader/ISR cache (i.e., controllerContext.partitionLeadershipInfo) from zookeeper. The last known leaders are b0 (for p0) and b1 (for p1)
* Bring up b1
* The controller's onBrokerStartup procedure initiates a replica state change for all replicas on b1 to become online. As part of this replica state change it gets the last known leader and ISR and sends a LeaderAndIsrRequest to b1 (for p0 and p1). This LeaderAndIsrRequest contains: {p0: leader=b0; p1: leader=b1; live leaders={b1}}. b0 is indicated as the leader of p0 but it is not included in the live leaders field because b0 is down. 
* On receiving the LeaderAndIsrRequest, b1's replica manager will successfully make itself (b1) the leader for p1 (and create the local replica object corresponding to p1). It will however abort the become follower transition for p0 because the designated leader b0 is offline. So it will not create the local replica object for p0. * It will then start the high water mark checkpoint thread. Since only p1 has a local replica object, only p1's high water mark will be checkpointed to disk. p0's previously written checkpoint if any will be lost. So in summary it seems we should always create the local replica object even if the online transition does not happen. Possible symptoms of the above bug could be one or more of the following (we saw 2 and 3): # Data loss; yes on a hard-kill data loss is expected, but this can actually cause loss of nearly all data if the broker becomes follower, truncates, and soon after happens to become leader. # High IO on brokers that lose their high water mark then subsequently (on a successful become follower transition) truncate their log to zero and start catching up from the beginning. # If the offsets topic is affected, then offsets can get reset. This is because during an offset load we don't read past the high water mark. So if a water mark is missing then we don't load anything (even if the offsets are there in the log). -- This message was sent by Atlassian JIRA (v6.3.4#6332)
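The loss mechanism summarized above can be shown with a toy illustration (this is not Kafka code; partition names and values are made up): the checkpoint thread rewrites the whole file from in-memory replica objects, so a partition whose local replica was never created silently drops out of the new checkpoint.

```java
import java.util.HashMap;
import java.util.Map;

public class HwCheckpointLoss {
    public static void main(String[] args) {
        // High watermarks previously checkpointed to disk.
        Map<String, Long> checkpointOnDisk = new HashMap<>();
        checkpointOnDisk.put("p0", 42L);
        checkpointOnDisk.put("p1", 17L);

        // After the hard kill and restart, only p1 got a local replica object:
        // p0's become-follower transition was aborted because its leader b0 was offline.
        Map<String, Long> localReplicas = new HashMap<>();
        localReplicas.put("p1", 17L);

        // The checkpoint thread overwrites the file from the local replicas only,
        // so p0's previously written high watermark of 42 is gone.
        Map<String, Long> newCheckpoint = new HashMap<>(localReplicas);
        System.out.println(newCheckpoint.containsKey("p0")); // prints false
    }
}
```

This is why the fix must create the local replica object (which is where the checkpointed high watermark is read back) even when the become-follower transition cannot complete.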
[jira] [Commented] (KAFKA-1477) add authentication layer and initial JKS x509 implementation for brokers, producers and consumer for network communication
[ https://issues.apache.org/jira/browse/KAFKA-1477?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14179081#comment-14179081 ] Jay Kreps commented on KAFKA-1477: -- I took a look at this patch in a little more detail, I think there is likely a fair bit of work to do before we can check this in. For example, some things that concern me: The SSLSocketChannel class extends SocketChannel. We seem to be simulating blocking on a non-blocking socket using sleep calls in a loop. Then even lots of minor things like channelFor is doing handshaking and some odd unfinished looking code. I suspect some of this may be done this way to minimize impact to existing code since it was being maintained as a patch, but that won't make sense once it is committed. What about this as a path forward. Let's take this patch and extract just the server-side SSL support in SocketServer and try to get that into shape to be something we can commit. I think we can do this without simultaneously doing the clients. I think if we try to do this all at once we aren't going to get there. We can test this by adding to SocketServerTest and just using a blocking SSL connection. Here is what I think we need to do: 1. Do we need SSLSocketChannel? I think as long as the acceptor completes the handshake from then on all that is needed is to wrap/unwrap bytes, right? 2. Modify the acceptor in SocketServer to do non-blocking handling of the SSL handshake. By the time the socket is accepted and handed over to the processor the ssl handshake should be complete. 3. Create some kind of generic interface for wrap/upwrap (SecurityCodec?) as we will need to implement this for both ssl and for kerberos. This interface will wrap the SSLEngine (or SASL engine) associated with a given connection. 
add authentication layer and initial JKS x509 implementation for brokers, producers and consumer for network communication -- Key: KAFKA-1477 URL: https://issues.apache.org/jira/browse/KAFKA-1477 Project: Kafka Issue Type: Sub-task Components: security Reporter: Joe Stein Assignee: Ivan Lyutov Fix For: 0.8.3 Attachments: KAFKA-1477-binary.patch, KAFKA-1477.patch, KAFKA-1477_2014-06-02_16:59:40.patch, KAFKA-1477_2014-06-02_17:24:26.patch, KAFKA-1477_2014-06-03_13:46:17.patch, KAFKA-1477_trunk.patch -- This message was sent by Atlassian JIRA (v6.3.4#6332)
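The generic wrap/unwrap interface suggested in the comment above ("SecurityCodec") could take roughly the following shape. This is a hypothetical sketch, not Kafka's actual API: an SSL implementation would delegate to SSLEngine.wrap/unwrap once the acceptor has completed the handshake, a SASL one to its own engine, and the plaintext port is simply the identity codec.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Per-connection codec: the processor only ever sees wrap/unwrap, regardless of
// whether the connection is plaintext, SSL, or SASL.
interface SecurityCodec {
    ByteBuffer wrap(ByteBuffer plaintext);    // encode outgoing bytes
    ByteBuffer unwrap(ByteBuffer ciphertext); // decode incoming bytes
}

// The plaintext channel is the identity codec.
class PlaintextCodec implements SecurityCodec {
    public ByteBuffer wrap(ByteBuffer b) { return b; }
    public ByteBuffer unwrap(ByteBuffer b) { return b; }
}

public class CodecDemo {
    public static void main(String[] args) {
        SecurityCodec codec = new PlaintextCodec();
        ByteBuffer msg = ByteBuffer.wrap("hello".getBytes(StandardCharsets.UTF_8));
        ByteBuffer roundTripped = codec.unwrap(codec.wrap(msg));
        System.out.println(StandardCharsets.UTF_8.decode(roundTripped)); // prints hello
    }
}
```

An SSL implementation would additionally have to handle SSLEngineResult statuses such as BUFFER_OVERFLOW and re-handshakes, which is exactly the complexity the comment proposes keeping out of the processor loop.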
[jira] [Commented] (KAFKA-1481) Stop using dashes AND underscores as separators in MBean names
[ https://issues.apache.org/jira/browse/KAFKA-1481?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14179092#comment-14179092 ] Bhavesh Mistry commented on KAFKA-1481: --- Hi [~junrao], Can you please let me know whether this will also address the [New Java Producer] metrics() method when the client.id or topic has special characters, so that we have consistent naming across all JMX bean names and metrics() methods? Here is the background on this: {code} Bhavesh, Yes, allowing a dot in clientId and topic makes it a bit harder to define the JMX bean names. I see a couple of solutions here.
1. Disable dot in clientId and topic names. The issue is that dot may already be used in existing deployments.
2. We can represent the JMX bean name differently in the new producer. Instead of kafka.producer.myclientid:type=mytopic we could change it to kafka.producer:clientId=myclientid,topic=mytopic
I felt that option 2 is probably better since it doesn't affect existing users. Otis, We probably can also use option 2 to address KAFKA-1481. For topic/clientId-specific metrics, we could explicitly specify the metric name so that it contains topic=mytopic,clientId=myclientid. That seems to be a much cleaner way than having all parts included in a single string separated by '|'. 
Thanks, Jun {code} Thanks, Bhavesh Stop using dashes AND underscores as separators in MBean names -- Key: KAFKA-1481 URL: https://issues.apache.org/jira/browse/KAFKA-1481 Project: Kafka Issue Type: Bug Components: core Affects Versions: 0.8.1.1 Reporter: Otis Gospodnetic Priority: Critical Labels: patch Fix For: 0.8.3 Attachments: KAFKA-1481_2014-06-06_13-06-35.patch, KAFKA-1481_2014-10-13_18-23-35.patch, KAFKA-1481_2014-10-14_21-53-35.patch, KAFKA-1481_2014-10-15_10-23-35.patch, KAFKA-1481_2014-10-20_23-14-35.patch, KAFKA-1481_2014-10-21_09-14-35.patch, KAFKA-1481_IDEA_IDE_2014-10-14_21-53-35.patch, KAFKA-1481_IDEA_IDE_2014-10-15_10-23-35.patch, KAFKA-1481_IDEA_IDE_2014-10-20_20-14-35.patch, KAFKA-1481_IDEA_IDE_2014-10-20_23-14-35.patch MBeans should not use dashes or underscores as separators because these characters are allowed in hostnames, topics, group and consumer IDs, etc., and these are embedded in MBeans names making it impossible to parse out individual bits from MBeans. Perhaps a pipe character should be used to avoid the conflict. This looks like a major blocker because it means nobody can write Kafka 0.8.x monitoring tools unless they are doing it for themselves AND do not use dashes AND do not use underscores. See: http://search-hadoop.com/m/4TaT4lonIW -- This message was sent by Atlassian JIRA (v6.3.4#6332)
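"Option 2" from the quote above maps directly onto how JMX itself models bean names: clientId and topic become ObjectName key properties rather than segments of one delimiter-separated string, so values containing dots or dashes stay individually parseable. A small sketch with illustrative values:

```java
import java.util.Hashtable;
import javax.management.ObjectName;

public class MBeanNaming {
    public static void main(String[] args) throws Exception {
        // Dots and dashes are legal in unquoted ObjectName property values,
        // so "my.client-id" needs no escaping here.
        Hashtable<String, String> props = new Hashtable<>();
        props.put("clientId", "my.client-id");
        props.put("topic", "my.topic");

        ObjectName name = new ObjectName("kafka.producer", props);
        // Each piece comes back out intact, no '|' or '-' parsing required:
        System.out.println(name.getKeyProperty("clientId")); // prints my.client-id
        System.out.println(name.getKeyProperty("topic"));    // prints my.topic
    }
}
```

This is also why option 2 sidesteps the separator problem KAFKA-1481 describes: the structure lives in the key/value pairs, not in a character that might also appear inside a hostname or topic.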
[VOTE] 0.8.2-beta Release Candidate 1
This is the first candidate for release of Apache Kafka 0.8.2-beta Release Notes for the 0.8.2-beta release https://people.apache.org/~joestein/kafka-0.8.2-beta-candidate1/RELEASE_NOTES.html *** Please download, test and vote by Friday, October 24th, 2pm PT Kafka's KEYS file containing PGP keys we use to sign the release: https://svn.apache.org/repos/asf/kafka/KEYS in addition to the md5, sha1 and sha2 (SHA256) checksum. * Release artifacts to be voted upon (source and binary): https://people.apache.org/~joestein/kafka-0.8.2-beta-candidate1/ * Maven artifacts to be voted upon prior to release: https://repository.apache.org/content/groups/staging/ * scala-doc https://people.apache.org/~joestein/kafka-0.8.2-beta-candidate1/scala-doc/ * java-doc https://people.apache.org/~joestein/kafka-0.8.2-beta-candidate1/java-doc/ * The tag to be voted upon (off the 0.8.2 branch) is the 0.8.2-beta tag https://git-wip-us.apache.org/repos/asf?p=kafka.git;a=tag;h=2b2c3da2c52bc62a89d60f85125d3723c8410fa0 /*** Joe Stein Founder, Principal Consultant Big Data Open Source Security LLC http://www.stealth.ly Twitter: @allthingshadoop http://www.twitter.com/allthingshadoop /
[jira] [Commented] (KAFKA-1481) Stop using dashes AND underscores as separators in MBean names
[ https://issues.apache.org/jira/browse/KAFKA-1481?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14179102#comment-14179102 ] Jun Rao commented on KAFKA-1481: Thanks for the patch. Some comments below. 20. KafkaMetricsGroup: 20.1 In the following, instead of doing map(kv => ...), could we do map { case (key, value) => ... }? .filter(_._2 != "").map(kv => "%s=%s".format(kv._1, kv._2)) 20.2 In the following, shouldn't the pattern be (".*clientId=" + clientId + ".*").r? val pattern = (".*" + clientId + ".*").r 21. TaggableInfo: tags should be an immutable hashmap, right? case class TaggableInfo(tags: mutable.LinkedHashMap[String, String]) extends Taggable { 22. New files: 22.1 The license header should be at the very beginning, before the package and import. 22.2 I am not sure if we really need to have the Taggable trait. For example, in ConsumerTopicMetrics, we can just take the original clientIdAndTopic and explicitly create the tags for clientId and topic when creating those metrics. The tags are really specific to the metrics, so we probably don't need to expose them in places other than where the metrics are created. 23. ReplicaFetcherManager: The first statement in shutdown() is on the same line as the method. A similar issue happens in a few other classes like RequestChannel and ConsumerFetcherManager. Perhaps you can follow the patch creation process in https://cwiki.apache.org/confluence/display/KAFKA/Patch+submission+and+review#Patchsubmissionandreview-Simplecontributorworkflow 24. FetchRequestAndResponseStatsRegistry.removeConsumerFetchRequestAndResponseStats(): Should we use "clientId=" + clientId in the pattern? 25. SimpleConsumer: Ideally, we shouldn't change the constructor since this will be an API change. 26. Could you add a unit test to make sure that after a producer/consumer is closed, all metrics specific to the producer/consumer are removed?
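A minimal sketch of review points 20.1 and 20.2 above. The tag map and names here are illustrative stand-ins, not the actual KafkaMetricsGroup code: empty tag values are filtered out, tags are rendered with a pattern-matching map, and the removal regex is anchored on the literal "clientId=" key so it cannot accidentally match a topic that happens to contain the client id:

```scala
import scala.collection.immutable.ListMap

// Illustrative tags; the empty topic value should be dropped.
val tags = ListMap("clientId" -> "my-client", "topic" -> "")

// 20.1: pattern-matching form instead of map(kv => ...)
val formatted = tags
  .filter { case (_, value) => value != "" }
  .map { case (key, value) => "%s=%s".format(key, value) }
  .mkString(",")

// 20.2: include the "clientId=" key in the pattern, not just the value.
val clientId = "my-client"
val pattern = (".*clientId=" + clientId + ".*").r

println(formatted)                                               // clientId=my-client
println(pattern.findFirstIn("clientId=my-client,topic=t").isDefined) // true
```

Without the "clientId=" anchor, the looser pattern (".*" + clientId + ".*").r would also match metric names where the client id string appears in some other tag.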
[jira] [Updated] (KAFKA-1647) Replication offset checkpoints (high water marks) can be lost on hard kills and restarts
[ https://issues.apache.org/jira/browse/KAFKA-1647?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Joe Stein updated KAFKA-1647: - Fix Version/s: 0.8.2 Replication offset checkpoints (high water marks) can be lost on hard kills and restarts Key: KAFKA-1647 URL: https://issues.apache.org/jira/browse/KAFKA-1647 Project: Kafka Issue Type: Bug Affects Versions: 0.8.2 Reporter: Joel Koshy Assignee: Jiangjie Qin Priority: Critical Labels: newbie++ Fix For: 0.8.2 Attachments: KAFKA-1647.patch, KAFKA-1647_2014-10-13_16:38:39.patch, KAFKA-1647_2014-10-18_00:26:51.patch We ran into this scenario recently in a production environment. This can happen when enough brokers in a cluster are taken down, i.e., a rolling bounce done properly should not cause this issue. It can occur if all replicas for any partition are taken down. Here is a sample scenario: * Cluster of three brokers: b0, b1, b2 * Two partitions (of some topic) with replication factor two: p0, p1 * Initial state: p0: leader = b0, ISR = {b0, b1}; p1: leader = b1, ISR = {b0, b1} * Do a parallel hard-kill of all brokers * Bring up b2, so it is the new controller * b2 initializes its controller context and populates its leader/ISR cache (i.e., controllerContext.partitionLeadershipInfo) from zookeeper. The last known leaders are b0 (for p0) and b1 (for p1) * Bring up b1 * The controller's onBrokerStartup procedure initiates a replica state change for all replicas on b1 to become online. As part of this replica state change it gets the last known leader and ISR and sends a LeaderAndIsrRequest to b1 (for p0 and p1). This LeaderAndIsrRequest contains: {p0: leader=b0; p1: leader=b1; leaders=[b1]}. b0 is indicated as the leader of p0 but it is not included in the leaders field because b0 is down. * On receiving the LeaderAndIsrRequest, b1's replica manager will successfully make itself (b1) the leader for p1 (and create the local replica object corresponding to p1).
It will however abort the become follower transition for p0 because the designated leader b0 is offline. So it will not create the local replica object for p0. * It will then start the high water mark checkpoint thread. Since only p1 has a local replica object, only p1's high water mark will be checkpointed to disk. p0's previously written checkpoint if any will be lost. So in summary it seems we should always create the local replica object even if the online transition does not happen. Possible symptoms of the above bug could be one or more of the following (we saw 2 and 3): # Data loss; yes on a hard-kill data loss is expected, but this can actually cause loss of nearly all data if the broker becomes follower, truncates, and soon after happens to become leader. # High IO on brokers that lose their high water mark then subsequently (on a successful become follower transition) truncate their log to zero and start catching up from the beginning. # If the offsets topic is affected, then offsets can get reset. This is because during an offset load we don't read past the high water mark. So if a water mark is missing then we don't load anything (even if the offsets are there in the log). -- This message was sent by Atlassian JIRA (v6.3.4#6332)
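The failure mode above can be sketched as a toy model (this is not Kafka code; partition names and values are illustrative): the checkpoint thread only writes high watermarks for partitions that have a local replica object, so skipping replica creation for a partition whose leader is offline silently drops its previously checkpointed HW on the next write.

```scala
// High watermarks previously persisted in replication-offset-checkpoint.
val checkpointOnDisk = Map("p0" -> 500L, "p1" -> 800L)

// After restart, only p1 got a local replica created, because p0's
// designated leader was offline and the become-follower step was aborted.
val localReplicas = Set("p1")

// The checkpoint thread writes HWs only for partitions with local
// replicas, so p0's previously written HW of 500 is lost.
val nextCheckpoint = checkpointOnDisk.filter { case (p, _) => localReplicas(p) }
println(nextCheckpoint)
```

This is why the fix must create the local replica object (which reads the old checkpointed HW) for every follower partition, even when the leader is not alive.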
[jira] [Created] (KAFKA-1723) make the metrics name in new producer more standard
Jun Rao created KAFKA-1723: -- Summary: make the metrics name in new producer more standard Key: KAFKA-1723 URL: https://issues.apache.org/jira/browse/KAFKA-1723 Project: Kafka Issue Type: Sub-task Components: clients Affects Versions: 0.8.2 Reporter: Jun Rao The jmx name in the new producer looks like the following: kafka.producer.myclientid:type=mytopic However, this can be ambiguous since we allow '.' in client ids and topics. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-1723) make the metrics name in new producer more standard
[ https://issues.apache.org/jira/browse/KAFKA-1723?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14179128#comment-14179128 ] Jun Rao commented on KAFKA-1723: One way to address this issue is to change the jmx name to the following. kafka.producer:clientId=myclientid,topic=mytopic This will make sure the jmx name is consistent with the fix in KAFKA-1481. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
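A quick illustration of why the proposed key-property form is unambiguous: javax.management.ObjectName parses the domain and key properties directly, so monitoring tools never need to split on '.', '-', or '_' (where client id and topic boundaries are unrecoverable):

```scala
import javax.management.ObjectName

// The proposed name from the comment above, parsed with the standard JMX API.
val name = new ObjectName("kafka.producer:clientId=myclientid,topic=mytopic")

println(name.getDomain)                  // kafka.producer
println(name.getKeyProperty("clientId")) // myclientid
println(name.getKeyProperty("topic"))    // mytopic
```

With the old dotted form kafka.producer.myclientid:type=mytopic, a client id containing '.' makes the domain/client-id split ambiguous; with key properties there is nothing to split.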
[jira] [Commented] (KAFKA-1481) Stop using dashes AND underscores as separators in MBean names
[ https://issues.apache.org/jira/browse/KAFKA-1481?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14179132#comment-14179132 ] Jun Rao commented on KAFKA-1481: Bhavesh, Created a subtask to track the metric name in the new producer. Thanks, -- This message was sent by Atlassian JIRA (v6.3.4#6332)
Re: Review Request 26994: Patch for KAFKA-1719
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/26994/#review57680 --- core/src/main/scala/kafka/tools/MirrorMaker.scala https://reviews.apache.org/r/26994/#comment98497 Just a stylistic comment: could you group java imports with scala / other lib imports, and leave kafka imports at the top? core/src/main/scala/kafka/tools/MirrorMaker.scala https://reviews.apache.org/r/26994/#comment98503 Can we add a FATAL log entry here: "Consumer thread exited abnormally, stopping the whole mirror maker"? core/src/main/scala/kafka/tools/MirrorMaker.scala https://reviews.apache.org/r/26994/#comment98501 Ditto above. core/src/main/scala/kafka/tools/MirrorMaker.scala https://reviews.apache.org/r/26994/#comment98499 Is this change intended? - Guozhang Wang On Oct. 21, 2014, 8:37 p.m., Jiangjie Qin wrote: --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/26994/ --- (Updated Oct. 21, 2014, 8:37 p.m.) Review request for kafka. Bugs: KAFKA-1719 https://issues.apache.org/jira/browse/KAFKA-1719 Repository: kafka Description --- make mirror maker exit when one consumer/producer thread exits. Diffs - core/src/main/scala/kafka/tools/MirrorMaker.scala b8698ee1469c8fbc92ccc176d916eb3e28b87867 Diff: https://reviews.apache.org/r/26994/diff/ Testing --- Thanks, Jiangjie Qin
Re: [DISCUSSION] Message Metadata
Hi Jun, Regarding 4) in your comment, after thinking about it for a while I cannot come up with a way to do it along with log compaction without adding new fields to the current message set format. Do you have a better way that does not require protocol changes? Guozhang On Mon, Oct 20, 2014 at 9:53 AM, Guozhang Wang wangg...@gmail.com wrote: I have updated the wiki page incorporating received comments. We can discuss some more details on: 1. How we want to do audit? Whether we want to have in-built auditing on brokers or even MMs or use an audit consumer to fetch all messages from just brokers. 2. How we can avoid de-/re-compression on brokers and MMs with log compaction turned on. 3. How we can resolve unclean leader election resulted data inconsistency with control messages. Guozhang On Sun, Oct 19, 2014 at 11:41 PM, Guozhang Wang wangg...@gmail.com wrote: Thanks for the detailed comments Jun! Some replies inlined. On Sun, Oct 19, 2014 at 7:42 PM, Jun Rao jun...@gmail.com wrote: Hi, Guozhang, Thanks for the writeup. A few high level comments. 1. Associating (versioned) schemas to a topic can be a good thing overall. Yes, this could add a bit more management overhead in Kafka. However, it makes sure that the data format contract between a producer and a consumer is kept and managed in a central place, instead of in the application. The latter is probably easier to start with, but is likely to be brittle in the long run. I am actually not proposing to not support associated versioned schemas for topics, but to not let core Kafka functionality like auditing depend on schemas. I think this alone can separate the schema management from Kafka piping management (i.e. making sure every single message is delivered, and within some latency, etc). Adding additional auditing info into an existing schema will force Kafka to be aware of the schema systems (Avro, JSON, etc). 2. Auditing can be a general feature that's useful for many applications.
Such a feature can be implemented by extending the low level message format with a header. However, it can also be added as part of the schema management. For example, you can imagine a type of audited schema that adds additional auditing info to an existing schema automatically. Performance wise, it probably doesn't make a big difference whether the auditing info is added in the message header or the schema header. See replies above. 3. We talked about avoiding the overhead of decompressing in both the broker and the mirror maker. We probably need to think through how this works with auditing. In the more general case where you want to audit every message, one has to do the decompression to get the individual message, independent of how the auditing info is stored. This means that if we want to audit the broker directly or the consumer in mirror maker, we have to pay the decompression cost anyway. Similarly, if we want to extend mirror maker to support some customized filtering/transformation logic, we also have to pay the decompression cost. I see your point. For that I would prefer to have a MM implementation that is able to do de-compress / re-compress ONLY if required, for example by auditing, etc. I agree that we have not thought through whether we should enable auditing on MM, and if yes how to do that, and we could discuss about that in a different thread. Overall, this proposal is not just for tackling de-compression on MM but about the feasibility of extending Kafka message header for system properties / app properties. Some low level comments. 4. Broker offset reassignment (kafka-527): This probably can be done with just a format change on the compressed message set. That is true. As I mentioned in the wiki each of the problems may be resolvable separately but I am thinking about a general way to get all of them. 5. MirrorMaker refactoring: We probably can think through how general we want mirror maker to be. 
If we want it to be more general, we likely need to decompress every message just like in a normal consumer. There will definitely be overhead. However, as long as mirror maker is made scalable, we can overcome the overhead by just running more instances on more hardware resources. As for the proposed message format change, we probably need to think through it a bit more. The honor-ship flag seems a bit hacky to me. Replied as part of 3). Sure we can discuss more about that, will update the wiki for collected comments. 6. Adding a timestamp in each message can be a useful thing. It (1) allows log segments to be rolled more accurately; (2) allows finding an offset for a particular timestamp more accurately. I am thinking that the timestamp in the message should probably be the time when the leader receives the message. Followers preserve the timestamp set by leader. To avoid time going back during leader change, the leader can
[jira] [Reopened] (KAFKA-1555) provide strong consistency with reasonable availability
[ https://issues.apache.org/jira/browse/KAFKA-1555?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Joel Koshy reopened KAFKA-1555: --- I'm reopening this so we won't forget about the documentation patch. If you prefer, we can open a separate jira for that and close this. provide strong consistency with reasonable availability --- Key: KAFKA-1555 URL: https://issues.apache.org/jira/browse/KAFKA-1555 Project: Kafka Issue Type: Improvement Components: controller Affects Versions: 0.8.1.1 Reporter: Jiang Wu Assignee: Gwen Shapira Fix For: 0.8.2 Attachments: KAFKA-1555-DOCS.0.patch, KAFKA-1555.0.patch, KAFKA-1555.1.patch, KAFKA-1555.2.patch, KAFKA-1555.3.patch, KAFKA-1555.4.patch, KAFKA-1555.5.patch, KAFKA-1555.5.patch, KAFKA-1555.6.patch, KAFKA-1555.8.patch, KAFKA-1555.9.patch In a mission critical application, we expect a kafka cluster with 3 brokers can satisfy two requirements: 1. When 1 broker is down, no message loss or service blocking happens. 2. In worse cases, such as when two brokers are down, service can be blocked, but no message loss happens. We found that the current kafka version (0.8.1.1) cannot achieve the requirements due to its three behaviors: 1. when choosing a new leader from 2 followers in ISR, the one with fewer messages may be chosen as the leader. 2. even when replica.lag.max.messages=0, a follower can stay in ISR when it has fewer messages than the leader. 3. ISR can contain only 1 broker, therefore acknowledged messages may be stored in only 1 broker. The following is an analytical proof. We consider a cluster with 3 brokers and a topic with 3 replicas, and assume that at the beginning, all 3 replicas, leader A, followers B and C, are in sync, i.e., they have the same messages and are all in ISR. According to the value of request.required.acks (acks for short), there are the following cases. 1. acks=0, 1, 3. Obviously these settings do not satisfy the requirement. 2. acks=2. Producer sends a message m. It's acknowledged by A and B.
At this time, although C hasn't received m, C is still in ISR. If A is killed, C can be elected as the new leader, and consumers will miss m. 3. acks=-1. B and C restart and are removed from ISR. Producer sends a message m to A, and receives an acknowledgement. Disk failure happens in A before B and C replicate m. Message m is lost. In summary, any existing configuration cannot satisfy the requirements. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
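For reference, the direction KAFKA-1555 took in 0.8.2 pairs acks=-1 with a minimum ISR size. A hedged sketch of the relevant settings (names as released in 0.8.2; values here are illustrative for the 3-broker scenario above):

```properties
# Broker/topic side: an acknowledged write must be on at least 2 replicas;
# produce requests fail (NotEnoughReplicas) rather than lose data when the
# ISR shrinks below this.
min.insync.replicas=2
unclean.leader.election.enable=false

# Producer side: wait for acknowledgement from all in-sync replicas.
request.required.acks=-1
```

This closes the acks=2 and acks=-1 gaps in the proof above: a write is never acknowledged while only one replica has it, and a follower that is missing messages cannot be elected leader uncleanly.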
Re: Review Request 24676: Rebase KAFKA-1583
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/24676/#review57235 --- Very nicely done. These are all minor comments - all but one concerning emptying the producer request that should be easily fixable if it is an issue. (It is the top comment) core/src/main/scala/kafka/api/ProducerRequest.scala https://reviews.apache.org/r/24676/#comment98557 I have a concern that this may actually be still needed. See comment under handleProducerRequest.sendResponseCallback core/src/main/scala/kafka/cluster/Partition.scala https://reviews.apache.org/r/24676/#comment97784 Maybe use this: "Recorded replica %d log end offset (LEO)..." Also, instead of an explicit "[%s,%d]" format specifier I think we should start doing the following: "%s".format(TopicAndPartition(topic, partition)) That way we ensure a canonical toString for topic/partition pairs and can change it in one place in the future. There are some places where we don't log with this agreed-upon format and it is a bit annoying, so going forward I think we should use the above. Can we add it to the logging improvements wiki? core/src/main/scala/kafka/cluster/Partition.scala https://reviews.apache.org/r/24676/#comment97788 Since we still may update the HW shall we rename this to maybeUpdateHWAndExpandIsr core/src/main/scala/kafka/log/Log.scala https://reviews.apache.org/r/24676/#comment97797 Since this contains hw (which is a replication detail) should it really be in the replica manager? core/src/main/scala/kafka/server/DelayedFetch.scala https://reviews.apache.org/r/24676/#comment97809 How about just calling this responseCallback? It is slightly confusing to see references to callbackOnComplete and onComplete in the same class. core/src/main/scala/kafka/server/DelayedFetch.scala https://reviews.apache.org/r/24676/#comment97801 The earlier comment was useful.
i.e., (in which case we return whatever data is available for the partitions that are currently led by this broker) core/src/main/scala/kafka/server/DelayedFetch.scala https://reviews.apache.org/r/24676/#comment97805 I'm a bit confused by case C. It can also happen if the delayed fetch happens to straddle a segment roll event; the comment seems a bit misleading/incomplete without that. In fact, if it is lagging shouldn't it have been satisfied immediately without having to create a DelayedFetch in the first place? core/src/main/scala/kafka/server/DelayedProduce.scala https://reviews.apache.org/r/24676/#comment98139 Similar comment as in DelayedFetch on naming this. core/src/main/scala/kafka/server/KafkaApis.scala https://reviews.apache.org/r/24676/#comment98165 Why is this additional logging necessary? KafkaApis currently has catch-all for unhandled exceptions. Error codes can be inspected via public access logs if required right? core/src/main/scala/kafka/server/KafkaApis.scala https://reviews.apache.org/r/24676/#comment98166 Same here. core/src/main/scala/kafka/server/KafkaApis.scala https://reviews.apache.org/r/24676/#comment98558 I'm not sure how scala treats this under the hood, but it _has_ to hold a reference to request until the callback is executed. i.e., we probably still want to empty the request data. core/src/main/scala/kafka/server/KafkaApis.scala https://reviews.apache.org/r/24676/#comment98180 to fetch messages core/src/main/scala/kafka/server/KafkaApis.scala https://reviews.apache.org/r/24676/#comment98182 Are these changes intentional? core/src/main/scala/kafka/server/OffsetManager.scala https://reviews.apache.org/r/24676/#comment98184 commitStatusView Also, can we just compute the final status once at the end as opposed to preparing an initial response status and modifying later? 
core/src/main/scala/kafka/server/OffsetManager.scala https://reviews.apache.org/r/24676/#comment98194 Do you think it would be clearer to name this onAppend or something similar? core/src/main/scala/kafka/server/ReplicaManager.scala https://reviews.apache.org/r/24676/#comment97799 Should we rename ReplicaManager to ReplicatedLogManager? core/src/main/scala/kafka/server/ReplicaManager.scala https://reviews.apache.org/r/24676/#comment98372 (for regular consumer fetch) core/src/main/scala/kafka/server/ReplicaManager.scala https://reviews.apache.org/r/24676/#comment98380 This is old code and we don't need to address it in this patch, but I was wondering if it makes sense to respond sooner if there is at least one error in the local append. What do you think? i.e., I don't remember a good reason for holding on to the request if there are < numPartitions errors in local append. core/src/main/scala/kafka/utils/DelayedItem.scala
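The canonical-toString suggestion in the review above can be sketched as follows. The case class here is a simplified stand-in for kafka.common.TopicAndPartition; the point is that one toString defines the "[topic,partition]" format and every log site just interpolates the value:

```scala
// Stand-in for kafka.common.TopicAndPartition with a canonical toString.
case class TopicAndPartition(topic: String, partition: Int) {
  override def toString: String = "[%s,%d]".format(topic, partition)
}

val tp = TopicAndPartition("my-topic", 3)

// Log sites use "%s".format(tp) instead of an explicit "[%s,%d]" pair,
// so the format can be changed in one place later.
println("Recorded replica log end offset (LEO) for %s".format(tp))
```

Any later change to the canonical format (say, to "topic-partition") touches only the toString, not every log statement.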
[jira] [Commented] (KAFKA-1583) Kafka API Refactoring
[ https://issues.apache.org/jira/browse/KAFKA-1583?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14179460#comment-14179460 ] Joel Koshy commented on KAFKA-1583: --- I posted review comments, mostly minor. Did you get a chance to run the system tests with this patch? Kafka API Refactoring - Key: KAFKA-1583 URL: https://issues.apache.org/jira/browse/KAFKA-1583 Project: Kafka Issue Type: Bug Reporter: Guozhang Wang Assignee: Guozhang Wang Fix For: 0.9.0 Attachments: KAFKA-1583.patch, KAFKA-1583_2014-08-20_13:54:38.patch, KAFKA-1583_2014-08-21_11:30:34.patch, KAFKA-1583_2014-08-27_09:44:50.patch, KAFKA-1583_2014-09-01_18:07:42.patch, KAFKA-1583_2014-09-02_13:37:47.patch, KAFKA-1583_2014-09-05_14:08:36.patch, KAFKA-1583_2014-09-05_14:55:38.patch, KAFKA-1583_2014-10-13_19:41:58.patch, KAFKA-1583_2014-10-16_21:15:40.patch, KAFKA-1583_2014-10-17_09:56:33.patch This is the next step of KAFKA-1430. Details can be found at this page: https://cwiki.apache.org/confluence/display/KAFKA/Kafka+API+Refactoring -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-1718) Message Size Too Large error when only small messages produced with Snappy
[ https://issues.apache.org/jira/browse/KAFKA-1718?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14179472#comment-14179472 ] Jun Rao commented on KAFKA-1718: Yes, the broker does recompress all messages in the messageSet passed to Log.append together into a single compressed message. In the java/scala producer, it's always the case that a messageSet for a partition in a produce request contains a single compressed message. I guess your go producer can send multiple compressed messages for a single partition. Is there any benefit in doing that? Message Size Too Large error when only small messages produced with Snappy Key: KAFKA-1718 URL: https://issues.apache.org/jira/browse/KAFKA-1718 Project: Kafka Issue Type: Bug Components: core Affects Versions: 0.8.1.1 Reporter: Evan Huus Priority: Critical I'm the primary author of the Go bindings, and while I originally received this as a bug against my bindings, I'm coming to the conclusion that it's a bug in the broker somehow. Specifically, take a look at the last two kafka packets in the following packet capture: https://dl.dropboxusercontent.com/u/171647/kafka.pcapng (you will need a trunk build of Wireshark to fully decode the kafka part of the packets). The produce request contains two partitions on one topic. Each partition has one message set (sizes 977205 bytes and 967362 bytes respectively). Each message set is a sequential collection of snappy-compressed messages, each message of size 46899. When uncompressed, each message contains a message set of 999600 bytes, containing a sequence of uncompressed 1024-byte messages. However, the broker responds to this with a MessageSizeTooLarge error, full stacktrace from the broker logs being: kafka.common.MessageSizeTooLargeException: Message size is 1070127 bytes which exceeds the maximum configured message size of 1000012.
at kafka.log.Log$$anonfun$append$1.apply(Log.scala:267) at kafka.log.Log$$anonfun$append$1.apply(Log.scala:265) at scala.collection.Iterator$class.foreach(Iterator.scala:727) at kafka.utils.IteratorTemplate.foreach(IteratorTemplate.scala:32) at kafka.log.Log.append(Log.scala:265) at kafka.cluster.Partition.appendMessagesToLeader(Partition.scala:354) at kafka.server.KafkaApis$$anonfun$appendToLocalLog$2.apply(KafkaApis.scala:376) at kafka.server.KafkaApis$$anonfun$appendToLocalLog$2.apply(KafkaApis.scala:366) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98) at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98) at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:226) at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39) at scala.collection.mutable.HashMap.foreach(HashMap.scala:98) at scala.collection.TraversableLike$class.map(TraversableLike.scala:244) at scala.collection.AbstractTraversable.map(Traversable.scala:105) at kafka.server.KafkaApis.appendToLocalLog(KafkaApis.scala:366) at kafka.server.KafkaApis.handleProducerRequest(KafkaApis.scala:292) at kafka.server.KafkaApis.handle(KafkaApis.scala:185) at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:42) at java.lang.Thread.run(Thread.java:695) Since as far as I can tell none of the sizes in the actual produced packet exceed the defined maximum, I can only assume that the broker is miscalculating something somewhere and throwing the exception improperly. --- This issue can be reliably reproduced using an out-of-the-box binary download of 0.8.1.1 and the following gist: https://gist.github.com/eapache/ce0f15311c605a165ce7 (you will need to use the `producer-ng` branch of the Sarama library). 
--- I am happy to provide any more information you might need, or to do relevant experiments etc. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-1718) Message Size Too Large error when only small messages produced with Snappy
[ https://issues.apache.org/jira/browse/KAFKA-1718?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14179485#comment-14179485 ] Evan Huus commented on KAFKA-1718: -- ??I guess your go producer can send multiple compressed messages for a single partition?? Yes, that's exactly what it's doing. If it collects enough messages for a partition that they would exceed {{message.max.bytes}} when compressed together, it batches them and sends each batch as a compressed message in the same messageSet. ??Is there any benefit in doing that??? More-or-less to get around the limit on message sizes, which I guess doesn't work so well :) A few points on this then: * Currently (with default broker settings) you can produce just under 100MiB (socket.request.max.bytes) of messages to the broker uncompressed in a single request, but you can't produce that same batch of messages in compressed form since the resulting compressed message would almost certainly be larger than 1MB (message.max.bytes). This discrepancy seems odd to me. * I understand the desire to limit real message sizes to prevent misbehaving producers from causing problems. However, I don't think the limit is particularly useful when applied to the compressed meta-messages; why shouldn't they be arbitrarily large, within the limits of {{socket.request.max.bytes}}? * I don't think the broker should assume there's only one compressed message per message-set; if a message-set contains multiple compressed messages, it should process them one-at-a-time and store each individually, rather than trying to do them all at once. Thanks for all your help!
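The arithmetic behind the surprise can be made explicit. In this toy check (not broker code), every wrapper message as produced passes the size limit individually, but the broker recompresses the whole per-partition set into one wrapper, and that single combined size is what Log.append compares against the limit. The 1000012 limit is an assumption here (default message.max.bytes plus log-entry overhead); the other numbers are from the report above:

```scala
// Assumed effective limit checked in Log.append (message.max.bytes + overhead).
val messageMaxBytes = 1000012

// As produced: each snappy wrapper is ~46899 bytes, far under the limit.
val producedWrapperSizes = List.fill(20)(46899)
val eachOk = producedWrapperSizes.forall(_ <= messageMaxBytes)

// After the broker recompresses the whole set into ONE wrapper message
// (size from the stack trace), the combined check fails.
val recompressedWrapperSize = 1070127
val combinedOk = recompressedWrapperSize <= messageMaxBytes

println(eachOk)     // true: every produced wrapper passes
println(combinedOk) // false: MessageSizeTooLargeException
```

So no single message in the produce request violates the limit; the violation is manufactured by the broker's own recompression step.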
[jira] [Comment Edited] (KAFKA-1718) Message Size Too Large error when only small messages produced with Snappy
[ https://issues.apache.org/jira/browse/KAFKA-1718?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14179485#comment-14179485 ]

Evan Huus edited comment on KAFKA-1718 at 10/22/14 2:26 AM:

??I guess your go producer can send multiple compressed messages for a single partition??

Yes, that's exactly what it's doing. If it collects enough messages for a partition that they would exceed {{message.max.bytes}} when compressed together, it batches them and sends each batch as a compressed message in the same messageSet.

??Is there any benefit in doing that???

More-or-less to get around the limit on message sizes, which I guess doesn't work so well :)

A few points on this then:
* Currently (with default broker settings) you can produce just under 100MiB (socket.request.max.bytes) of messages to the broker uncompressed in a single request, but you can't produce that same batch of messages in compressed form, since the resulting compressed message would almost certainly be larger than 1MB (message.max.bytes). This discrepancy seems odd to me.
* I understand the desire to limit real message sizes to prevent misbehaving producers from causing problems. However, I don't think the limit is particularly useful when applied to the compressed meta-messages; why shouldn't they be arbitrarily large, within the limits of {{socket.request.max.bytes}}?
* I don't think the broker should assume there's only one compressed message per message-set; if a message-set contains multiple compressed messages, it should process them one at a time and store each individually, rather than trying to do them all at once.

Thanks for all your help!

Edit: If for some reason you decide to keep the current behaviour as-is, please document this in the protocol spec on the wiki, since as far as I can tell the spec gives no reason to believe that multiple compressed messages will be combined, and that the _combined_ length will be relevant.
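The third bullet above can be made concrete with a small arithmetic sketch of the two size-check strategies. This is a hypothetical illustration, not the broker's actual logic (which lives in kafka.log.Log.append, in Scala); the sizes come from the packet capture and stacktrace quoted in this issue, and the limit value is an assumed stand-in for message.max.bytes.

```python
MESSAGE_MAX_BYTES = 1_000_000  # assumed broker limit (message.max.bytes, ~1MB)

wrapper_sizes = [46899] * 20   # compressed wrapper messages, per the capture
combined_size = 1_070_127      # size the broker reports in the exception

# Per-wrapper check (what the comment proposes): every compressed wrapper
# in the message set passes on its own.
assert all(s <= MESSAGE_MAX_BYTES for s in wrapper_sizes)

# Check against the message set treated as one recombined unit (what the
# broker appears to do, given the size in the exception): it fails even
# though every message actually produced was small.
assert combined_size > MESSAGE_MAX_BYTES
```

The discrepancy between the two checks is exactly why a producer can send only small messages and still trip MessageSizeTooLarge.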