[jira] [Commented] (KAFKA-1555) provide strong consistency with reasonable availability
[ https://issues.apache.org/jira/browse/KAFKA-1555?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14147474#comment-14147474 ] Sriram Subramanian commented on KAFKA-1555: --- 1. Great. Not supporting all values above ack 1 is a good step. We are essentially not using it as an integer any more. I would still love it to be made more explicit with an enum for clarity. 2. Also, by setting ack = -1 and min_isr = 2, we still do not avoid data loss when one broker goes down. The issue is the way we select a leader. When a request was written to the leader, the min_isr check could have succeeded and we would have written to min_isr - 1 replicas. However, the replicas could subsequently go out of the ISR. When the leader fails after that, we would have an unclean leader election and select any replica as the leader, and it could be one that was lagging. To completely guarantee no data loss, we would need to do the following: a. ensure logs do not diverge on unclean leader elections, and b. choose the broker with the longest log as the leader. 3. We may not have documented ack > 1, but since it is an integer, there is a chance somebody is using it. In such a case this could be a backwards-incompatible change. It would be worth mentioning in the release notes. 4. Long term, I think the min_isr config should be in the API. This gives better control per message and explicitly lets the caller know what guarantees they get. provide strong consistency with reasonable availability --- Key: KAFKA-1555 URL: https://issues.apache.org/jira/browse/KAFKA-1555 Project: Kafka Issue Type: Improvement Components: controller Affects Versions: 0.8.1.1 Reporter: Jiang Wu Assignee: Gwen Shapira Fix For: 0.8.2 Attachments: KAFKA-1555.0.patch, KAFKA-1555.1.patch, KAFKA-1555.2.patch, KAFKA-1555.3.patch, KAFKA-1555.4.patch In a mission critical application, we expect that a kafka cluster with 3 brokers can satisfy two requirements: 1. When 1 broker is down, no message loss or service blocking happens. 2. In worse cases, such as when two brokers are down, service can be blocked, but no message loss happens. We found that the current kafka version (0.8.1.1) cannot achieve the requirements due to three behaviors: 1. when choosing a new leader from 2 followers in ISR, the one with fewer messages may be chosen as the leader. 2. even when replica.lag.max.messages=0, a follower can stay in ISR when it has fewer messages than the leader. 3. ISR can contain only 1 broker, therefore acknowledged messages may be stored in only 1 broker. The following is an analytical proof. We consider a cluster with 3 brokers and a topic with 3 replicas, and assume that at the beginning, all 3 replicas, leader A, followers B and C, are in sync, i.e., they have the same messages and are all in ISR. According to the value of request.required.acks (acks for short), there are the following cases. 1. acks=0, 1, 3. Obviously these settings do not satisfy the requirement. 2. acks=2. Producer sends a message m. It's acknowledged by A and B. At this time, although C hasn't received m, C is still in ISR. If A is killed, C can be elected as the new leader, and consumers will miss m. 3. acks=-1. B and C restart and are removed from ISR. Producer sends a message m to A, and receives an acknowledgement. Disk failure happens in A before B and C replicate m. Message m is lost. In summary, no existing configuration can satisfy the requirements. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
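To make the settings under discussion concrete, here is a minimal producer-side sketch of how they would be exercised with this patch applied (0.8.2 new Java producer; the broker addresses and topic name are placeholders, and the topic is assumed to have been created with min.insync.replicas=2 as a topic-level config):
{noformat}
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

Properties props = new Properties();
props.put("bootstrap.servers", "broker1:9092,broker2:9092,broker3:9092");
props.put("acks", "-1"); // wait for the ISR; the broker enforces min.insync.replicas
props.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");

KafkaProducer<byte[], byte[]> producer = new KafkaProducer<byte[], byte[]>(props);
try {
    producer.send(new ProducerRecord<byte[], byte[]>("critical-topic", "m".getBytes())).get();
} catch (InterruptedException | ExecutionException e) {
    // With fewer than min.insync.replicas replicas in sync, the patch surfaces a
    // NotEnoughReplicas error here instead of silently acknowledging the write.
} finally {
    producer.close();
}
{noformat}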
[jira] [Commented] (KAFKA-328) Write unit test for kafka server startup and shutdown API
[ https://issues.apache.org/jira/browse/KAFKA-328?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14147815#comment-14147815 ] BalajiSeshadri commented on KAFKA-328: -- [~nehanarkhede] Can you please send me the code style you are using? If you are using IntelliJ or Eclipse, please give me the formatter XML and I will import it into my environment. Write unit test for kafka server startup and shutdown API -- Key: KAFKA-328 URL: https://issues.apache.org/jira/browse/KAFKA-328 Project: Kafka Issue Type: Bug Reporter: Neha Narkhede Assignee: BalajiSeshadri Labels: newbie Attachments: KAFKA-328.patch Background discussion in KAFKA-320. People often try to embed KafkaServer in an application that ends up calling startup() and shutdown() repeatedly and sometimes in odd ways. To ensure this works correctly we have to be very careful about cleaning up resources. This is a good practice for making unit tests reliable anyway. A good first step would be to add some unit tests on startup and shutdown to cover various cases: 1. A Kafka server can start up if it is not already starting up, is not currently being shut down, and hasn't already been started. 2. A Kafka server can shut down if it is not already shutting down, is not currently starting up, and hasn't already been shut down. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (KAFKA-1618) Exception thrown when running console producer with no port number for the broker
[ https://issues.apache.org/jira/browse/KAFKA-1618?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] BalajiSeshadri updated KAFKA-1618: -- Attachment: KAFKA-1618-REBASED.patch REBASED patch. Exception thrown when running console producer with no port number for the broker - Key: KAFKA-1618 URL: https://issues.apache.org/jira/browse/KAFKA-1618 Project: Kafka Issue Type: Improvement Affects Versions: 0.8.1.1 Reporter: Gwen Shapira Assignee: BalajiSeshadri Labels: newbie Fix For: 0.8.2 Attachments: KAFKA-1618-ALL.patch, KAFKA-1618-ALL.patch, KAFKA-1618-REBASED.patch, KAFKA-1618-REVIEW-COMMENTS-SPACE-CORRECTION.patch, KAFKA-1618-REVIEW-COMMENTS.patch, KAFKA-1618.patch When running the console producer with just localhost as the broker list, I get an ArrayIndexOutOfBoundsException. I expect either a clearer error about the arguments or for the producer to guess a default port. [root@shapira-1 bin]# ./kafka-console-producer.sh --topic rufus1 --broker-list localhost java.lang.ArrayIndexOutOfBoundsException: 1 at kafka.client.ClientUtils$$anonfun$parseBrokerList$1.apply(ClientUtils.scala:102) at kafka.client.ClientUtils$$anonfun$parseBrokerList$1.apply(ClientUtils.scala:97) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) at scala.collection.TraversableLike$class.map(TraversableLike.scala:244) at scala.collection.AbstractTraversable.map(Traversable.scala:105) at kafka.client.ClientUtils$.parseBrokerList(ClientUtils.scala:97) at kafka.producer.BrokerPartitionInfo.<init>(BrokerPartitionInfo.scala:32) at kafka.producer.async.DefaultEventHandler.<init>(DefaultEventHandler.scala:41) at kafka.producer.Producer.<init>(Producer.scala:59) at kafka.producer.ConsoleProducer$.main(ConsoleProducer.scala:158) at kafka.producer.ConsoleProducer.main(ConsoleProducer.scala) -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Comment Edited] (KAFKA-1618) Exception thrown when running console producer with no port number for the broker
[ https://issues.apache.org/jira/browse/KAFKA-1618?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14147871#comment-14147871 ] BalajiSeshadri edited comment on KAFKA-1618 at 9/25/14 3:39 PM: [~nehanarkhede] Please find the REBASED patch attached. was (Author: balaji.sesha...@dish.com): REBASED Patch Exception thrown when running console producer with no port number for the broker - Key: KAFKA-1618 URL: https://issues.apache.org/jira/browse/KAFKA-1618 Project: Kafka Issue Type: Improvement Affects Versions: 0.8.1.1 Reporter: Gwen Shapira Assignee: BalajiSeshadri Labels: newbie Fix For: 0.8.2 Attachments: KAFKA-1618-ALL.patch, KAFKA-1618-ALL.patch, KAFKA-1618-REBASED.patch, KAFKA-1618-REVIEW-COMMENTS-SPACE-CORRECTION.patch, KAFKA-1618-REVIEW-COMMENTS.patch, KAFKA-1618.patch When running the console producer with just localhost as the broker list, I get an ArrayIndexOutOfBoundsException. I expect either a clearer error about the arguments or for the producer to guess a default port. [root@shapira-1 bin]# ./kafka-console-producer.sh --topic rufus1 --broker-list localhost java.lang.ArrayIndexOutOfBoundsException: 1 at kafka.client.ClientUtils$$anonfun$parseBrokerList$1.apply(ClientUtils.scala:102) at kafka.client.ClientUtils$$anonfun$parseBrokerList$1.apply(ClientUtils.scala:97) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) at scala.collection.TraversableLike$class.map(TraversableLike.scala:244) at scala.collection.AbstractTraversable.map(Traversable.scala:105) at kafka.client.ClientUtils$.parseBrokerList(ClientUtils.scala:97) at kafka.producer.BrokerPartitionInfo.<init>(BrokerPartitionInfo.scala:32) at kafka.producer.async.DefaultEventHandler.<init>(DefaultEventHandler.scala:41) at kafka.producer.Producer.<init>(Producer.scala:59) at kafka.producer.ConsoleProducer$.main(ConsoleProducer.scala:158) at kafka.producer.ConsoleProducer.main(ConsoleProducer.scala) -- This message was sent by Atlassian JIRA (v6.3.4#6332)
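As an aside, the shape of the "guess a default port" option mentioned in the description can be sketched in a few lines. This is only an illustration, not the project's actual patch; the helper name and default value are made up:
{noformat}
import java.net.InetSocketAddress;

// Hypothetical helper: accept "host" as well as "host:port" instead of blindly
// indexing the second element of split(":"), which is what throws above.
static InetSocketAddress parseBroker(String broker, int defaultPort) {
    String[] parts = broker.split(":");
    if (parts.length > 2) throw new IllegalArgumentException("Bad broker spec: " + broker);
    int port = (parts.length == 2) ? Integer.parseInt(parts[1]) : defaultPort;
    return new InetSocketAddress(parts[0], port);
}
{noformat}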
[jira] [Commented] (KAFKA-1555) provide strong consistency with reasonable availability
[ https://issues.apache.org/jira/browse/KAFKA-1555?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14147900#comment-14147900 ] saurabh agarwal commented on KAFKA-1555: Along with the min.isr setting, unclean leader election needs to be set to false in order to avoid data loss. Let's take a scenario - the number of replicas is 3, and one replica is falling behind. This replica will be moved out of the ISR. Because unclean leader election is turned off, it will not become a leader. Now there will only be 2 replicas left in the ISR. If the leader goes down, then the other replica in the ISR will become leader. Because min.isr=2, the write will not be successful until another replica joins the ISR. I don't think there will be any data loss in this scenario (a config sketch follows below). provide strong consistency with reasonable availability --- Key: KAFKA-1555 URL: https://issues.apache.org/jira/browse/KAFKA-1555 Project: Kafka Issue Type: Improvement Components: controller Affects Versions: 0.8.1.1 Reporter: Jiang Wu Assignee: Gwen Shapira Fix For: 0.8.2 Attachments: KAFKA-1555.0.patch, KAFKA-1555.1.patch, KAFKA-1555.2.patch, KAFKA-1555.3.patch, KAFKA-1555.4.patch In a mission critical application, we expect that a kafka cluster with 3 brokers can satisfy two requirements: 1. When 1 broker is down, no message loss or service blocking happens. 2. In worse cases, such as when two brokers are down, service can be blocked, but no message loss happens. We found that the current kafka version (0.8.1.1) cannot achieve the requirements due to three behaviors: 1. when choosing a new leader from 2 followers in ISR, the one with fewer messages may be chosen as the leader. 2. even when replica.lag.max.messages=0, a follower can stay in ISR when it has fewer messages than the leader. 3. ISR can contain only 1 broker, therefore acknowledged messages may be stored in only 1 broker. The following is an analytical proof. We consider a cluster with 3 brokers and a topic with 3 replicas, and assume that at the beginning, all 3 replicas, leader A, followers B and C, are in sync, i.e., they have the same messages and are all in ISR. According to the value of request.required.acks (acks for short), there are the following cases. 1. acks=0, 1, 3. Obviously these settings do not satisfy the requirement. 2. acks=2. Producer sends a message m. It's acknowledged by A and B. At this time, although C hasn't received m, C is still in ISR. If A is killed, C can be elected as the new leader, and consumers will miss m. 3. acks=-1. B and C restart and are removed from ISR. Producer sends a message m to A, and receives an acknowledgement. Disk failure happens in A before B and C replicate m. Message m is lost. In summary, no existing configuration can satisfy the requirements. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
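For reference, the combination described above maps to topic-level configs roughly as follows. This is a sketch that assumes both this patch and the per-topic unclean leader election config from KAFKA-1028 are in the build; the topic name and ZooKeeper address are placeholders:
{noformat}
bin/kafka-topics.sh --create --zookeeper localhost:2181 \
  --topic critical-topic --partitions 1 --replication-factor 3 \
  --config min.insync.replicas=2 \
  --config unclean.leader.election.enable=false
{noformat}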
Read a specific number of messages using kafka
Hi, My requirement is to read a specific number of messages from a kafka topic which contains data in JSON format, and after reading that number of messages, I need to write them to a file and then stop. How can I count the number of messages read by my consumer code (either SimpleConsumer or high-level)? Please help. -- Thanks, Pankaj Ojha
Re: Read a specific number of messages using kafka
Using the high level consumer and assuming you already created an iterator: while (msgCount < maxMessages && it.hasNext()) { bytes = it.next().message(); eventList.add(bytes); } (See a complete example here: https://github.com/apache/flume/blob/trunk/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSource.java) Gwen On Thu, Sep 25, 2014 at 9:15 AM, pankaj ojha pankajojh...@gmail.com wrote: Hi, My requirement is to read a specific number of messages from a kafka topic which contains data in JSON format, and after reading that number of messages, I need to write them to a file and then stop. How can I count the number of messages read by my consumer code (either SimpleConsumer or high-level)? Please help. -- Thanks, Pankaj Ojha
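Expanding Gwen's snippet into a self-contained sketch (0.8 high-level consumer Java API; the ZooKeeper address, group id, topic and message count are placeholders):

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Properties;
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;

Properties props = new Properties();
props.put("zookeeper.connect", "localhost:2181"); // placeholder
props.put("group.id", "bounded-reader");          // placeholder
ConsumerConnector connector = Consumer.createJavaConsumerConnector(new ConsumerConfig(props));
KafkaStream<byte[], byte[]> stream =
    connector.createMessageStreams(Collections.singletonMap("my-topic", 1)).get("my-topic").get(0);
ConsumerIterator<byte[], byte[]> it = stream.iterator();

int maxMessages = 1000; // the "specific number" to read
List<byte[]> eventList = new ArrayList<byte[]>();
int msgCount = 0;
// Note: hasNext() blocks until a message arrives; set consumer.timeout.ms
// if you need a bounded wait instead.
while (msgCount < maxMessages && it.hasNext()) {
    eventList.add(it.next().message());
    msgCount++;
}
connector.commitOffsets(); // record progress before stopping
connector.shutdown();      // then write eventList to the file and exit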
Re: Read a specific number of messages using kafka
Thank You. I will try this out. On Thu, Sep 25, 2014 at 10:01 PM, Gwen Shapira gshap...@cloudera.com wrote: Using the high level consumer and assuming you already created an iterator: while (msgCount < maxMessages && it.hasNext()) { bytes = it.next().message(); eventList.add(bytes); } (See a complete example here: https://github.com/apache/flume/blob/trunk/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSource.java ) Gwen On Thu, Sep 25, 2014 at 9:15 AM, pankaj ojha pankajojh...@gmail.com wrote: Hi, My requirement is to read a specific number of messages from a kafka topic which contains data in JSON format, and after reading that number of messages, I need to write them to a file and then stop. How can I count the number of messages read by my consumer code (either SimpleConsumer or high-level)? Please help. -- Thanks, Pankaj Ojha -- Thanks, Pankaj Ojha
[jira] [Commented] (KAFKA-1555) provide strong consistency with reasonable availability
[ https://issues.apache.org/jira/browse/KAFKA-1555?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14147943#comment-14147943 ] Joel Koshy commented on KAFKA-1555: --- I'm +1 on the concept of min.isr and acks -1, 0, 1. This is a very interesting and important thread - sorry I missed most of it until I spent a couple of hours (!) yesterday chewing on these comments. Sriram, with regard to your second point - I had a similar concern and I think we talked about it, but I am not sure if it is the same issue. i.e., at the point the leader responds to the producer, it knows how many followers have received the messages, so if only min.isr - 1 replicas have been written to, the leader would return a NotEnoughReplicas error code. I agree that subsequent data loss is possible on unclean leader elections. However, that is sort of expected. I think Joe provided a good interpretation of min.isr - i.e., it provides a balance between your tolerance for the probability of data loss for stored data and the need for availability of brokers to write to. For a lower probability of loss - i.e., a lower probability of unclean leader elections - one would use a higher min.isr. Avoiding (or rather reducing) data loss on unclean leader elections is, I think, an orthogonal issue that other jiras such as KAFKA-1211 touch upon. provide strong consistency with reasonable availability --- Key: KAFKA-1555 URL: https://issues.apache.org/jira/browse/KAFKA-1555 Project: Kafka Issue Type: Improvement Components: controller Affects Versions: 0.8.1.1 Reporter: Jiang Wu Assignee: Gwen Shapira Fix For: 0.8.2 Attachments: KAFKA-1555.0.patch, KAFKA-1555.1.patch, KAFKA-1555.2.patch, KAFKA-1555.3.patch, KAFKA-1555.4.patch In a mission critical application, we expect that a kafka cluster with 3 brokers can satisfy two requirements: 1. When 1 broker is down, no message loss or service blocking happens. 2. In worse cases, such as when two brokers are down, service can be blocked, but no message loss happens. We found that the current kafka version (0.8.1.1) cannot achieve the requirements due to three behaviors: 1. when choosing a new leader from 2 followers in ISR, the one with fewer messages may be chosen as the leader. 2. even when replica.lag.max.messages=0, a follower can stay in ISR when it has fewer messages than the leader. 3. ISR can contain only 1 broker, therefore acknowledged messages may be stored in only 1 broker. The following is an analytical proof. We consider a cluster with 3 brokers and a topic with 3 replicas, and assume that at the beginning, all 3 replicas, leader A, followers B and C, are in sync, i.e., they have the same messages and are all in ISR. According to the value of request.required.acks (acks for short), there are the following cases. 1. acks=0, 1, 3. Obviously these settings do not satisfy the requirement. 2. acks=2. Producer sends a message m. It's acknowledged by A and B. At this time, although C hasn't received m, C is still in ISR. If A is killed, C can be elected as the new leader, and consumers will miss m. 3. acks=-1. B and C restart and are removed from ISR. Producer sends a message m to A, and receives an acknowledgement. Disk failure happens in A before B and C replicate m. Message m is lost. In summary, no existing configuration can satisfy the requirements. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-1555) provide strong consistency with reasonable availability
[ https://issues.apache.org/jira/browse/KAFKA-1555?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14147967#comment-14147967 ] Sriram Subramanian commented on KAFKA-1555: --- ack = -1 with clean leader election already prevents data loss. If min_isr=2, I would expect the data to never be lost when the leader fails. That should be the simplest guarantee the system provides. We should not add further clauses to this or it would be impossible to define the system. If we were to say that with min_isr=2 and ack=-1 you just reduced the probability of loss but it could still get lost under unclean leader election, we will lose credibility on these settings. provide strong consistency with reasonable availability --- Key: KAFKA-1555 URL: https://issues.apache.org/jira/browse/KAFKA-1555 Project: Kafka Issue Type: Improvement Components: controller Affects Versions: 0.8.1.1 Reporter: Jiang Wu Assignee: Gwen Shapira Fix For: 0.8.2 Attachments: KAFKA-1555.0.patch, KAFKA-1555.1.patch, KAFKA-1555.2.patch, KAFKA-1555.3.patch, KAFKA-1555.4.patch In a mission critical application, we expect that a kafka cluster with 3 brokers can satisfy two requirements: 1. When 1 broker is down, no message loss or service blocking happens. 2. In worse cases, such as when two brokers are down, service can be blocked, but no message loss happens. We found that the current kafka version (0.8.1.1) cannot achieve the requirements due to three behaviors: 1. when choosing a new leader from 2 followers in ISR, the one with fewer messages may be chosen as the leader. 2. even when replica.lag.max.messages=0, a follower can stay in ISR when it has fewer messages than the leader. 3. ISR can contain only 1 broker, therefore acknowledged messages may be stored in only 1 broker. The following is an analytical proof. We consider a cluster with 3 brokers and a topic with 3 replicas, and assume that at the beginning, all 3 replicas, leader A, followers B and C, are in sync, i.e., they have the same messages and are all in ISR. According to the value of request.required.acks (acks for short), there are the following cases. 1. acks=0, 1, 3. Obviously these settings do not satisfy the requirement. 2. acks=2. Producer sends a message m. It's acknowledged by A and B. At this time, although C hasn't received m, C is still in ISR. If A is killed, C can be elected as the new leader, and consumers will miss m. 3. acks=-1. B and C restart and are removed from ISR. Producer sends a message m to A, and receives an acknowledgement. Disk failure happens in A before B and C replicate m. Message m is lost. In summary, no existing configuration can satisfy the requirements. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
Re: KAFKA-404 and KAFKA-1651
We genuinely appreciate new contributions and don't mind going through a contributor onboarding process. In fact, I'd encourage you to give feedback on how we could improve your experience as a contributor :-) We are trying to improve the turnaround time on patch reviews. Hopefully assigning a Reviewer on the JIRA is helping improve that. Thanks, Neha On Wed, Sep 24, 2014 at 10:53 PM, Jonathan Creasy jonathan.cre...@turn.com wrote: Sorry for all the extra ReviewBoard and Jira email this evening. I’ve just recently gotten back to a place where I’ll be using and supporting Kafka in production and haven’t used these tools in a while; I'm just getting reacquainted. I hope no one minds, thanks for allowing me to contribute! -Jonathan
[jira] [Commented] (KAFKA-1618) Exception thrown when running console producer with no port number for the broker
[ https://issues.apache.org/jira/browse/KAFKA-1618?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14148024#comment-14148024 ] Neha Narkhede commented on KAFKA-1618: -- [~balaji.sesha...@dish.com] I ran into issues applying the patch again. Would you mind following the Contributor workflow in our wiki - https://cwiki.apache.org/confluence/display/KAFKA/Patch+submission+and+review#Patchsubmissionandreview-Simplecontributorworkflow
{noformat}
nnarkhed-mn1:kafka-git-idea nnarkhed$ git apply --check KAFKA-1618-REBASED.patch
error: src/main/scala/kafka/tools/ConsoleProducer.scala: No such file or directory
error: src/main/scala/kafka/tools/GetOffsetShell.scala: No such file or directory
error: src/main/scala/kafka/tools/ProducerPerformance.scala: No such file or directory
error: src/main/scala/kafka/tools/ReplayLogProducer.scala: No such file or directory
error: src/main/scala/kafka/tools/ReplicaVerificationTool.scala: No such file or directory
error: src/main/scala/kafka/tools/SimpleConsumerShell.scala: No such file or directory
error: src/main/scala/kafka/tools/ToolsUtils.scala: No such file or directory
{noformat}
Exception thrown when running console producer with no port number for the broker - Key: KAFKA-1618 URL: https://issues.apache.org/jira/browse/KAFKA-1618 Project: Kafka Issue Type: Improvement Affects Versions: 0.8.1.1 Reporter: Gwen Shapira Assignee: BalajiSeshadri Labels: newbie Fix For: 0.8.2 Attachments: KAFKA-1618-ALL.patch, KAFKA-1618-ALL.patch, KAFKA-1618-REBASED.patch, KAFKA-1618-REVIEW-COMMENTS-SPACE-CORRECTION.patch, KAFKA-1618-REVIEW-COMMENTS.patch, KAFKA-1618.patch When running the console producer with just localhost as the broker list, I get an ArrayIndexOutOfBoundsException. I expect either a clearer error about the arguments or for the producer to guess a default port. [root@shapira-1 bin]# ./kafka-console-producer.sh --topic rufus1 --broker-list localhost java.lang.ArrayIndexOutOfBoundsException: 1 at kafka.client.ClientUtils$$anonfun$parseBrokerList$1.apply(ClientUtils.scala:102) at kafka.client.ClientUtils$$anonfun$parseBrokerList$1.apply(ClientUtils.scala:97) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) at scala.collection.TraversableLike$class.map(TraversableLike.scala:244) at scala.collection.AbstractTraversable.map(Traversable.scala:105) at kafka.client.ClientUtils$.parseBrokerList(ClientUtils.scala:97) at kafka.producer.BrokerPartitionInfo.<init>(BrokerPartitionInfo.scala:32) at kafka.producer.async.DefaultEventHandler.<init>(DefaultEventHandler.scala:41) at kafka.producer.Producer.<init>(Producer.scala:59) at kafka.producer.ConsoleProducer$.main(ConsoleProducer.scala:158) at kafka.producer.ConsoleProducer.main(ConsoleProducer.scala) -- This message was sent by Atlassian JIRA (v6.3.4#6332)
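(Aside: the missing core/ prefix in the error paths above suggests the diff was generated from inside the core/ module rather than the repository root. A sketch of producing a patch that applies cleanly from the root; the branch name is illustrative:)
{noformat}
git checkout trunk && git pull
git checkout -b KAFKA-1618
# ... make the changes ...
git diff trunk > KAFKA-1618-REBASED.patch   # run from the repo root so paths start with core/
git apply --check KAFKA-1618-REBASED.patch  # verify before attaching the patch to the JIRA
{noformat}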
Re: Review Request 26018: Patch for KAFKA-404
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/26018/#review54559 --- core/src/test/scala/unit/kafka/server/CreateZKChrootTest.scala https://reviews.apache.org/r/26018/#comment94745 Thanks for adding this test. It's probably worth consolidating some of our tests on server startup and shutdown. Can we rename ServerShutdownTest to ServerStartupAndShutdownTest and move this test there? - Neha Narkhede On Sept. 25, 2014, 5:31 a.m., Jonathan Creasy wrote: --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/26018/ --- (Updated Sept. 25, 2014, 5:31 a.m.) Review request for kafka. Bugs: KAFKA-404 https://issues.apache.org/jira/browse/KAFKA-404 Repository: kafka Description --- KAFKA-404 auto-create Zookeeper CHROOT on Startup Diffs - core/src/main/scala/kafka/server/KafkaServer.scala 390fef500d7e0027e698c259d777454ba5a0f5e8 core/src/test/scala/unit/kafka/server/CreateZKChrootTest.scala PRE-CREATION Diff: https://reviews.apache.org/r/26018/diff/ Testing --- Thanks, Jonathan Creasy
[jira] [Commented] (KAFKA-1367) Broker topic metadata not kept in sync with ZooKeeper
[ https://issues.apache.org/jira/browse/KAFKA-1367?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14148054#comment-14148054 ] Magnus Edenhill commented on KAFKA-1367: [~guozhang] Yes, KAFKA-1555 looks like a good match. Broker topic metadata not kept in sync with ZooKeeper - Key: KAFKA-1367 URL: https://issues.apache.org/jira/browse/KAFKA-1367 Project: Kafka Issue Type: Bug Affects Versions: 0.8.0, 0.8.1 Reporter: Ryan Berdeen Labels: newbie++ Attachments: KAFKA-1367.txt When a broker is restarted, the topic metadata responses from the brokers will be incorrect (different from ZooKeeper) until a preferred replica leader election. In the metadata, it looks like leaders are correctly removed from the ISR when a broker disappears, but followers are not. Then, when a broker reappears, the ISR is never updated. I used a variation of the Vagrant setup created by Joe Stein to reproduce this with latest from the 0.8.1 branch: https://github.com/also/kafka/commit/dba36a503a5e22ea039df0f9852560b4fb1e067c -- This message was sent by Atlassian JIRA (v6.3.4#6332)
Re: Review Request 26018: Patch for KAFKA-404
I went and looked at the ServerShutdownTest to implement your suggestion, and I don't think I like this particular consolidation. How about I go and change this test to ServerStartupTest, and future startup checks can go there and we leave the shutdown checks separate? On 9/25/14, 12:55 PM, Neha Narkhede neha.narkh...@gmail.com wrote: --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/26018/#review54559 --- core/src/test/scala/unit/kafka/server/CreateZKChrootTest.scala https://reviews.apache.org/r/26018/#comment94745 Thanks for adding this test. It's probably worth consolidating some of our tests on server startup and shutdown. Can we rename ServerShutdownTest to ServerStartupAndShutdownTest and move this test there? - Neha Narkhede On Sept. 25, 2014, 5:31 a.m., Jonathan Creasy wrote: --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/26018/ --- (Updated Sept. 25, 2014, 5:31 a.m.) Review request for kafka. Bugs: KAFKA-404 https://issues.apache.org/jira/browse/KAFKA-404 Repository: kafka Description --- KAFKA-404 auto-create Zookeeper CHROOT on Startup Diffs - core/src/main/scala/kafka/server/KafkaServer.scala 390fef500d7e0027e698c259d777454ba5a0f5e8 core/src/test/scala/unit/kafka/server/CreateZKChrootTest.scala PRE-CREATION Diff: https://reviews.apache.org/r/26018/diff/ Testing --- Thanks, Jonathan Creasy
[jira] [Commented] (KAFKA-404) When using chroot path, create chroot on startup if it doesn't exist
[ https://issues.apache.org/jira/browse/KAFKA-404?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14148057#comment-14148057 ] Jonathan Creasy commented on KAFKA-404: --- Updated reviewboard https://reviews.apache.org/r/26019/diff/ against branch trunk When using chroot path, create chroot on startup if it doesn't exist Key: KAFKA-404 URL: https://issues.apache.org/jira/browse/KAFKA-404 Project: Kafka Issue Type: Improvement Affects Versions: 0.8.1 Environment: CentOS 5.5, Linux 2.6.18-194.32.1.el5 x86_64 GNU/Linux Reporter: Jonathan Creasy Labels: newbie, patch Fix For: 0.8.2 Attachments: KAFKA-404-0.7.1.patch, KAFKA-404-0.8.patch, KAFKA-404-auto-create-zookeeper-chroot-on-start-up-i.patch, KAFKA-404-auto-create-zookeeper-chroot-on-start-up-v2.patch, KAFKA-404-auto-create-zookeeper-chroot-on-start-up-v3.patch, KAFKA-404-auto-create-zookeeper-chroot-on-start-up-v4.patch, KAFKA-404-auto-create-zookeeper-chroot-on-start-up-v5.patch, KAFKA-404.patch, KAFKA-404_2014-09-25_13:11:02.patch -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (KAFKA-404) When using chroot path, create chroot on startup if it doesn't exist
[ https://issues.apache.org/jira/browse/KAFKA-404?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jonathan Creasy updated KAFKA-404: -- Attachment: KAFKA-404_2014-09-25_13:11:02.patch When using chroot path, create chroot on startup if it doesn't exist Key: KAFKA-404 URL: https://issues.apache.org/jira/browse/KAFKA-404 Project: Kafka Issue Type: Improvement Affects Versions: 0.8.1 Environment: CentOS 5.5, Linux 2.6.18-194.32.1.el5 x86_64 GNU/Linux Reporter: Jonathan Creasy Labels: newbie, patch Fix For: 0.8.2 Attachments: KAFKA-404-0.7.1.patch, KAFKA-404-0.8.patch, KAFKA-404-auto-create-zookeeper-chroot-on-start-up-i.patch, KAFKA-404-auto-create-zookeeper-chroot-on-start-up-v2.patch, KAFKA-404-auto-create-zookeeper-chroot-on-start-up-v3.patch, KAFKA-404-auto-create-zookeeper-chroot-on-start-up-v4.patch, KAFKA-404-auto-create-zookeeper-chroot-on-start-up-v5.patch, KAFKA-404.patch, KAFKA-404_2014-09-25_13:11:02.patch -- This message was sent by Atlassian JIRA (v6.3.4#6332)
Re: Review Request 26019: Patch for KAFKA-404
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/26019/ --- (Updated Sept. 25, 2014, 6:11 p.m.) Review request for kafka. Bugs: KAFKA-404 https://issues.apache.org/jira/browse/KAFKA-404 Repository: kafka Description --- KAFKA-404 auto-create Zookeeper CHROOT on Startup Diffs (updated) - core/src/main/scala/kafka/server/KafkaServer.scala 390fef500d7e0027e698c259d777454ba5a0f5e8 core/src/test/scala/unit/kafka/server/ServerStartupTest.scala PRE-CREATION Diff: https://reviews.apache.org/r/26019/diff/ Testing --- Ran Kafka against a brand new Zookeeper, chroot exists after Kafka starts. Also: Added unit test, fails prior to patch. $ ./gradlew -Dtest.single=CreateZKChrootTest core:test Building project 'core' with Scala version 2.10.1 :clients:compileJava UP-TO-DATE :clients:processResources UP-TO-DATE :clients:classes UP-TO-DATE :clients:jar UP-TO-DATE :core:compileJava UP-TO-DATE :core:compileScala UP-TO-DATE :core:processResources UP-TO-DATE :core:classes UP-TO-DATE :core:compileTestJava UP-TO-DATE :core:compileTestScala :core:processTestResources UP-TO-DATE :core:testClasses :core:test kafka.server.CreateZKChrootTest > testBrokerCreatesZKChroot FAILED java.lang.IllegalArgumentException: Path length must be > 0 at org.apache.zookeeper.common.PathUtils.validatePath(PathUtils.java:48) at org.apache.zookeeper.common.PathUtils.validatePath(PathUtils.java:35) at org.apache.zookeeper.ZooKeeper.create(ZooKeeper.java:766) at org.I0Itec.zkclient.ZkConnection.create(ZkConnection.java:87) at org.I0Itec.zkclient.ZkClient$1.call(ZkClient.java:308) at org.I0Itec.zkclient.ZkClient$1.call(ZkClient.java:304) at org.I0Itec.zkclient.ZkClient.retryUntilConnected(ZkClient.java:675) at org.I0Itec.zkclient.ZkClient.create(ZkClient.java:304) at org.I0Itec.zkclient.ZkClient.createPersistent(ZkClient.java:213) at org.I0Itec.zkclient.ZkClient.createPersistent(ZkClient.java:223) at kafka.utils.ZkUtils$.makeSurePersistentPathExists(ZkUtils.scala:211) at kafka.utils.ZkUtils$$anonfun$setupCommonPaths$1.apply(ZkUtils.scala:91) at kafka.utils.ZkUtils$$anonfun$setupCommonPaths$1.apply(ZkUtils.scala:90) at scala.collection.immutable.List.foreach(List.scala:318) at kafka.utils.ZkUtils$.setupCommonPaths(ZkUtils.scala:90) at kafka.server.KafkaServer.initZk(KafkaServer.scala:133) at kafka.server.KafkaServer.startup(KafkaServer.scala:81) at kafka.utils.TestUtils$.createServer(TestUtils.scala:133) at kafka.server.CreateZKChrootTest.setUp(CreateZKChrootTest.scala:40) 1 test completed, 1 failed :core:test FAILED FAILURE: Build failed with an exception. * What went wrong: Execution failed for task ':core:test'. There were failing tests. See the report at: file:///Users/jcreasy/code/kafka/core/build/reports/tests/index.html * Try: Run with --stacktrace option to get the stack trace. Run with --info or --debug option to get more log output.
BUILD FAILED Total time: 27.926 secs Passes once patch applied: $ ./gradlew -Dtest.single=CreateZKChrootTest core:test Building project 'core' with Scala version 2.10.1 :clients:compileJava UP-TO-DATE :clients:processResources UP-TO-DATE :clients:classes UP-TO-DATE :clients:jar UP-TO-DATE :core:compileJava UP-TO-DATE :core:compileScala /Users/jcreasy/code/kafka/core/src/main/scala/kafka/server/KafkaServer.scala:160: a pure expression does nothing in statement position; you may be omitting necessary parentheses ControllerStats.uncleanLeaderElectionRate ^ /Users/jcreasy/code/kafka/core/src/main/scala/kafka/server/KafkaServer.scala:161: a pure expression does nothing in statement position; you may be omitting necessary parentheses ControllerStats.leaderElectionTimer ^ two warnings found :core:processResources UP-TO-DATE :core:classes :core:compileTestJava UP-TO-DATE Building 69% :core:compileTestScala^Cjcreasy at C02MR0K3FD58 in ~/code/kafka on KAFKA-404* Thanks, Jonathan Creasy
Can Fetch Request cause delay of Produce Request?
Hi all, I'm working on my own kafka client implementation and I noticed a strange situation.

Time(s) | Action
0.0     | MetadataRequest
0.0     | MetadataResponse
0.2     | OffsetRequest
0.2     | OffsetResponse
0.3     | FetchRequest (MaxWaitTime=20sec)
6.0     | ProduceRequest
31.0    | FetchResponse  <-- notice the 25sec gap!
31.0    | ProduceResponse

What is important, it seems to me that a Fetch Request with a long poll will block processing of all following requests for the duration of the timeout, given that there is no new data. But new data *are* flowing in, and the Produce Request is waiting right behind the Fetch one in the server-side processing queue. If what I describe is correct, then I see the following problems. 1. From the client's point of view, if they have a listener and a sender, then sending data should produce an immediate event on the listener. 2. If I want a clean shutdown of my application within 5sec, but my listeners are configured with a 20sec timeout, then I am risking losing data. My application shutdown time now becomes at least as long as the listener's polling time, which is unreasonable. I do understand why it was implemented that way, probably because of the specification saying "The server guarantees that on a single TCP connection, requests will be processed in the order they are sent and responses will return in that order as well." But to achieve proper functioning of the client, do I need to allocate another TCP connection for listeners now? Also, I'm aware of the new protocol proposal for the next version of Kafka; would this issue be resolved there? Thanks, Vadim. -- From RFC 2631: In ASN.1, EXPLICIT tagging is implicit unless IMPLICIT is explicitly specified
[jira] [Comment Edited] (KAFKA-1642) [Java New Producer Kafka Trunk] CPU Usage Spike to 100% when network connection is lost
[ https://issues.apache.org/jira/browse/KAFKA-1642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14148121#comment-14148121 ] Bhavesh Mistry edited comment on KAFKA-1642 at 9/25/14 6:42 PM: Hi [~jkreps], I will work on the sample program. We are not setting the reconnect.backoff.ms and retry.backoff.ms configurations, so they would be at the default configuration. The only thing I can tell you is that I have 4 Producer instances per JVM, so this might amplify the issue. Thanks, Bhavesh was (Author: bmis13): HI [~jkreps], I will work on the sample program. We are not setting reconnect.backoff.ms and retry.backoff.ms configuration so it would be default configuration. Only thing I can tell you is that I have 4 Producer instance per JVM. So this might amplify issue. Thanks, Bhavesh [Java New Producer Kafka Trunk] CPU Usage Spike to 100% when network connection is lost --- Key: KAFKA-1642 URL: https://issues.apache.org/jira/browse/KAFKA-1642 Project: Kafka Issue Type: Bug Components: producer Affects Versions: 0.8.2 Reporter: Bhavesh Mistry Assignee: Jun Rao I see my CPU spike to 100% when the network connection is lost for a while. It seems the network I/O threads are very busy logging the following error message. Is this expected behavior? 2014-09-17 14:06:16.830 [kafka-producer-network-thread] ERROR org.apache.kafka.clients.producer.internals.Sender - Uncaught error in kafka producer I/O thread: java.lang.IllegalStateException: No entry found for node -2 at org.apache.kafka.clients.ClusterConnectionStates.nodeState(ClusterConnectionStates.java:110) at org.apache.kafka.clients.ClusterConnectionStates.disconnected(ClusterConnectionStates.java:99) at org.apache.kafka.clients.NetworkClient.initiateConnect(NetworkClient.java:394) at org.apache.kafka.clients.NetworkClient.maybeUpdateMetadata(NetworkClient.java:380) at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:174) at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:175) at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:115) at java.lang.Thread.run(Thread.java:744) Thanks, Bhavesh -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-1642) [Java New Producer Kafka Trunk] CPU Usage Spike to 100% when network connection is lost
[ https://issues.apache.org/jira/browse/KAFKA-1642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14148121#comment-14148121 ] Bhavesh Mistry commented on KAFKA-1642: --- Hi [~jkreps], I will work on the sample program. We are not setting the reconnect.backoff.ms and retry.backoff.ms configurations, so they would be at the default configuration. The only thing I can tell you is that I have 4 Producer instances per JVM, so this might amplify the issue. Thanks, Bhavesh [Java New Producer Kafka Trunk] CPU Usage Spike to 100% when network connection is lost --- Key: KAFKA-1642 URL: https://issues.apache.org/jira/browse/KAFKA-1642 Project: Kafka Issue Type: Bug Components: producer Affects Versions: 0.8.2 Reporter: Bhavesh Mistry Assignee: Jun Rao I see my CPU spike to 100% when the network connection is lost for a while. It seems the network I/O threads are very busy logging the following error message. Is this expected behavior? 2014-09-17 14:06:16.830 [kafka-producer-network-thread] ERROR org.apache.kafka.clients.producer.internals.Sender - Uncaught error in kafka producer I/O thread: java.lang.IllegalStateException: No entry found for node -2 at org.apache.kafka.clients.ClusterConnectionStates.nodeState(ClusterConnectionStates.java:110) at org.apache.kafka.clients.ClusterConnectionStates.disconnected(ClusterConnectionStates.java:99) at org.apache.kafka.clients.NetworkClient.initiateConnect(NetworkClient.java:394) at org.apache.kafka.clients.NetworkClient.maybeUpdateMetadata(NetworkClient.java:380) at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:174) at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:175) at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:115) at java.lang.Thread.run(Thread.java:744) Thanks, Bhavesh -- This message was sent by Atlassian JIRA (v6.3.4#6332)
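(For anyone wanting to pin these configs explicitly while reproducing this, a hedged sketch against the trunk new producer; the broker address and values are arbitrary placeholders, not recommendations:)
{noformat}
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;

Properties props = new Properties();
props.put("bootstrap.servers", "broker1:9092"); // placeholder
props.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
// The two configs left at their defaults in the report above; raising
// reconnect.backoff.ms should, in principle, slow the reconnect loop
// that appears to be spinning in the stack trace.
props.put("reconnect.backoff.ms", "250");
props.put("retry.backoff.ms", "500");
KafkaProducer<byte[], byte[]> producer = new KafkaProducer<byte[], byte[]>(props);
{noformat}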
[jira] [Commented] (KAFKA-404) When using chroot path, create chroot on startup if it doesn't exist
[ https://issues.apache.org/jira/browse/KAFKA-404?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14148140#comment-14148140 ] Neha Narkhede commented on KAFKA-404: - Your suggestion about consolidation is also fine. When using chroot path, create chroot on startup if it doesn't exist Key: KAFKA-404 URL: https://issues.apache.org/jira/browse/KAFKA-404 Project: Kafka Issue Type: Improvement Affects Versions: 0.8.1 Environment: CentOS 5.5, Linux 2.6.18-194.32.1.el5 x86_64 GNU/Linux Reporter: Jonathan Creasy Labels: newbie, patch Fix For: 0.8.2 Attachments: KAFKA-404-0.7.1.patch, KAFKA-404-0.8.patch, KAFKA-404-auto-create-zookeeper-chroot-on-start-up-i.patch, KAFKA-404-auto-create-zookeeper-chroot-on-start-up-v2.patch, KAFKA-404-auto-create-zookeeper-chroot-on-start-up-v3.patch, KAFKA-404-auto-create-zookeeper-chroot-on-start-up-v4.patch, KAFKA-404-auto-create-zookeeper-chroot-on-start-up-v5.patch, KAFKA-404.patch, KAFKA-404_2014-09-25_13:11:02.patch -- This message was sent by Atlassian JIRA (v6.3.4#6332)
Re: Can Fetch Request cause delay of Produce Request?
"The server guarantees that on a single TCP connection, requests will be processed in the order they are sent and responses will return in that order as well. But to achieve proper functioning of the client, do I need to allocate another TCP connection for listeners now?" Yes. The easiest way to send and receive is to use a producer client and a consumer client, each of which will have its own TCP connection to the server. On Thu, Sep 25, 2014 at 11:40 AM, Vadim Chekan kot.bege...@gmail.com wrote: Hi all, I'm working on my own kafka client implementation and I noticed a strange situation.

Time(s) | Action
0.0     | MetadataRequest
0.0     | MetadataResponse
0.2     | OffsetRequest
0.2     | OffsetResponse
0.3     | FetchRequest (MaxWaitTime=20sec)
6.0     | ProduceRequest
31.0    | FetchResponse  <-- notice the 25sec gap!
31.0    | ProduceResponse

What is important, it seems to me that a Fetch Request with a long poll will block processing of all following requests for the duration of the timeout, given that there is no new data. But new data *are* flowing in, and the Produce Request is waiting right behind the Fetch one in the server-side processing queue. If what I describe is correct, then I see the following problems. 1. From the client's point of view, if they have a listener and a sender, then sending data should produce an immediate event on the listener. 2. If I want a clean shutdown of my application within 5sec, but my listeners are configured with a 20sec timeout, then I am risking losing data. My application shutdown time now becomes at least as long as the listener's polling time, which is unreasonable. I do understand why it was implemented that way, probably because of the specification saying "The server guarantees that on a single TCP connection, requests will be processed in the order they are sent and responses will return in that order as well." But to achieve proper functioning of the client, do I need to allocate another TCP connection for listeners now? Also, I'm aware of the new protocol proposal for the next version of Kafka; would this issue be resolved there? Thanks, Vadim. -- From RFC 2631: In ASN.1, EXPLICIT tagging is implicit unless IMPLICIT is explicitly specified
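To illustrate the two-connection pattern with the 0.8 Java API (host, topic, partition and starting offset are placeholders), the long poll then only ever parks its own socket:

import kafka.api.FetchRequest;
import kafka.api.FetchRequestBuilder;
import kafka.javaapi.FetchResponse;
import kafka.javaapi.consumer.SimpleConsumer;

// Dedicated fetch connection: the 20-second long poll blocks only this socket.
SimpleConsumer fetcher = new SimpleConsumer("localhost", 9092, 30000, 64 * 1024, "listener");
FetchRequest request = new FetchRequestBuilder()
    .clientId("listener")
    .addFetch("my-topic", 0, 0L, 100000) // partition 0, offset 0, up to 100 KB
    .maxWait(20000) // the long poll from the timeline above
    .minBytes(1)
    .build();
FetchResponse response = fetcher.fetch(request);
// Produce requests go over a separate connection (e.g. a kafka.javaapi.producer.Producer),
// so they are never queued behind the parked fetch.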
Re: Review Request 26020: Patch for KAFKA-1651
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/26020/#review54579 --- Ship it! Ship It! - Neha Narkhede On Sept. 25, 2014, 5:50 a.m., Jonathan Creasy wrote: --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/26020/ --- (Updated Sept. 25, 2014, 5:50 a.m.) Review request for kafka. Bugs: KAFKA-1651 https://issues.apache.org/jira/browse/KAFKA-1651 Repository: kafka Description --- Removed some extra whitespace Diffs - core/src/main/scala/kafka/server/KafkaServer.scala 390fef500d7e0027e698c259d777454ba5a0f5e8 Diff: https://reviews.apache.org/r/26020/diff/ Testing --- Thanks, Jonathan Creasy
[jira] [Commented] (KAFKA-1651) Removed some extra whitespace in KafkaServer.scala
[ https://issues.apache.org/jira/browse/KAFKA-1651?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14148174#comment-14148174 ] Neha Narkhede commented on KAFKA-1651: -- [~jcreasy] How did you come across these whitespaces? Curious to know if there is something we can do in terms of IntelliJ/Eclipse auto formatting to prevent such whitespace issues. Removed some extra whitespace in KafkaServer.scala -- Key: KAFKA-1651 URL: https://issues.apache.org/jira/browse/KAFKA-1651 Project: Kafka Issue Type: Improvement Components: core Affects Versions: 0.8.2 Reporter: Jonathan Creasy Priority: Trivial Labels: newbie Fix For: 0.8.2 Attachments: KAFKA-1651.patch, KAFKA-1651_2014-09-25_00:49:36.patch, KAFKA-1651_2014-09-25_00:50:11.patch -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (KAFKA-1651) Removed some extra whitespace in KafkaServer.scala
[ https://issues.apache.org/jira/browse/KAFKA-1651?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Neha Narkhede updated KAFKA-1651: - Reviewer: Neha Narkhede Removed some extra whitespace in KafkaServer.scala -- Key: KAFKA-1651 URL: https://issues.apache.org/jira/browse/KAFKA-1651 Project: Kafka Issue Type: Improvement Components: core Affects Versions: 0.8.2 Reporter: Jonathan Creasy Priority: Trivial Labels: newbie Fix For: 0.8.2 Attachments: KAFKA-1651.patch, KAFKA-1651_2014-09-25_00:49:36.patch, KAFKA-1651_2014-09-25_00:50:11.patch -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-1651) Removed some extra whitespace in KafkaServer.scala
[ https://issues.apache.org/jira/browse/KAFKA-1651?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14148185#comment-14148185 ] Jonathan Creasy commented on KAFKA-1651: I noticed them in the RB diff when I was working on KAFKA-404; there are quite a few throughout. I also noticed some on https://reviews.apache.org/r/25886/diff/?expand=1, for example. They show up as bright red blocks when 'show extra whitespace' is on. Removed some extra whitespace in KafkaServer.scala -- Key: KAFKA-1651 URL: https://issues.apache.org/jira/browse/KAFKA-1651 Project: Kafka Issue Type: Improvement Components: core Affects Versions: 0.8.2 Reporter: Jonathan Creasy Priority: Trivial Labels: newbie Fix For: 0.8.2 Attachments: KAFKA-1651.patch, KAFKA-1651_2014-09-25_00:49:36.patch, KAFKA-1651_2014-09-25_00:50:11.patch -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-404) When using chroot path, create chroot on startup if it doesn't exist
[ https://issues.apache.org/jira/browse/KAFKA-404?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14148189#comment-14148189 ] Jonathan Creasy commented on KAFKA-404: --- RB updated, let's merge this! :) (it's only 2 years old...) When using chroot path, create chroot on startup if it doesn't exist Key: KAFKA-404 URL: https://issues.apache.org/jira/browse/KAFKA-404 Project: Kafka Issue Type: Improvement Affects Versions: 0.8.1 Environment: CentOS 5.5, Linux 2.6.18-194.32.1.el5 x86_64 GNU/Linux Reporter: Jonathan Creasy Labels: newbie, patch Fix For: 0.8.2 Attachments: KAFKA-404-0.7.1.patch, KAFKA-404-0.8.patch, KAFKA-404-auto-create-zookeeper-chroot-on-start-up-i.patch, KAFKA-404-auto-create-zookeeper-chroot-on-start-up-v2.patch, KAFKA-404-auto-create-zookeeper-chroot-on-start-up-v3.patch, KAFKA-404-auto-create-zookeeper-chroot-on-start-up-v4.patch, KAFKA-404-auto-create-zookeeper-chroot-on-start-up-v5.patch, KAFKA-404.patch, KAFKA-404_2014-09-25_13:11:02.patch -- This message was sent by Atlassian JIRA (v6.3.4#6332)
Re: Review Request 26019: Patch for KAFKA-404
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/26019/#review54581 --- Ship it! Ship It! - Neha Narkhede On Sept. 25, 2014, 6:11 p.m., Jonathan Creasy wrote: --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/26019/ --- (Updated Sept. 25, 2014, 6:11 p.m.) Review request for kafka. Bugs: KAFKA-404 https://issues.apache.org/jira/browse/KAFKA-404 Repository: kafka Description --- KAFKA-404 auto-create Zookeeper CHROOT on Startup Diffs - core/src/main/scala/kafka/server/KafkaServer.scala 390fef500d7e0027e698c259d777454ba5a0f5e8 core/src/test/scala/unit/kafka/server/ServerStartupTest.scala PRE-CREATION Diff: https://reviews.apache.org/r/26019/diff/ Testing --- Ran Kafka against a brand new Zookeeper, chroot exists after Kafka starts. Also: Added unit test, fails prior to patch. $ ./gradlew -Dtest.single=CreateZKChrootTest core:test Building project 'core' with Scala version 2.10.1 :clients:compileJava UP-TO-DATE :clients:processResources UP-TO-DATE :clients:classes UP-TO-DATE :clients:jar UP-TO-DATE :core:compileJava UP-TO-DATE :core:compileScala UP-TO-DATE :core:processResources UP-TO-DATE :core:classes UP-TO-DATE :core:compileTestJava UP-TO-DATE :core:compileTestScala :core:processTestResources UP-TO-DATE :core:testClasses :core:test kafka.server.CreateZKChrootTest > testBrokerCreatesZKChroot FAILED java.lang.IllegalArgumentException: Path length must be > 0 at org.apache.zookeeper.common.PathUtils.validatePath(PathUtils.java:48) at org.apache.zookeeper.common.PathUtils.validatePath(PathUtils.java:35) at org.apache.zookeeper.ZooKeeper.create(ZooKeeper.java:766) at org.I0Itec.zkclient.ZkConnection.create(ZkConnection.java:87) at org.I0Itec.zkclient.ZkClient$1.call(ZkClient.java:308) at org.I0Itec.zkclient.ZkClient$1.call(ZkClient.java:304) at org.I0Itec.zkclient.ZkClient.retryUntilConnected(ZkClient.java:675) at org.I0Itec.zkclient.ZkClient.create(ZkClient.java:304) at org.I0Itec.zkclient.ZkClient.createPersistent(ZkClient.java:213) at org.I0Itec.zkclient.ZkClient.createPersistent(ZkClient.java:223) at kafka.utils.ZkUtils$.makeSurePersistentPathExists(ZkUtils.scala:211) at kafka.utils.ZkUtils$$anonfun$setupCommonPaths$1.apply(ZkUtils.scala:91) at kafka.utils.ZkUtils$$anonfun$setupCommonPaths$1.apply(ZkUtils.scala:90) at scala.collection.immutable.List.foreach(List.scala:318) at kafka.utils.ZkUtils$.setupCommonPaths(ZkUtils.scala:90) at kafka.server.KafkaServer.initZk(KafkaServer.scala:133) at kafka.server.KafkaServer.startup(KafkaServer.scala:81) at kafka.utils.TestUtils$.createServer(TestUtils.scala:133) at kafka.server.CreateZKChrootTest.setUp(CreateZKChrootTest.scala:40) 1 test completed, 1 failed :core:test FAILED FAILURE: Build failed with an exception. * What went wrong: Execution failed for task ':core:test'. There were failing tests. See the report at: file:///Users/jcreasy/code/kafka/core/build/reports/tests/index.html * Try: Run with --stacktrace option to get the stack trace. Run with --info or --debug option to get more log output.
BUILD FAILED Total time: 27.926 secs Passes once patch applied: $ ./gradlew -Dtest.single=CreateZKChrootTest core:test Building project 'core' with Scala version 2.10.1 :clients:compileJava UP-TO-DATE :clients:processResources UP-TO-DATE :clients:classes UP-TO-DATE :clients:jar UP-TO-DATE :core:compileJava UP-TO-DATE :core:compileScala /Users/jcreasy/code/kafka/core/src/main/scala/kafka/server/KafkaServer.scala:160: a pure expression does nothing in statement position; you may be omitting necessary parentheses ControllerStats.uncleanLeaderElectionRate ^ /Users/jcreasy/code/kafka/core/src/main/scala/kafka/server/KafkaServer.scala:161: a pure expression does nothing in statement position; you may be omitting necessary parentheses ControllerStats.leaderElectionTimer ^ two warnings found :core:processResources UP-TO-DATE :core:classes :core:compileTestJava UP-TO-DATE Building 69% :core:compileTestScala^Cjcreasy at C02MR0K3FD58 in ~/code/kafka on KAFKA-404* Thanks, Jonathan Creasy
[jira] [Updated] (KAFKA-1555) provide strong consistency with reasonable availability
[ https://issues.apache.org/jira/browse/KAFKA-1555?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Gwen Shapira updated KAFKA-1555: Attachment: KAFKA-1555.5.patch Noticed that the previous patch accidentally included my slightly modified build.gradle. Removed that. provide strong consistency with reasonable availability --- Key: KAFKA-1555 URL: https://issues.apache.org/jira/browse/KAFKA-1555 Project: Kafka Issue Type: Improvement Components: controller Affects Versions: 0.8.1.1 Reporter: Jiang Wu Assignee: Gwen Shapira Fix For: 0.8.2 Attachments: KAFKA-1555.0.patch, KAFKA-1555.1.patch, KAFKA-1555.2.patch, KAFKA-1555.3.patch, KAFKA-1555.4.patch, KAFKA-1555.5.patch In a mission critical application, we expect that a kafka cluster with 3 brokers can satisfy two requirements: 1. When 1 broker is down, no message loss or service blocking happens. 2. In worse cases, such as when two brokers are down, service can be blocked, but no message loss happens. We found that the current kafka version (0.8.1.1) cannot achieve the requirements due to three behaviors: 1. when choosing a new leader from 2 followers in ISR, the one with fewer messages may be chosen as the leader. 2. even when replica.lag.max.messages=0, a follower can stay in ISR when it has fewer messages than the leader. 3. ISR can contain only 1 broker, therefore acknowledged messages may be stored in only 1 broker. The following is an analytical proof. We consider a cluster with 3 brokers and a topic with 3 replicas, and assume that at the beginning, all 3 replicas, leader A, followers B and C, are in sync, i.e., they have the same messages and are all in ISR. According to the value of request.required.acks (acks for short), there are the following cases. 1. acks=0, 1, 3. Obviously these settings do not satisfy the requirement. 2. acks=2. Producer sends a message m. It's acknowledged by A and B. At this time, although C hasn't received m, C is still in ISR. If A is killed, C can be elected as the new leader, and consumers will miss m. 3. acks=-1. B and C restart and are removed from ISR. Producer sends a message m to A, and receives an acknowledgement. Disk failure happens in A before B and C replicate m. Message m is lost. In summary, no existing configuration can satisfy the requirements. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
Re: Review Request 25886: KAFKA-1555: provide strong consistency with reasonable availability
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/25886/ --- (Updated Sept. 25, 2014, 7:41 p.m.) Review request for kafka. Changes --- Removed accidental build.gradle Repository: kafka Description --- KAFKA-1555: provide strong consistency with reasonable availability Diffs (updated) - clients/src/main/java/org/apache/kafka/common/errors/NotEnoughReplicasException.java PRE-CREATION clients/src/main/java/org/apache/kafka/common/protocol/Errors.java d434f42 core/src/main/scala/kafka/cluster/Partition.scala ff106b4 core/src/main/scala/kafka/common/ErrorMapping.scala 3fae791 core/src/main/scala/kafka/common/NotEnoughReplicasException.scala PRE-CREATION core/src/main/scala/kafka/log/LogConfig.scala 5746ad4 core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala 39f777b core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala 24deea0 core/src/test/scala/unit/kafka/utils/TestUtils.scala 2dbdd3c Diff: https://reviews.apache.org/r/25886/diff/ Testing --- With a 3-broker cluster, created 3 topics, each with 1 partition and 3 replicas, with min.insync.replicas of 1, 3 and 4. * min.insync.replicas=1 behaved normally (all writes succeeded as long as a broker was up) * min.insync.replicas=3 returned NotEnoughReplicas when required.acks=-1 and one broker was down * min.insync.replicas=4 returned NotEnoughReplicas when required.acks=-1 See notes about retry behavior in the JIRA. Thanks, Gwen Shapira
[jira] [Commented] (KAFKA-1419) cross build for scala 2.11
[ https://issues.apache.org/jira/browse/KAFKA-1419?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14148228#comment-14148228 ] Jonathan Herriott commented on KAFKA-1419: -- Hi, I need to integrate the Kafka 0.8.1 Java libraries built for Scala 2.11.1 into a project. It appears that neither the 0.8 nor the 0.8.1 branch has these patches applied. Which of the patches above are necessary in order to get this working? cross build for scala 2.11 -- Key: KAFKA-1419 URL: https://issues.apache.org/jira/browse/KAFKA-1419 Project: Kafka Issue Type: Improvement Components: clients Affects Versions: 0.8.1 Reporter: Scott Clasen Assignee: Ivan Lyutov Priority: Blocker Fix For: 0.8.2, 0.8.1.2 Attachments: KAFKA-1419-scalaBinaryVersion.patch, KAFKA-1419-scalaBinaryVersion.patch, KAFKA-1419.patch, KAFKA-1419.patch, KAFKA-1419_2014-07-28_15:05:16.patch, KAFKA-1419_2014-07-29_15:13:43.patch, KAFKA-1419_2014-08-04_14:43:26.patch, KAFKA-1419_2014-08-05_12:51:16.patch, KAFKA-1419_2014-08-07_10:17:34.patch, KAFKA-1419_2014-08-07_10:52:18.patch, KAFKA-1419_cross_build_for_scala_2_11_for_0_8_1_branch.patch Please publish builds for scala 2.11, hopefully just needs a small tweak to the gradle conf? -- This message was sent by Atlassian JIRA (v6.3.4#6332)
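[Editor's sketch] Assuming the cross-build patches from this ticket are applied to your branch, the build should follow Kafka's existing gradle convention of selecting the Scala version via a project property; for example (hypothetical invocation, version string may vary):
{code}
./gradlew -PscalaVersion=2.11.1 jar
./gradlew -PscalaVersion=2.11.1 releaseTarGz
{code}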
[jira] [Created] (KAFKA-1652) NoSuchElementException on server shutdown
Neha Narkhede created KAFKA-1652: Summary: NoSuchElementException on server shutdown Key: KAFKA-1652 URL: https://issues.apache.org/jira/browse/KAFKA-1652 Project: Kafka Issue Type: Bug Reporter: Neha Narkhede

Observed the following exception on server shutdown. Happens pretty frequently, though not on every shutdown. Ways to reproduce:
1. Check out Kafka from trunk
2. Run bin/kafka-server-start.sh config/server.properties
3. Ctrl-C
{noformat}
[2014-09-25 12:31:14,441] ERROR None.get (kafka.network.Processor)
java.util.NoSuchElementException: None.get
	at scala.None$.get(Option.scala:313)
	at scala.None$.get(Option.scala:311)
	at kafka.network.ConnectionQuotas.dec(SocketServer.scala:522)
	at kafka.network.AbstractServerThread.close(SocketServer.scala:165)
	at kafka.network.AbstractServerThread.close(SocketServer.scala:157)
	at kafka.network.Processor.close(SocketServer.scala:372)
	at kafka.network.AbstractServerThread.closeAll(SocketServer.scala:178)
	at kafka.network.Processor$$anonfun$run$3.apply$mcV$sp(SocketServer.scala:362)
	at kafka.utils.Utils$.swallow(Utils.scala:172)
	at kafka.utils.Logging$class.swallowError(Logging.scala:106)
	at kafka.network.AbstractServerThread.swallowError(SocketServer.scala:108)
	at kafka.network.Processor.run(SocketServer.scala:362)
	at java.lang.Thread.run(Thread.java:695)
{noformat}
-- This message was sent by Atlassian JIRA (v6.3.4#6332)
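[Editor's sketch] The None.get comes from ConnectionQuotas.dec looking up a per-address count that was never incremented, or was already removed, for the connection being closed. A minimal sketch of a defensive counter in the same spirit; the names are illustrative, not the actual SocketServer code or the eventual fix:
{code}
import java.net.InetAddress
import scala.collection.mutable

// Sketch only: a connection counter that tolerates dec() for an address
// it has no record of, instead of calling Option.get and throwing.
class ConnectionCounts {
  private val counts = mutable.Map[InetAddress, Int]()

  def inc(address: InetAddress): Unit = synchronized {
    counts(address) = counts.getOrElse(address, 0) + 1
  }

  def dec(address: InetAddress): Unit = synchronized {
    counts.get(address) match {
      case Some(1) => counts.remove(address)  // last connection from this host
      case Some(n) => counts(address) = n - 1
      case None    => // already gone, e.g. racing with shutdown; don't throw
    }
  }
}
{code}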
[jira] [Updated] (KAFKA-404) When using chroot path, create chroot on startup if it doesn't exist
[ https://issues.apache.org/jira/browse/KAFKA-404?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Neha Narkhede updated KAFKA-404: Resolution: Fixed Status: Resolved (was: Patch Available) Thanks for the patches, pushed to trunk. When using chroot path, create chroot on startup if it doesn't exist Key: KAFKA-404 URL: https://issues.apache.org/jira/browse/KAFKA-404 Project: Kafka Issue Type: Improvement Affects Versions: 0.8.1 Environment: CentOS 5.5, Linux 2.6.18-194.32.1.el5 x86_64 GNU/Linux Reporter: Jonathan Creasy Assignee: Jonathan Creasy Labels: newbie, patch Fix For: 0.8.2 Attachments: KAFKA-404-0.7.1.patch, KAFKA-404-0.8.patch, KAFKA-404-auto-create-zookeeper-chroot-on-start-up-i.patch, KAFKA-404-auto-create-zookeeper-chroot-on-start-up-v2.patch, KAFKA-404-auto-create-zookeeper-chroot-on-start-up-v3.patch, KAFKA-404-auto-create-zookeeper-chroot-on-start-up-v4.patch, KAFKA-404-auto-create-zookeeper-chroot-on-start-up-v5.patch, KAFKA-404.patch, KAFKA-404_2014-09-25_13:11:02.patch -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Closed] (KAFKA-404) When using chroot path, create chroot on startup if it doesn't exist
[ https://issues.apache.org/jira/browse/KAFKA-404?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Neha Narkhede closed KAFKA-404. --- When using chroot path, create chroot on startup if it doesn't exist Key: KAFKA-404 URL: https://issues.apache.org/jira/browse/KAFKA-404 Project: Kafka Issue Type: Improvement Fix For: 0.8.2 -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (KAFKA-404) When using chroot path, create chroot on startup if it doesn't exist
[ https://issues.apache.org/jira/browse/KAFKA-404?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Neha Narkhede updated KAFKA-404: Assignee: Jonathan Creasy When using chroot path, create chroot on startup if it doesn't exist Key: KAFKA-404 URL: https://issues.apache.org/jira/browse/KAFKA-404 Project: Kafka Issue Type: Improvement Fix For: 0.8.2 -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-1652) NoSuchElementException on server shutdown
[ https://issues.apache.org/jira/browse/KAFKA-1652?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14148263#comment-14148263 ] Joel Koshy commented on KAFKA-1652: --- Are you sure you have the fix in KAFKA-1577? If so, should we just reopen that? NoSuchElementException on server shutdown - Key: KAFKA-1652 URL: https://issues.apache.org/jira/browse/KAFKA-1652 Project: Kafka Issue Type: Bug Reporter: Neha Narkhede Labels: newbie++ -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (KAFKA-1653) Duplicate broker ids allowed in replica assignment
Ryan Berdeen created KAFKA-1653: --- Summary: Duplicate broker ids allowed in replica assignment Key: KAFKA-1653 URL: https://issues.apache.org/jira/browse/KAFKA-1653 Project: Kafka Issue Type: Bug Components: tools Affects Versions: 0.8.1.1 Reporter: Ryan Berdeen The reassign-partitions command and the controller do not ensure that all replicas for a partition are on different brokers. For example, you could set 1,2,2 as the list of brokers for the replicas. kafka-topics.sh --describe --under-replicated-partitions will list these partitions as under-replicated, but I can't see a reason why the controller should allow this state. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
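[Editor's sketch] The missing validation is straightforward to state; here is a hypothetical helper (not the actual fix) that rejects an assignment naming the same broker twice for a partition:
{code}
// Reject any replica assignment that lists a broker more than once.
def validateReplicaAssignment(assignment: Map[Int, Seq[Int]]): Unit = {
  for ((partition, replicas) <- assignment) {
    require(replicas.nonEmpty, s"Partition $partition has no replicas")
    require(replicas.distinct.size == replicas.size,
      s"Partition $partition lists duplicate brokers: ${replicas.mkString(",")}")
  }
}

// validateReplicaAssignment(Map(0 -> Seq(1, 2, 2))) would now fail fast
{code}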
[jira] [Updated] (KAFKA-1647) Replication offset checkpoints (high water marks) can be lost on hard kills and restarts
[ https://issues.apache.org/jira/browse/KAFKA-1647?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Neha Narkhede updated KAFKA-1647: - Priority: Critical (was: Major)

Replication offset checkpoints (high water marks) can be lost on hard kills and restarts Key: KAFKA-1647 URL: https://issues.apache.org/jira/browse/KAFKA-1647 Project: Kafka Issue Type: Bug Reporter: Joel Koshy Priority: Critical

We ran into this scenario recently in a production environment. This can happen when enough brokers in a cluster are taken down; i.e., a rolling bounce done properly should not cause this issue. It can occur if all replicas for any partition are taken down. Here is a sample scenario:
* Cluster of three brokers: b0, b1, b2
* Two partitions (of some topic) with replication factor two: p0, p1
* Initial state:
** p0: leader = b0, ISR = {b0, b1}
** p1: leader = b1, ISR = {b0, b1}
* Do a parallel hard-kill of all brokers
* Bring up b2, so it is the new controller
* b2 initializes its controller context and populates its leader/ISR cache (i.e., controllerContext.partitionLeadershipInfo) from zookeeper. The last known leaders are b0 (for p0) and b1 (for p1)
* Bring up b1
* The controller's onBrokerStartup procedure initiates a replica state change for all replicas on b1 to become online. As part of this replica state change it gets the last known leader and ISR and sends a LeaderAndIsrRequest to b1 (for p0 and p1). This LeaderAndIsr request contains: {p0: leader=b0; p1: leader=b1; leaders={b1}}. b0 is indicated as the leader of p0 but it is not included in the leaders field because b0 is down.
* On receiving the LeaderAndIsrRequest, b1's replica manager will successfully make b1 the leader for p1 (and create the local replica object corresponding to p1). It will however abort the become-follower transition for p0 because the designated leader b0 is offline. So it will not create the local replica object for p0.
* It will then start the high water mark checkpoint thread. Since only p1 has a local replica object, only p1's high water mark will be checkpointed to disk. p0's previously written checkpoint, if any, will be lost.

So in summary it seems we should always create the local replica object even if the online transition does not happen.

Possible symptoms of the above bug could be one or more of the following (we saw 2 and 3):
# Data loss; yes, on a hard-kill data loss is expected, but this can actually cause loss of nearly all data if the broker becomes follower, truncates, and soon after happens to become leader.
# High IO on brokers that lose their high water mark, then subsequently (on a successful become-follower transition) truncate their log to zero and start catching up from the beginning.
# If the offsets topic is affected, then offsets can get reset. This is because during an offset load we don't read past the high water mark. So if a water mark is missing then we don't load anything (even if the offsets are there in the log).

-- This message was sent by Atlassian JIRA (v6.3.4#6332)
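[Editor's sketch] To make the failure mode concrete: the checkpoint file is rewritten wholesale from whatever partitions currently have local replica objects, so a partition missing its replica object silently drops out. A simplified model of that behavior and of a carry-forward mitigation; this is illustrative only, and the fix actually proposed in the ticket is to always create the local replica object:
{code}
case class TopicAndPartition(topic: String, partition: Int)

// Buggy shape: the new checkpoint is exactly the live replicas' HWs, so
// p0 (which has no replica object) loses its previously written entry.
def checkpointBuggy(localReplicaHWs: Map[TopicAndPartition, Long],
                    previous: Map[TopicAndPartition, Long]): Map[TopicAndPartition, Long] =
  localReplicaHWs

// Carry-forward variant: keep old entries, overwrite with live ones.
def checkpointCarryForward(localReplicaHWs: Map[TopicAndPartition, Long],
                           previous: Map[TopicAndPartition, Long]): Map[TopicAndPartition, Long] =
  previous ++ localReplicaHWs
{code}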
[jira] [Commented] (KAFKA-1631) ReplicationFactor and under-replicated partitions incorrect during reassignment
[ https://issues.apache.org/jira/browse/KAFKA-1631?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14148304#comment-14148304 ] Ryan Berdeen commented on KAFKA-1631: - Not reporting partitions being reassigned seems even worse--this would lead to false negatives! It also doesn't address the fact that the replication factor is reported incorrectly. It seems like the right solution would be to store the intended replication factor for the topic, and alert if the size of the ISR is less than this.

ReplicationFactor and under-replicated partitions incorrect during reassignment --- Key: KAFKA-1631 URL: https://issues.apache.org/jira/browse/KAFKA-1631 Project: Kafka Issue Type: Bug Affects Versions: 0.8.1.1 Reporter: Ryan Berdeen Labels: newbie

We have a topic with a replication factor of 3. We monitor UnderReplicatedPartitions as recommended by the documentation. During a partition reassignment, partitions being reassigned are reported as under-replicated. Running a describe shows:
{code}
Topic:activity-wal-1	PartitionCount:15	ReplicationFactor:5	Configs:
	Topic: activity-wal-1	Partition: 0	Leader: 14	Replicas: 14,13,12,11,15	Isr: 14,12,11,13
	Topic: activity-wal-1	Partition: 1	Leader: 14	Replicas: 15,14,11	Isr: 14,11
	Topic: activity-wal-1	Partition: 2	Leader: 11	Replicas: 11,15,12	Isr: 12,11,15
	...
{code}
It looks like the displayed replication factor, 5, is simply the number of replicas listed for the first partition, which includes both brokers in the current list and those onto which the partition is being reassigned. Partition 0 is also included in the list when using the `--under-replicated-partitions` option, even though it is replicated to more brokers than the true replication factor. During a reassignment, the under-replicated partitions metric is not usable, meaning that actual under-replicated partitions can go unnoticed. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
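[Editor's sketch] The check Ryan suggests, sketched with illustrative types: compare ISR size against the intended replication factor stored per topic, not the possibly inflated replica list seen during reassignment.
{code}
case class PartitionState(topic: String, partition: Int, isr: Seq[Int])

// Flag a partition only when its ISR is smaller than the topic's *intended*
// replication factor; partitions mid-reassignment with a full ISR pass.
def underReplicated(partitions: Seq[PartitionState],
                    intendedReplicationFactor: Map[String, Int]): Seq[PartitionState] =
  partitions.filter { p =>
    intendedReplicationFactor.get(p.topic).exists(rf => p.isr.size < rf)
  }
{code}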
[jira] [Commented] (KAFKA-1647) Replication offset checkpoints (high water marks) can be lost on hard kills and restarts
[ https://issues.apache.org/jira/browse/KAFKA-1647?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14148336#comment-14148336 ] Joel Koshy commented on KAFKA-1647: --- If b1 aborts the follower transition, the high watermark checkpoint can be removed. It does not truncate all the data at that point. However, on a subsequent become-follower transition (if any, and if successful) it will truncate the log (since on an unknown HW we take the HW as zero). This shouldn't be too difficult to fix though. Replication offset checkpoints (high water marks) can be lost on hard kills and restarts Key: KAFKA-1647 URL: https://issues.apache.org/jira/browse/KAFKA-1647 Project: Kafka Issue Type: Bug Reporter: Joel Koshy Priority: Critical -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (KAFKA-1647) Replication offset checkpoints (high water marks) can be lost on hard kills and restarts
[ https://issues.apache.org/jira/browse/KAFKA-1647?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Joel Koshy updated KAFKA-1647: -- Labels: newbie++ (was: ) Replication offset checkpoints (high water marks) can be lost on hard kills and restarts Key: KAFKA-1647 URL: https://issues.apache.org/jira/browse/KAFKA-1647 Project: Kafka Issue Type: Bug Reporter: Joel Koshy Priority: Critical Labels: newbie++ -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-1555) provide strong consistency with reasonable availability
[ https://issues.apache.org/jira/browse/KAFKA-1555?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14148371#comment-14148371 ] Jay Kreps commented on KAFKA-1555: -- This is a good discussion, I am glad we are taking the time to think this through carefully. Let's aim for the right end state rather than optimizing for what is easiest to implement now (since these features never get removed and we end up spending a lot of time explaining them). [~sriramsub] it sounds like you are not totally sold on min.isr. Let me try to summarize a few things people have said that I think are true and see if people can agree on them:

1. Unclean leader election is an orthogonal issue. Regardless of settings, choosing a leader that is not caught up means losing data. This option covers catastrophic recovery (i.e. no server with complete data exists). We can give finer control over whether unclean election is manual or automatic, but I think you need to have this escape hatch for the case where the authoritative copy of the data is destroyed.

2. Specifying a min.isr does actually make sense. I think people have one of two cases in mind. In one case non-availability means data loss. This is likely the most common case. In this case even if you are down to your last replica you still want to perform the write, because there is still some hope the data will not be lost, and if you refuse the write the chance of loss is 100%. In another case non-availability can be tolerated because something upstream (perhaps the client or another system) can hold onto the data and retry later. In this case you want to be sure that when you accept a write it is safe. In this case refusing a write is okay, but accepting a write and then losing it is much worse. It's true that it is very hard to reason about the right min.isr, as that depends on the probability of failure over time. But this criticism is also true of replication factor (e.g. to know an appropriate replication factor to yield a particular probability of data loss you need to know the joint probability distribution over machine failures).

3. With regard to min.isr there are three issues: (1) what are the settings that actually make sense, (2) what is the best way to express these in the protocol, and (3) what is the best way to represent this in the client configuration. I think we need to start by agreeing on (1).

4. I believe people are actually in agreement that the following settings make sense:
a. acks = 0, min.isr = 0
b. acks = 1, min.isr = 1
c. acks = -1, min.isr in {1, ..., N}
Conversely, no other settings make sense. Does everyone agree on this? If so, the question is really how to expose this to the user.

5. There were several proposals for how to express these options.
a. The current proposal is to have acks remain in the protocol with its original meaning and values but add a topic configuration controlling min.isr. I personally think this is a bit weird since both are about the definition of success for the request, so it makes sense to send them with the request.
b. Alternately we could add a new field in the produce request specifying the min.isr.
c. Alternately we could add a new field in the response returning the actual isr size. An advantage of this is that it allows the client to distinguish between "write failed" and "write succeeded but not with enough replicas".
d. Finally we could overload acks = -2, -3, etc. to mean acks = -1 and min.isr = 2, 3, etc. This sounds insane, but maybe it actually isn't. Since not all combinations of acks and min.isr make sense, this does actually enumerate the sensible cases.

6. Regardless of what we do in (5), the configuration can be somewhat simpler. Probably just by having the user specify min.isr and erroring out if they combine min.isr and acks in a non-sensical way.

7. It isn't clear to me whether the right behavior is to fail fast when the min.isr likely won't be met (because the current ISR is already below the min) and not attempt the write at all, or else to always do the write and then return an error if the min.isr isn't met. The latter case means that retries, which are the sane thing to do when you get the error, will lead to potentially many duplicates. In fact, in the common case where you want to kind of block and keep trying the write until durability can be guaranteed, even if that takes a few minutes, this might mean thousands of duplicates.

provide strong consistency with reasonable availability --- Key: KAFKA-1555 URL: https://issues.apache.org/jira/browse/KAFKA-1555
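[Editor's sketch] The enumeration in Jay's point 4 translates directly into a validity check; a purely illustrative sketch, not from any patch:
{code}
// Sensible (acks, min.isr) combinations per the list above:
//   acks = 0  -> min.isr = 0
//   acks = 1  -> min.isr = 1
//   acks = -1 -> 1 <= min.isr <= replicationFactor
def isSensible(acks: Int, minIsr: Int, replicationFactor: Int): Boolean =
  (acks, minIsr) match {
    case (0, 0)  => true
    case (1, 1)  => true
    case (-1, m) => m >= 1 && m <= replicationFactor
    case _       => false
  }
{code}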
Re: [jira] [Commented] (KAFKA-1555) provide strong consistency with reasonable availability
Jay, thank you for the excellent summary. Regarding your last point, I would just like to mention that with a partial-function-style interface for event listener registrations on top of FarRefs and RemotePromiseRefs, in E-Rights-style distributed promises, it is easy to reason about multiple registrations for different future scenarios, scheduled or failed. Even if an undetected partition occurs, replays can be revoked after a previous attempt succeeds, or via other client-defined strategies. I do not know if this is considered for 0.9. Thank you, - Rob

On Sep 25, 2014, at 3:53 PM, Jay Kreps (JIRA) j...@apache.org wrote: [ https://issues.apache.org/jira/browse/KAFKA-1555?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14148371#comment-14148371 ] Jay Kreps commented on KAFKA-1555: -- This is a good discussion, I am glad we are taking the time to think this through carefully. ...
[jira] [Commented] (KAFKA-1555) provide strong consistency with reasonable availability
[ https://issues.apache.org/jira/browse/KAFKA-1555?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14148407#comment-14148407 ] Sriram Subramanian commented on KAFKA-1555: --- Thank you for summarizing all the thoughts, Jay.
1. I had issues with how ack was designed initially alongside the min_isr config, and it looks a lot better now with ack = 0, ack = 1 and ack = -1. I still think ack should be an enum explaining what it does rather than -1 or other arbitrary integers.
2. I don't see the value of min_isr if it does not prevent data loss under unclean leader election. If it was a clean leader election, we would always have one other replica that has the data, and min_isr does not add any more value. It is entirely possible to use min_isr to ensure there is no data loss even under unclean leader election, and I think that is its real benefit.
3. As I said previously, I would like the sender to know what guarantees they get when they send the request, and so would opt for min_isr being exposed at the API level.
4. W.r.t. your last point, I think it may not be possible to avoid duplicates by failing before writing to the log. The reason is that the ISR could drop below min_isr just after the check, and we could still end up failing the request after a timeout. Agreed, this is an edge case and we end up with far fewer duplicates. So I think you would need the check in both places.

provide strong consistency with reasonable availability --- Key: KAFKA-1555 URL: https://issues.apache.org/jira/browse/KAFKA-1555 -- This message was sent by Atlassian JIRA (v6.3.4#6332)
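[Editor's sketch] The two-sided check Sriram describes, with stand-in types; the real logic lives in Partition.scala and differs in detail:
{code}
class NotEnoughReplicasSketchException(msg: String) extends RuntimeException(msg)

// Check min.isr before appending (fail fast, avoiding most retry duplicates)
// and again afterwards, since the ISR can shrink between the two checks.
def appendWithMinIsr(currentIsrSize: () => Int, minIsr: Int, append: () => Long): Long = {
  if (currentIsrSize() < minIsr)
    throw new NotEnoughReplicasSketchException("refusing write: ISR below min.isr") // nothing written
  val offset = append()
  if (currentIsrSize() < minIsr)
    throw new NotEnoughReplicasSketchException("written but under-replicated") // the edge case above
  offset
}
{code}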
Re: Review Request 25944: Patch for KAFKA-1013
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/25944/#review54609 ---

core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala https://reviews.apache.org/r/25944/#comment94796 This comment refers to code line 320; better to move it above that line.

core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala https://reviews.apache.org/r/25944/#comment94797 This comment refers to code line 320; better to move it above that line.

core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala https://reviews.apache.org/r/25944/#comment94799 Is there a difference between these two: Thread.sleep() and TimeUnit.MILLISECONDS.sleep()? We do the former everywhere in the code base.

core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala https://reviews.apache.org/r/25944/#comment94800 Do we still need this variable?

core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala https://reviews.apache.org/r/25944/#comment94802 Can we omit collection.Seq() here and just use groupId = config.groupId?

core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala https://reviews.apache.org/r/25944/#comment94804 Merge into a single line.

core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala https://reviews.apache.org/r/25944/#comment94806 Revert.

core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala https://reviews.apache.org/r/25944/#comment94807 Revert.

core/src/main/scala/kafka/tools/ExportOffsets.scala https://reviews.apache.org/r/25944/#comment94812 Can we actually use the same format for ZK / offset manager? Two different formats would make the output harder to parse, since the user needs to know whether ZK or the offset manager is used.

core/src/main/scala/kafka/tools/ExportOffsets.scala https://reviews.apache.org/r/25944/#comment94810 We can make a parseBrokerList in Utils and use it there, since I have seen this same parsing logic in multiple places.

core/src/main/scala/kafka/tools/ExportOffsets.scala https://reviews.apache.org/r/25944/#comment94811 You can take a look at KAFKA-686's latest patch, which did some cleanup on the util functions; these functions can probably be merged into Utils.

core/src/main/scala/kafka/tools/ImportOffsets.scala https://reviews.apache.org/r/25944/#comment94813 Ditto as above: can we unify the input offset format?

core/src/main/scala/kafka/tools/ImportOffsets.scala https://reviews.apache.org/r/25944/#comment94814 Ditto as above.

core/src/main/scala/kafka/tools/ImportOffsets.scala https://reviews.apache.org/r/25944/#comment94815 Same as Joel suggested: we can just use the config's default values.

core/src/main/scala/kafka/tools/OffsetClient.scala https://reviews.apache.org/r/25944/#comment94816 The Apache header is missing.

core/src/main/scala/kafka/tools/OffsetClient.scala https://reviews.apache.org/r/25944/#comment94819 This could be error.

core/src/main/scala/kafka/tools/OffsetClient.scala https://reviews.apache.org/r/25944/#comment94822 This can be trace, or we can just print the offset manager id in debug if it does not contain an error code.

core/src/main/scala/kafka/tools/OffsetClient.scala https://reviews.apache.org/r/25944/#comment94820 Could be error("Error while connecting to %s:%d for fetching consumer metadata"), since this is not a general exception.

core/src/main/scala/kafka/tools/OffsetClient.scala https://reviews.apache.org/r/25944/#comment94821 When an exception is thrown and caught here, we should skip the rest of the loop.

core/src/main/scala/kafka/tools/OffsetClient.scala https://reviews.apache.org/r/25944/#comment94823 Could be error("Error while connecting to offset manager %s").

core/src/main/scala/kafka/tools/OffsetClient.scala https://reviews.apache.org/r/25944/#comment94824 Some logging inconsistency: "broker [%s:%d]" vs. "broker %s:%d" vs. "%s:%d".

core/src/main/scala/kafka/tools/OffsetClient.scala https://reviews.apache.org/r/25944/#comment94825 Why does this API need to return an Option while the previous one can directly return the response?

core/src/main/scala/kafka/tools/OffsetClient.scala https://reviews.apache.org/r/25944/#comment94826 We do not need "Possible cause" any more; just print the exception's message if necessary.

core/src/main/scala/kafka/tools/OffsetClient.scala https://reviews.apache.org/r/25944/#comment94827 Are there any other exceptions that can be caught besides IOExceptions? We need to be careful about always catching Throwable and printing a stack trace.

- Guozhang Wang

On Sept. 23, 2014, 5:48 p.m., Mayuresh Gharat wrote: ...
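[Editor's sketch] The parseBrokerList helper suggested in comment 94810 could look roughly like this; hypothetical signature and return type, as the real cleanup may live in kafka.utils.Utils with different types:
{code}
// Parse "host1:port1,host2:port2" into (host, port) pairs, tolerating whitespace.
def parseBrokerList(brokerList: String): Seq[(String, Int)] =
  brokerList.split(",").map(_.trim).filter(_.nonEmpty).map { hostPort =>
    val Array(host, port) = hostPort.split(":")
    (host, port.toInt)
  }

// parseBrokerList("broker1:9092, broker2:9092") == Seq(("broker1", 9092), ("broker2", 9092))
{code}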
[jira] [Commented] (KAFKA-1555) provide strong consistency with reasonable availability
[ https://issues.apache.org/jira/browse/KAFKA-1555?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14148450#comment-14148450 ] Gwen Shapira commented on KAFKA-1555: - Call me a control freak, but "don't lose data" is a critical requirement, and I'd like to control as much of it as possible in a central location that can be monitored and audited. I can't see how we can force everyone who produces data to do the right thing. provide strong consistency with reasonable availability --- Key: KAFKA-1555 URL: https://issues.apache.org/jira/browse/KAFKA-1555 -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-1555) provide strong consistency with reasonable availability
[ https://issues.apache.org/jira/browse/KAFKA-1555?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14148483#comment-14148483 ] Jay Kreps commented on KAFKA-1555: --
1. Yeah, this may have been a mistake. I think integers are fine for the protocol. We need not expose them in this way in the client config.
2. Yeah, I think what I said wasn't very clear. You are correct that the loss situation is always an unclean election. I think your complaint is that min.isr is hard to reason about. I actually agree. What I was trying to say was that you can't totally prevent data loss either by replication or by min.isr, because regardless of the number of copies written there is some chance of more failures than that. So fundamentally, using the number of writes/replicas is a way to increase the probability of durability. Both are hard to reason about, but I don't know if min.isr is worse than replication factor in that respect.
3. Which of the four options are you saying you like?
4. Yes, I totally agree. Let me elaborate. I claim there are really only two common cases here: (a) you have the ability to block and wait for sufficient available replicas, holding onto whatever unwritten data in the meantime, or (b) you don't. I think probably the majority of uses are (b); for example, any data produced by a web application would be this way. But there are plenty of cases where you can block (a stream processing job reading from an upstream topic, or when replicating data coming out of a database, etc.). min.isr only helps the case where you can block. So the only sane way to use min.isr is to also set retries to infinity and keep trying until you are sure the write has succeeded. But if we don't fail fast on the write, imagine how this would work in practice. A server fails, bringing you below your min.isr setting, and for the hour while someone is fixing it your process is sitting there pumping out duplicate writes. This is likely not what anyone would want. But as you say, you can't guarantee the data wasn't written, because the ISR could shrink after the write occurred. This would be rare, but possible. However, attempting to fail fast, even though it isn't perfect, fixes the real problem: there may still be the occasional duplicate, but that is true anyway just due to network timeouts; what there won't be is a billion duplicates, which is what I consider to be the blocker issue.

provide strong consistency with reasonable availability --- Key: KAFKA-1555 URL: https://issues.apache.org/jira/browse/KAFKA-1555 -- This message was sent by Atlassian JIRA (v6.3.4#6332)
Re: Can Fetch Request cause delay of Produce Request?
Thank you Neha, Allocating a dedicated connection for the fetching client worked fine and did not require any changes to the Kafka client library code. Thinking about it, the server perhaps could process Fetch and Produce requests in parallel, while still returning results in order as in the spec. What needs to be prevented is simultaneous processing by multiple producers or multiple fetchers. I hope I'll have time to play with the server a little bit. Vadim.

On Thu, Sep 25, 2014 at 12:13 PM, Neha Narkhede neha.narkh...@gmail.com wrote: "The server guarantees that on a single TCP connection, requests will be processed in the order they are sent and responses will return in that order as well" "But to achieve proper functioning of the client, I need to allocate another tcp connection for listeners now?" Yes. The easiest way to send and receive is to use a producer client and a consumer client, each of which will have its own TCP connection to the server.

On Thu, Sep 25, 2014 at 11:40 AM, Vadim Chekan kot.bege...@gmail.com wrote: Hi all, I'm working on my own kafka client implementation and I noticed a strange situation.

Time(s) | Action
0.0     | MetadataRequest
0.0     | MetadataResponse
0.2     | OffsetRequest
0.2     | OffsetResponse
0.3     | FetchRequest(MaxWaitTime=20sec)
6.0     | ProduceRequest
31.0    | FetchResponse   <-- notice the 25 sec gap!
31.0    | ProduceResponse

What is important, it seems to me that a Fetch Request with a long poll will block processing of all following requests for the duration of the timeout, given that there is no new data. But new data *are* flowing in, and the Produce Request is waiting right behind the Fetch one in the server-side processing queue. If what I describe is correct, then I see the following problems. 1. From the client's point of view, if they have a listener and a sender, then sending data should produce an immediate event on the listener. 2. If I want a clean shutdown of my application within 5 sec, but my listeners are configured with a 20 sec timeout, then I am risking losing data. My application shutdown time now becomes at least as long as the listener's polling time, which is unreasonable. I do understand why it was implemented that way, probably because of the specification saying "The server guarantees that on a single TCP connection, requests will be processed in the order they are sent and responses will return in that order as well". But to achieve proper functioning of the client, I need to allocate another TCP connection for listeners now? Also, I'm aware of the new protocol proposal for the next version of Kafka; would this issue be resolved there? Thanks, Vadim.

-- From RFC 2631: In ASN.1, EXPLICIT tagging is implicit unless IMPLICIT is explicitly specified
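[Editor's sketch] The workaround Vadim describes, using the 0.8 SimpleConsumer and Producer APIs so the long-poll fetch lives on its own TCP connection and cannot head-of-line-block produces. Hostnames and topic are placeholders:
{code}
import java.util.Properties
import kafka.api.FetchRequestBuilder
import kafka.consumer.SimpleConsumer
import kafka.producer.{KeyedMessage, Producer, ProducerConfig}

object SeparateConnections {
  def main(args: Array[String]): Unit = {
    // Two independent TCP connections: the blocking long-poll fetch uses the
    // consumer's connection; produce requests use the producer's.
    val consumer = new SimpleConsumer("localhost", 9092, 30000, 64 * 1024, "my-fetcher")
    val producerProps = new Properties()
    producerProps.put("metadata.broker.list", "localhost:9092")
    producerProps.put("serializer.class", "kafka.serializer.StringEncoder")
    val producer = new Producer[String, String](new ProducerConfig(producerProps))

    val fetch = new FetchRequestBuilder()
      .clientId("my-fetcher")
      .addFetch("my-topic", 0, 0L, 100 * 1024)
      .maxWait(20000) // long poll up to 20s without delaying the produce below
      .minBytes(1)
      .build()

    producer.send(new KeyedMessage[String, String]("my-topic", "hello")) // returns promptly
    val response = consumer.fetch(fetch) // may block up to maxWait, on its own connection
    println(s"fetch returned, error free: ${!response.hasError}")

    producer.close()
    consumer.close()
  }
}
{code}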
Re: Review Request 25995: Patch for KAFKA-1650
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/25995/#review54620 ---

core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala https://reviews.apache.org/r/25995/#comment94830 Do we need to do this check every time in the loop?

core/src/main/scala/kafka/tools/MirrorMaker.scala https://reviews.apache.org/r/25995/#comment94831 No need for an empty line here.

core/src/main/scala/kafka/tools/MirrorMaker.scala https://reviews.apache.org/r/25995/#comment94832 No need for brackets.

core/src/main/scala/kafka/tools/MirrorMaker.scala https://reviews.apache.org/r/25995/#comment94833 No need for brackets.

core/src/main/scala/kafka/tools/MirrorMaker.scala https://reviews.apache.org/r/25995/#comment94835 "Maximum bytes that can be buffered in the data channels"

core/src/main/scala/kafka/tools/MirrorMaker.scala https://reviews.apache.org/r/25995/#comment94834 "in terms of bytes"

core/src/main/scala/kafka/tools/MirrorMaker.scala https://reviews.apache.org/r/25995/#comment94836 Inconsistent indentation.

core/src/main/scala/kafka/tools/MirrorMaker.scala https://reviews.apache.org/r/25995/#comment94838 Capitalize: "Offset commit interval in ms"

core/src/main/scala/kafka/tools/MirrorMaker.scala https://reviews.apache.org/r/25995/#comment94841 Do you need to turn off auto commit on the consumer threads here?

core/src/main/scala/kafka/tools/MirrorMaker.scala https://reviews.apache.org/r/25995/#comment94840 We can add some more comments here, explaining: 1) why we add the offset commit thread for the new producer, but not the old producer; 2) what risks the old producer has (for not having an offset commit thread).

core/src/main/scala/kafka/tools/MirrorMaker.scala https://reviews.apache.org/r/25995/#comment94842 For clean shutdown, you need to 1) halt the consumer threads first, 2) wait for the producer to drain all the messages in the data channel, 3) manually commit offsets on the consumer threads, and 4) shut down the consumer threads. Otherwise we will have data duplicates, as we commit offsets based on the minimum.

core/src/main/scala/kafka/tools/MirrorMaker.scala https://reviews.apache.org/r/25995/#comment94844 queueId

core/src/main/scala/kafka/tools/MirrorMaker.scala https://reviews.apache.org/r/25995/#comment94846 How about having a histogram for each queue instead of getting the sum? The update call would be a bit less expensive and we can monitor whether some queues are empty while others get all the data.

core/src/main/scala/kafka/tools/MirrorMaker.scala https://reviews.apache.org/r/25995/#comment94847 Ditto above.

core/src/main/scala/kafka/tools/MirrorMaker.scala https://reviews.apache.org/r/25995/#comment94849 Add comments explaining why we force an unclean shutdown with System.exit here.

core/src/main/scala/kafka/tools/MirrorMaker.scala https://reviews.apache.org/r/25995/#comment94857 Unfortunately this may not be the case, as we can have multiple connectors which use different consumer configs with different group ids. We need to either 1) change the config settings to enforce this to be true, or 2) use a separate offset client that remembers which topics belong to which groups.

core/src/main/scala/kafka/tools/MirrorMaker.scala https://reviews.apache.org/r/25995/#comment94858 Capitalize the first word.

core/src/main/scala/kafka/tools/MirrorMaker.scala https://reviews.apache.org/r/25995/#comment94859 Capitalize the first word.

core/src/main/scala/kafka/tools/MirrorMaker.scala https://reviews.apache.org/r/25995/#comment94863 Add a comment on the logic of how this works. Also a few questions: 1) is the map() call synchronized with other threads putting new offsets into the map? 2) after the sorting, the logic may be clearer as:

    var committableOffsetIndex = 0
    while (committableOffsetIndex < offsets.size &&
           offsets(committableOffsetIndex) - offsets.head == committableOffsetIndex)
      committableOffsetIndex += 1
    val offsetToCommit = offsets(committableOffsetIndex - 1) + 1

core/src/main/scala/kafka/tools/MirrorMaker.scala https://reviews.apache.org/r/25995/#comment94855 The send().get() call is missing.

core/src/main/scala/kafka/utils/ByteBoundedBlockingQueue.scala https://reviews.apache.org/r/25995/#comment94853 Apache header missing.

- Guozhang Wang

On Sept. 24, 2014, 4:26 p.m., Jiangjie Qin wrote: --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/25995/ --- (Updated Sept. 24, 2014, 4:26 p.m.) Review request for kafka. Bugs: KAFKA-1650 https://issues.apache.org/jira/browse/KAFKA-1650 Repository: kafka Description ...
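[Editor's sketch] A self-contained version of the committable-offset computation sketched in comment 94863, assuming the input is a sorted snapshot of acked offsets beginning at the last committed offset; illustrative only:
{code}
// The committable offset is one past the longest gap-free prefix of acked offsets.
def committableOffset(sortedAcked: IndexedSeq[Long]): Option[Long] =
  if (sortedAcked.isEmpty) None
  else {
    var i = 0
    while (i < sortedAcked.size && sortedAcked(i) - sortedAcked.head == i)
      i += 1
    Some(sortedAcked(i - 1) + 1) // one past the last consecutive acked offset
  }

// committableOffset(Vector(5, 6, 7, 9)) == Some(8): offset 8 is not acked yet
{code}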
[jira] [Updated] (KAFKA-589) Clean shutdown after startup connection failure
[ https://issues.apache.org/jira/browse/KAFKA-589?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ewen Cheslack-Postava updated KAFKA-589: Status: Patch Available (was: Open)

This patch makes KafkaServer clean up after itself, but still rethrow any caught exceptions. This keeps the existing interface the same: it should still work if the caller does the cleanup themselves by catching exceptions and calling shutdown, but it also cleans up if they don't, so the leftover thread won't cause a hang. Also adds a test of this behavior.

Clean shutdown after startup connection failure --- Key: KAFKA-589 URL: https://issues.apache.org/jira/browse/KAFKA-589 Project: Kafka Issue Type: Bug Components: core Affects Versions: 0.8.0, 0.7.2 Reporter: Jason Rosenberg Assignee: Swapnil Ghike Priority: Minor Labels: bugs, newbie

Hi, I'm embedding the kafka server (0.7.2) in an application container. I've noticed that if I try to start the server without zookeeper being available, by default it gets a zk connection timeout after 6 seconds, and then throws an exception out of KafkaServer.startup(). E.g., I see this stack trace:
{noformat}
Exception in thread "main" org.I0Itec.zkclient.exception.ZkTimeoutException: Unable to connect to zookeeper server within timeout: 6000
	at org.I0Itec.zkclient.ZkClient.connect(ZkClient.java:876)
	at org.I0Itec.zkclient.ZkClient.<init>(ZkClient.java:98)
	at org.I0Itec.zkclient.ZkClient.<init>(ZkClient.java:84)
	at kafka.server.KafkaZooKeeper.startup(KafkaZooKeeper.scala:44)
	at kafka.log.LogManager.<init>(LogManager.scala:93)
	at kafka.server.KafkaServer.startup(KafkaServer.scala:58)
{noformat}
So that's ok, I can catch the exception and then shut everything down gracefully in this case. However, when I do this, it seems there is a daemon thread still around which doesn't quit, and so the server never actually exits the jvm. Specifically, this thread seems to hang around:
{noformat}
"kafka-logcleaner-0" prio=5 tid=7fd9b48b1000 nid=0x112c08000 waiting on condition [112c07000]
   java.lang.Thread.State: TIMED_WAITING (parking)
	at sun.misc.Unsafe.park(Native Method)
	- parking to wait for 7f40d4be8 (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
	at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:196)
	at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2025)
	at java.util.concurrent.DelayQueue.take(DelayQueue.java:164)
	at java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:609)
	at java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:602)
	at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:947)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:907)
	at java.lang.Thread.run(Thread.java:680)
{noformat}
Looking at the code in kafka.log.LogManager, it does seem like it starts up the scheduler to clean logs before then trying to connect to zk (and in this case failing):
{code}
/* Schedule the cleanup task to delete old logs */
if(scheduler != null) {
  info("starting log cleaner every " + logCleanupIntervalMs + " ms")
  scheduler.scheduleWithRate(cleanupLogs, 60 * 1000, logCleanupIntervalMs)
}
{code}
So this scheduler does not appear to be stopped if startup fails. However, if I catch the above RuntimeException and then call KafkaServer.shutdown(), it will stop the scheduler and all is good. However, it seems odd that if I get an exception when calling KafkaServer.startup(), I should still have to call KafkaServer.shutdown(). Rather, wouldn't it be better to have it internally clean up after itself if startup() gets an exception? I'm not sure I can reliably call shutdown() after a failed startup(). -- This message was sent by Atlassian JIRA (v6.3.4#6332)
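[Editor's sketch] The behavior the patch describes, in miniature; stubs stand in for the real KafkaServer components, and this is not the literal patch:
{code}
object StartupSketch {
  private def startScheduler(): Unit = println("scheduler started")
  private def connectZookeeper(): Unit =
    throw new RuntimeException("zk connection timeout") // simulated failure
  private def shutdown(): Unit = println("stopping everything we started")

  // Clean up partially started components on a failed startup, then rethrow
  // so callers that already catch-and-shutdown keep working unchanged.
  def startup(): Unit =
    try {
      startScheduler()
      connectZookeeper()
    } catch {
      case e: Throwable =>
        shutdown() // without this, the scheduler's daemon thread would linger
        throw e    // caller still sees the original failure
    }
}
{code}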
[jira] [Commented] (KAFKA-1555) provide strong consistency with reasonable availability
[ https://issues.apache.org/jira/browse/KAFKA-1555?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14148510#comment-14148510 ] Jun Rao commented on KAFKA-1555:
a. I agree that unclean leader election is an orthogonal issue from min.isr. Unclean leader election only happens when all replicas in the ISR fail. This can happen independent of the value of min.isr. The higher the value of min.isr, the lower the probability that an unclean leader election will happen.
b. The issue with exposing min.isr in the producer config is that a producer can be used to send to multiple topics. Since different topics can have different replication factors, it may be difficult to configure a proper min.isr that works for all topics. We had the same issue with ack > 1 before; however, that option will be removed in Gwen's patch. From this perspective, I think having min.isr as a topic-level config makes sense.
c. I agree that it's better to add a separate check to prevent the message from getting into the leader's log if the ISR at that moment is smaller than min.isr.

provide strong consistency with reasonable availability --- Key: KAFKA-1555 URL: https://issues.apache.org/jira/browse/KAFKA-1555 -- This message was sent by Atlassian JIRA (v6.3.4#6332)
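[Editor's sketch] With min.isr as a topic-level config, per-topic durability can be tuned centrally without touching producer code; a hypothetical example using the topic tool's existing --alter --config mechanism (topic names are placeholders):
{code}
bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic critical-events \
  --config min.insync.replicas=2
bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic clickstream \
  --config min.insync.replicas=1
{code}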
[jira] [Updated] (KAFKA-589) Clean shutdown after startup connection failure
[ https://issues.apache.org/jira/browse/KAFKA-589?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ewen Cheslack-Postava updated KAFKA-589: Attachment: KAFKA-589-v1.patch

Clean shutdown after startup connection failure
---
Key: KAFKA-589
URL: https://issues.apache.org/jira/browse/KAFKA-589
Project: Kafka
Issue Type: Bug
Components: core
Affects Versions: 0.7.2, 0.8.0
Reporter: Jason Rosenberg
Assignee: Swapnil Ghike
Priority: Minor
Labels: bugs, newbie
Attachments: KAFKA-589-v1.patch

Hi, I'm embedding the kafka server (0.7.2) in an application container. I've noticed that if I try to start the server without zookeeper being available, by default it gets a zk connection timeout after 6 seconds and then throws an exception out of KafkaServer.startup(). E.g., I see this stack trace:

Exception in thread main org.I0Itec.zkclient.exception.ZkTimeoutException: Unable to connect to zookeeper server within timeout: 6000
  at org.I0Itec.zkclient.ZkClient.connect(ZkClient.java:876)
  at org.I0Itec.zkclient.ZkClient.<init>(ZkClient.java:98)
  at org.I0Itec.zkclient.ZkClient.<init>(ZkClient.java:84)
  at kafka.server.KafkaZooKeeper.startup(KafkaZooKeeper.scala:44)
  at kafka.log.LogManager.<init>(LogManager.scala:93)
  at kafka.server.KafkaServer.startup(KafkaServer.scala:58)

So that's ok; I can catch the exception and then shut everything down gracefully in this case. However, when I do this, it seems there is a daemon thread still around which doesn't quit, and so the server never actually exits the jvm. Specifically, this thread seems to hang around:

kafka-logcleaner-0 prio=5 tid=7fd9b48b1000 nid=0x112c08000 waiting on condition [112c07000]
  java.lang.Thread.State: TIMED_WAITING (parking)
  at sun.misc.Unsafe.park(Native Method)
  - parking to wait for 7f40d4be8 (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
  at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:196)
  at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2025)
  at java.util.concurrent.DelayQueue.take(DelayQueue.java:164)
  at java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:609)
  at java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:602)
  at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:947)
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:907)
  at java.lang.Thread.run(Thread.java:680)

Looking at the code in kafka.log.LogManager, it does seem like it starts the scheduler to clean logs before trying to connect to zk (and in this case failing):

{code}
/* Schedule the cleanup task to delete old logs */
if(scheduler != null) {
  info("starting log cleaner every " + logCleanupIntervalMs + " ms")
  scheduler.scheduleWithRate(cleanupLogs, 60 * 1000, logCleanupIntervalMs)
}
{code}

So this scheduler does not appear to be stopped if startup fails. However, if I catch the above RuntimeException and then call KafkaServer.shutdown(), it will stop the scheduler, and all is good. However, it seems odd that if I get an exception when calling KafkaServer.startup(), I should still have to call KafkaServer.shutdown(). Rather, wouldn't it be better to have it internally clean up after itself if startup() throws an exception? I'm not sure I can reliably call shutdown() after a failed startup().
[jira] [Commented] (KAFKA-1555) provide strong consistency with reasonable availability
[ https://issues.apache.org/jira/browse/KAFKA-1555?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14148520#comment-14148520 ] Sriram Subramanian commented on KAFKA-1555:

2. I agree. What min_isr helps with is having a way to specify "I don't want to lose my data as long as at most min_isr - 1 nodes are down." For example, if no_of_replicas=3, min_isr=2 and ack=-1, we should not lose data as long as only one node is down, even when there is an unclean leader election. In this particular case, when the leader fails, it is expected that all replica nodes are up but could be out of the ISR. Under such constraints it is definitely possible to prevent data loss (ignoring data loss due to system failures and data not flushed to disk) by making the node with the longest log (assuming we ensure they don't diverge) the leader.

3. I prefer b or c. d is attractive since you could use just one variable to define your required guarantees, but it is hard to understand at the API level.

4. I totally agree. The issue is that the ISR takes a while to reflect the actual reality. Assume we failed early, before writing to the local log, and did not have any checks after writing. Replicas go down. It would take a while for the ISR to reflect that the replicas are not in the ISR anymore. During this time, we would simply write the messages to the log and lose them later.
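To make point 2 concrete, a hedged sketch of the "longest log wins" election rule Sriram describes; ReplicaState and its fields are invented for illustration and are not the controller's actual types:

{code}
case class ReplicaState(brokerId: Int, logEndOffset: Long)

// Among the live assigned replicas, pick the broker whose log is longest,
// assuming (as Sriram notes) that logs are prevented from diverging.
def electLeader(liveReplicas: Seq[ReplicaState]): Option[Int] =
  if (liveReplicas.isEmpty) None // no live replica: the partition stays offline
  else Some(liveReplicas.maxBy(_.logEndOffset).brokerId) // longest log loses the least data
{code}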
Re: Review Request 25886: KAFKA-1555: provide strong consistency with reasonable availability
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/25886/#review54635 ---

Thanks for the patch. Looks good overall. A couple of comments.

1. Let's add a check of min.isr in Partition.appendMessagesToLeader() so that the message is not added to the leader's log if the ISR is smaller than min.isr.

2. We talked about dropping the ack > 1 support in the jira. However, I think that's easier done after KAFKA-1583 is completed. So we don't have to do this change here. I will file a follow-up jira.

- Jun Rao

On Sept. 25, 2014, 7:41 p.m., Gwen Shapira wrote:
> --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/25886/ ---
> (Updated Sept. 25, 2014, 7:41 p.m.)
> Review request for kafka.
> Repository: kafka
>
> Description
> ---
> KAFKA-1555: provide strong consistency with reasonable availability
>
> Diffs
> -
> clients/src/main/java/org/apache/kafka/common/errors/NotEnoughReplicasException.java PRE-CREATION
> clients/src/main/java/org/apache/kafka/common/protocol/Errors.java d434f42
> core/src/main/scala/kafka/cluster/Partition.scala ff106b4
> core/src/main/scala/kafka/common/ErrorMapping.scala 3fae791
> core/src/main/scala/kafka/common/NotEnoughReplicasException.scala PRE-CREATION
> core/src/main/scala/kafka/log/LogConfig.scala 5746ad4
> core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala 39f777b
> core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala 24deea0
> core/src/test/scala/unit/kafka/utils/TestUtils.scala 2dbdd3c
>
> Diff: https://reviews.apache.org/r/25886/diff/
>
> Testing
> ---
> With a 3-broker cluster, created 3 topics, each with 1 partition and 3 replicas, with min.insync.replicas of 1, 3 and 4.
> * min.insync.replicas=1 behaved normally (all writes succeeded as long as a broker was up)
> * min.insync.replicas=3 returned NotEnoughReplicas when required.acks=-1 and one broker was down
> * min.insync.replicas=4 returned NotEnoughReplicas when required.acks=-1
> See notes about retry behavior in the JIRA.
>
> Thanks,
> Gwen Shapira
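For reference, a sketch of the guard requested in comment 1 above; the parameter names are illustrative stand-ins for the Partition internals rather than the actual patch, though NotEnoughReplicasException itself appears in the diff:

{code}
import kafka.common.NotEnoughReplicasException

// Reject the append up front when the ISR has already shrunk below the
// topic's min.insync.replicas and the producer asked for acks=-1.
def checkEnoughReplicas(topic: String, partition: Int,
                        inSyncSize: Int, minIsr: Int, requiredAcks: Int): Unit = {
  if (inSyncSize < minIsr && requiredAcks == -1)
    throw new NotEnoughReplicasException(
      "ISR for [%s,%d] has %d replicas, below the required minimum %d"
        .format(topic, partition, inSyncSize, minIsr))
}
{code}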
[jira] [Commented] (KAFKA-1555) provide strong consistency with reasonable availability
[ https://issues.apache.org/jira/browse/KAFKA-1555?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14148542#comment-14148542 ] Jun Rao commented on KAFKA-1555:

4b and 4d have the same issue: the value of min.isr or ack may not work for all topics that can be sent through the same producer instance. 4c has the issue that the broker can't reject messages if the ISR is smaller than min.isr. So, personally, I think 4a is still the best option.
[jira] [Updated] (KAFKA-1631) ReplicationFactor and under-replicated partitions incorrect during reassignment
[ https://issues.apache.org/jira/browse/KAFKA-1631?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ewen Cheslack-Postava updated KAFKA-1631: Attachment: KAFKA-1631-v1.patch

This patch fixes things in a way that I think [~rberdeen] would find sensible. The issue was that the test for under-replication was comparing the current set of assigned replicas against the number of in-sync replicas. During a reassignment that isn't really correct, because if you, e.g., move all replicas to different brokers, you'll have more than the target number of replicas assigned. The fix is to look up the active set of reassignments and, if one is active for the partition, use that reassignment's info to determine the correct number of replicas; otherwise, we can fall back on the currently assigned set.

Note that this does mean reassignments that increase the number of replicas will show up as under-replicated, which I think may be the case [~nehanarkhede] was hoping to fix. It's arguable which approach is correct (i.e., should the new target number of replicas apply as soon as the reassignment is issued, or only once it's completed).

As for the replication factor being reported -- that is the number of currently assigned replicas for the first partition, and it has a number of issues:
1. It can be higher than the real target number of replicas, as described above.
2. It's not really correct to have it on the topic summary line, since it varies by partition.
3. It's not even the value for partition 0, because it's just using the head of a Map.
If we're ok with changing the output formatting here, I can clean that part up as well, maybe by adding ReplicationFactor to each partition line and making it use the value used when determining under-replication.

ReplicationFactor and under-replicated partitions incorrect during reassignment
---
Key: KAFKA-1631
URL: https://issues.apache.org/jira/browse/KAFKA-1631
Project: Kafka
Issue Type: Bug
Affects Versions: 0.8.1.1
Reporter: Ryan Berdeen
Labels: newbie
Attachments: KAFKA-1631-v1.patch

We have a topic with a replication factor of 3. We monitor UnderReplicatedPartitions as recommended by the documentation. During a partition reassignment, partitions being reassigned are reported as under-replicated. Running a describe shows:

{code}
Topic:activity-wal-1  PartitionCount:15  ReplicationFactor:5  Configs:
  Topic: activity-wal-1  Partition: 0  Leader: 14  Replicas: 14,13,12,11,15  Isr: 14,12,11,13
  Topic: activity-wal-1  Partition: 1  Leader: 14  Replicas: 15,14,11  Isr: 14,11
  Topic: activity-wal-1  Partition: 2  Leader: 11  Replicas: 11,15,12  Isr: 12,11,15
  ...
{code}

It looks like the displayed replication factor, 5, is simply the number of replicas listed for the first partition, which includes both the brokers in the current list and those onto which the partition is being reassigned. Partition 0 is also included in the list when using the `--under-replicated-partitions` option, even though it is replicated to more brokers than the true replication factor. During a reassignment, the under-replicated partitions metric is not usable, meaning that actual under-replicated partitions can go unnoticed.
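A hedged sketch of the fix Ewen describes, with simplified stand-ins for the controller/TopicCommand data structures (not the actual patch):

{code}
case class TopicAndPartition(topic: String, partition: Int)

// When a reassignment is in flight for the partition, judge under-replication
// against the reassignment's target replica list; otherwise fall back to the
// currently assigned replicas.
def targetReplicationFactor(tp: TopicAndPartition,
                            assignedReplicas: Seq[Int],
                            activeReassignments: Map[TopicAndPartition, Seq[Int]]): Int =
  activeReassignments.get(tp) match {
    case Some(targetReplicas) => targetReplicas.size
    case None                 => assignedReplicas.size
  }

def isUnderReplicated(tp: TopicAndPartition, isrSize: Int,
                      assignedReplicas: Seq[Int],
                      activeReassignments: Map[TopicAndPartition, Seq[Int]]): Boolean =
  isrSize < targetReplicationFactor(tp, assignedReplicas, activeReassignments)
{code}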
[jira] [Commented] (KAFKA-1555) provide strong consistency with reasonable availability
[ https://issues.apache.org/jira/browse/KAFKA-1555?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14148557#comment-14148557 ] Neha Narkhede commented on KAFKA-1555:

Followed the discussion and want to share some thoughts:

1. With the new setting, it is necessary to get rid of the current semantics of acks > 1.

2. Given 1, I initially liked the approach of overriding acks > 1 to carry the semantics described for min.isr in this JIRA. This is actually more intuitive compared to -2 .. -N. A downside of this approach, though, is that clients would need to know the replication factor of the topic, which is known to the admin and changeable only by the admin. So moving it to a topic-level config that is also changeable by the admin makes sense.

3. Since we are adding a topic-level config for the min.isr setting, it makes a lot of sense to express acks as an enum. I think users would find the acks config much more intuitive expressed as {no_ack, leader_ack, committed_ack}.

4. It is important to reject the write on the server when the requested min.acks is greater than the size of the current ISR. It is true that the ISR size could change immediately after rejecting the write, but it would lead to far fewer duplicates.
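A sketch of the enum Neha proposes in point 3; this is an illustration of the idea, not an actual Kafka API:

{code}
// The three acknowledgement levels, replacing the raw integer config.
sealed abstract class RequiredAcks(val id: Int)
case object NoAck extends RequiredAcks(0)         // fire and forget
case object LeaderAck extends RequiredAcks(1)     // acked once the leader has the write
case object CommittedAck extends RequiredAcks(-1) // acked once the ISR (>= min.isr) has it
{code}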
[jira] [Commented] (KAFKA-1631) ReplicationFactor and under-replicated partitions incorrect during reassignment
[ https://issues.apache.org/jira/browse/KAFKA-1631?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14148559#comment-14148559 ] Ryan Berdeen commented on KAFKA-1631:

The patch does look like an improvement to the {{TopicCommand}}, but it doesn't address the number of under-replicated partitions reported by the brokers. It seems like there shouldn't be multiple definitions of "under-replicated partition".
Re: Review Request 25995: Patch for KAFKA-1650
On Sept. 25, 2014, 11:48 p.m., Guozhang Wang wrote:
> core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala, line 299
> https://reviews.apache.org/r/25995/diff/1/?file=704523#file704523line299
> Do we need to do this check every time in the loop?

Maybe we can put this check outside the while loop, but that would probably introduce more duplicate code. Since the offset commit is not that frequent and the retry is hopefully rare, it should not create much overhead.

On Sept. 25, 2014, 11:48 p.m., Guozhang Wang wrote:
> core/src/main/scala/kafka/tools/MirrorMaker.scala, line 168
> https://reviews.apache.org/r/25995/diff/1/?file=704524#file704524line168
> Do you need to turn off auto commit on the consumer threads here?

I thought offset commit should be turned off in the consumer config. Is that the case?

On Sept. 25, 2014, 11:48 p.m., Guozhang Wang wrote:
> core/src/main/scala/kafka/tools/MirrorMaker.scala, line 207
> https://reviews.apache.org/r/25995/diff/1/?file=704524#file704524line207
> For clean shutdown, you need to 1) halt consumer threads first, 2) wait for the producer to drain all the messages in the data channel, 3) manually commit offsets on consumer threads, 4) shut down consumer threads. Otherwise we will have data duplicates, as we commit offsets based on the minimum.

Talked to Guozhang; changed the process to be as below:
1. shut down consumer threads
2. shut down producer
3. commit offsets
4. shut down consumer connectors

On Sept. 25, 2014, 11:48 p.m., Guozhang Wang wrote:
> core/src/main/scala/kafka/tools/MirrorMaker.scala, line 447
> https://reviews.apache.org/r/25995/diff/1/?file=704524#file704524line447
> Add a comment on the logic of how this works. Also a few questions: 1) is the map() call synchronized with other threads putting new offsets into the map? 2) after the sorting, the logic may be clearer as:
> var commitableOffsetIndex = 0
> while (offsets[commitableOffsetIndex] - offsets.head == commitableOffsetIndex) commitableOffsetIndex += 1
> offsetToCommit = offsets[commitableOffsetIndex] + 1

We are using a concurrent map, which guarantees that a single put/get operation is atomic. It is possible that the offsets we get for different partitions reflect different points in time, but that should not matter much, because later commits will pick up the updated values, and the offset we commit when exiting is guaranteed to be taken after the producer is shut down. So the commits during running time do not need to be 100% accurate.

On Sept. 25, 2014, 11:48 p.m., Guozhang Wang wrote:
> core/src/main/scala/kafka/tools/MirrorMaker.scala, line 483
> https://reviews.apache.org/r/25995/diff/1/?file=704524#file704524line483
> The send().get() call is missing.

I put it inside the put().

- Jiangjie

--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/25995/#review54620 ---

On Sept. 24, 2014, 4:26 p.m., Jiangjie Qin wrote:
> --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/25995/ ---
> (Updated Sept. 24, 2014, 4:26 p.m.)
> Review request for kafka.
> Bugs: KAFKA-1650 https://issues.apache.org/jira/browse/KAFKA-1650
> Repository: kafka
>
> Description
> ---
> Mirror maker redesign; adding a byte-bounded blocking queue.
> Diffs
> -
> core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala fbc680fde21b02f11285a4f4b442987356abd17b
> core/src/main/scala/kafka/tools/MirrorMaker.scala b8698ee1469c8fbc92ccc176d916eb3e28b87867
> core/src/main/scala/kafka/utils/ByteBoundedBlockingQueue.scala PRE-CREATION
>
> Diff: https://reviews.apache.org/r/25995/diff/
>
> Testing
> ---
>
> Thanks,
> Jiangjie Qin
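To make the line 447 exchange above concrete, here is a hedged sketch of the commit rule in Guozhang's pseudocode: commit one past the largest offset of the gap-free prefix of completed sends, so no unacknowledged message is skipped on restart. This follows the pseudocode, not the actual MirrorMaker source:

{code}
def committableOffset(completed: Iterable[Long]): Option[Long] = {
  val offsets = completed.toArray.sorted
  if (offsets.isEmpty) None
  else {
    var i = 0
    // Advance while the offsets form a contiguous run starting at the smallest one.
    while (i + 1 < offsets.length && offsets(i + 1) - offsets(0) == i + 1) i += 1
    Some(offsets(i) + 1) // the next offset to consume, i.e. the value to commit
  }
}
{code}

For example, committableOffset(Seq(5L, 6L, 7L, 9L)) returns Some(8): the send at offset 8 has not completed, so the commit must not cover offset 9 yet.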
Re: Review Request 25886: KAFKA-1555: provide strong consistency with reasonable availability
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/25886/#review54639 ---

I suggest we also make the changes required to reject the write if min.isr > size of the current ISR.

clients/src/main/java/org/apache/kafka/common/errors/NotEnoughReplicasException.java
https://reviews.apache.org/r/25886/#comment94876
Could you please add a description to this exception class that explains the purpose of the exception?

core/src/main/scala/kafka/cluster/Partition.scala
https://reviews.apache.org/r/25886/#comment94878
Can you remove the extra whitespace in "requiredOffset )"?

core/src/main/scala/kafka/common/NotEnoughReplicasException.scala
https://reviews.apache.org/r/25886/#comment94879
Same here. Could you please add a description?

- Neha Narkhede
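As an illustration of the class-level description being requested, a hedged sketch of what the scaladoc might say; the actual wording in the patch may differ:

{code}
package kafka.common

/**
 * Thrown when a produce request with required.acks = -1 arrives for a
 * partition whose current ISR is smaller than the topic's configured
 * min.insync.replicas, so the broker refuses to append the messages.
 */
class NotEnoughReplicasException(message: String) extends RuntimeException(message) {
  def this() = this(null)
}
{code}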
[jira] [Commented] (KAFKA-1499) Broker-side compression configuration
[ https://issues.apache.org/jira/browse/KAFKA-1499?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14148598#comment-14148598 ] Joel Koshy commented on KAFKA-1499:

Thanks - it works now. Sorry, I had forgotten to run the unit tests. The DynamicConfigChangeTest fails; however, we probably should not simply add the broker.compression.enable property to the valid list, since it is not really a valid topic-level config. We can follow up on this tomorrow.

Broker-side compression configuration
---
Key: KAFKA-1499
URL: https://issues.apache.org/jira/browse/KAFKA-1499
Project: Kafka
Issue Type: New Feature
Reporter: Joel Koshy
Assignee: Manikumar Reddy
Labels: newbie++
Fix For: 0.8.2
Attachments: KAFKA-1499.patch, KAFKA-1499.patch, KAFKA-1499_2014-08-15_14:20:27.patch, KAFKA-1499_2014-08-21_21:44:27.patch, KAFKA-1499_2014-09-21_15:57:23.patch, KAFKA-1499_2014-09-23_14:45:38.patch, KAFKA-1499_2014-09-24_14:20:33.patch, KAFKA-1499_2014-09-24_14:24:54.patch, KAFKA-1499_2014-09-25_11:05:57.patch
Original Estimate: 72h
Remaining Estimate: 72h

A given topic can have messages in mixed compression codecs, i.e., it can also have a mix of uncompressed/compressed messages. It would be useful to support a broker-side configuration to recompress messages to a specific compression codec, i.e., all messages (for all topics) on the broker will be compressed to this codec. We could have per-topic overrides as well.
Re: Review Request 25944: Patch for KAFKA-1013
On Sept. 25, 2014, 10:43 p.m., Guozhang Wang wrote:
> core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala, line 374
> https://reviews.apache.org/r/25944/diff/3/?file=703083#file703083line374
> Is there a difference between these two: Thread.sleep() and TimeUnit.MILLISECONDS.sleep()? We use the former everywhere in the code base.

It's the same thing, just another way of doing it. Internally it does the same thing.

On Sept. 25, 2014, 10:43 p.m., Guozhang Wang wrote:
> core/src/main/scala/kafka/tools/ExportOffsets.scala, line 33
> https://reviews.apache.org/r/25944/diff/3/?file=703085#file703085line33
> Can we actually use the same format for ZK / offset manager? Having two different formats makes them harder to parse, since the user needs to know whether ZK or the offset manager is used.

Yeah, the offset manager format is simpler. So unless we are planning to fetch offsets from ZK and commit to the offset manager, this should work. What do you think?

On Sept. 25, 2014, 10:43 p.m., Guozhang Wang wrote:
> core/src/main/scala/kafka/tools/ExportOffsets.scala, line 199
> https://reviews.apache.org/r/25944/diff/3/?file=703085#file703085line199
> You can take a look at KAFKA-686's latest patch, which did some cleanup on the util functions; these functions may probably be merged into Utils.

Would like to discuss this more!

- Mayuresh

--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/25944/#review54609 ---

On Sept. 23, 2014, 5:48 p.m., Mayuresh Gharat wrote:
> --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/25944/ ---
> (Updated Sept. 23, 2014, 5:48 p.m.)
> Review request for kafka.
> Bugs: KAFKA-1013 https://issues.apache.org/jira/browse/KAFKA-1013
> Repository: kafka
>
> Description
> ---
> OffsetClient Tool API. ImportZkOffsets and ExportZkOffsets replaced by ImportOffsets and ExportOffsets. Modified the comments in the headers. Corrected a value.
>
> Diffs
> -
> config/consumer.properties 83847de30d10b6e78bb8de28e0bb925d7c0e6ca2
> core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala fbc680fde21b02f11285a4f4b442987356abd17b
> core/src/main/scala/kafka/tools/ConfigConstants.scala PRE-CREATION
> core/src/main/scala/kafka/tools/ExportOffsets.scala PRE-CREATION
> core/src/main/scala/kafka/tools/ImportOffsets.scala PRE-CREATION
> core/src/main/scala/kafka/tools/OffsetClient.scala PRE-CREATION
>
> Diff: https://reviews.apache.org/r/25944/diff/
>
> Testing
> ---
>
> Thanks,
> Mayuresh Gharat
[jira] [Commented] (KAFKA-1631) ReplicationFactor and under-replicated partitions incorrect during reassignment
[ https://issues.apache.org/jira/browse/KAFKA-1631?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14148680#comment-14148680 ] Ewen Cheslack-Postava commented on KAFKA-1631:

Right. Unfortunately most of the system isn't aware of the large-scale change (reassign: old set -> new set), only of each intermediate state (old set -> old set + new set -> new set). As it stands, UnderReplicatedPartitions is computed by the Partition class, which is created by ReplicaManager. But the high-level reassignment is managed by KafkaController, which looks like the only place the necessary state is maintained. I think getting the semantics you want may require a much more substantial change, since each partition leader would need to know about the partition reassignment rather than just the controller.

On the other hand, while I think it's less than ideal, the current behavior could certainly be argued to be reasonable -- i.e., reassignment is not natively supported; it's just a higher-level operation you can build up. In this case, the intermediate step is expected, and the temporary reporting of under-replication would make sense, since for a time the desired replication of (old set + new set) has not been achieved.
[jira] [Reopened] (KAFKA-1577) Exception in ConnectionQuotas while shutting down
[ https://issues.apache.org/jira/browse/KAFKA-1577?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Neha Narkhede reopened KAFKA-1577: See KAFKA-1652

Exception in ConnectionQuotas while shutting down
---
Key: KAFKA-1577
URL: https://issues.apache.org/jira/browse/KAFKA-1577
Project: Kafka
Issue Type: Bug
Components: core
Reporter: Joel Koshy
Assignee: Sriharsha Chintalapani
Labels: newbie
Attachments: KAFKA-1577.patch, KAFKA-1577_2014-08-20_19:57:44.patch, KAFKA-1577_2014-08-26_07:33:13.patch, KAFKA-1577_check_counter_before_decrementing.patch

{code}
[2014-08-07 19:38:08,228] ERROR Uncaught exception in thread 'kafka-network-thread-9092-0': (kafka.utils.Utils$)
java.util.NoSuchElementException: None.get
  at scala.None$.get(Option.scala:185)
  at scala.None$.get(Option.scala:183)
  at kafka.network.ConnectionQuotas.dec(SocketServer.scala:471)
  at kafka.network.AbstractServerThread.close(SocketServer.scala:158)
  at kafka.network.AbstractServerThread.close(SocketServer.scala:150)
  at kafka.network.AbstractServerThread.closeAll(SocketServer.scala:171)
  at kafka.network.Processor.run(SocketServer.scala:338)
  at java.lang.Thread.run(Thread.java:662)
{code}
[jira] [Resolved] (KAFKA-1652) NoSuchElementException on server shutdown
[ https://issues.apache.org/jira/browse/KAFKA-1652?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Neha Narkhede resolved KAFKA-1652. Resolution: Duplicate

NoSuchElementException on server shutdown
---
Key: KAFKA-1652
URL: https://issues.apache.org/jira/browse/KAFKA-1652
Project: Kafka
Issue Type: Bug
Reporter: Neha Narkhede
Labels: newbie++

Observed the following exception on server shutdown. Happens pretty frequently, though not on every shutdown. Ways to reproduce:
1. Check out kafka from trunk
2. Run bin/kafka-server-start.sh config/server.properties
3. Ctrl-C

{noformat}
[2014-09-25 12:31:14,441] ERROR None.get (kafka.network.Processor)
java.util.NoSuchElementException: None.get
  at scala.None$.get(Option.scala:313)
  at scala.None$.get(Option.scala:311)
  at kafka.network.ConnectionQuotas.dec(SocketServer.scala:522)
  at kafka.network.AbstractServerThread.close(SocketServer.scala:165)
  at kafka.network.AbstractServerThread.close(SocketServer.scala:157)
  at kafka.network.Processor.close(SocketServer.scala:372)
  at kafka.network.AbstractServerThread.closeAll(SocketServer.scala:178)
  at kafka.network.Processor$$anonfun$run$3.apply$mcV$sp(SocketServer.scala:362)
  at kafka.utils.Utils$.swallow(Utils.scala:172)
  at kafka.utils.Logging$class.swallowError(Logging.scala:106)
  at kafka.network.AbstractServerThread.swallowError(SocketServer.scala:108)
  at kafka.network.Processor.run(SocketServer.scala:362)
  at java.lang.Thread.run(Thread.java:695)
{noformat}
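The attached patch name on KAFKA-1577 ("check_counter_before_decrementing") suggests the shape of the fix. A hedged, simplified sketch of a defensive ConnectionQuotas.dec that tolerates the per-address counter already having been cleaned up during shutdown (illustrative, not the committed code):

{code}
import java.net.InetAddress
import scala.collection.mutable

class ConnectionQuotas {
  private val counts = mutable.Map[InetAddress, Int]()

  def inc(addr: InetAddress): Unit = counts.synchronized {
    counts.put(addr, counts.getOrElse(addr, 0) + 1)
  }

  def dec(addr: InetAddress): Unit = counts.synchronized {
    counts.get(addr) match {
      case Some(1) => counts.remove(addr)     // last connection from this address
      case Some(n) => counts.put(addr, n - 1)
      case None    => // already cleaned up (e.g. during shutdown): avoid None.get
    }
  }
}
{code}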
[jira] [Updated] (KAFKA-1653) Duplicate broker ids allowed in replica assignment
[ https://issues.apache.org/jira/browse/KAFKA-1653?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Neha Narkhede updated KAFKA-1653: Labels: newbie (was: )

Duplicate broker ids allowed in replica assignment
---
Key: KAFKA-1653
URL: https://issues.apache.org/jira/browse/KAFKA-1653
Project: Kafka
Issue Type: Bug
Components: tools
Affects Versions: 0.8.1.1
Reporter: Ryan Berdeen
Labels: newbie

The reassign partitions command and the controller do not ensure that all replicas for a partition are on different brokers. For example, you could set 1,2,2 as the list of brokers for the replicas. kafka-topics.sh --describe --under-replicated will list these partitions as under-replicated, but I can't see a reason why the controller should allow this state.
[jira] [Commented] (KAFKA-1647) Replication offset checkpoints (high water marks) can be lost on hard kills and restarts
[ https://issues.apache.org/jira/browse/KAFKA-1647?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14148702#comment-14148702 ] Neha Narkhede commented on KAFKA-1647:

I see. So for this to lose all the data for the concerned partition, b1 has to be bounced to become a follower again, and at that time b0 should be up for the become-follower to succeed. Right? It is a pretty corner case, but it should be fixed.

Replication offset checkpoints (high water marks) can be lost on hard kills and restarts
---
Key: KAFKA-1647
URL: https://issues.apache.org/jira/browse/KAFKA-1647
Project: Kafka
Issue Type: Bug
Reporter: Joel Koshy
Priority: Critical
Labels: newbie++

We ran into this scenario recently in a production environment. This can happen when enough brokers in a cluster are taken down, i.e., a rolling bounce done properly should not cause this issue. It can occur if all replicas for any partition are taken down. Here is a sample scenario:

* Cluster of three brokers: b0, b1, b2
* Two partitions (of some topic) with replication factor two: p0, p1
* Initial state:
** p0: leader = b0, ISR = {b0, b1}
** p1: leader = b1, ISR = {b0, b1}
* Do a parallel hard-kill of all brokers
* Bring up b2, so it is the new controller
* b2 initializes its controller context and populates its leader/ISR cache (i.e., controllerContext.partitionLeadershipInfo) from zookeeper. The last known leaders are b0 (for p0) and b1 (for p1)
* Bring up b1
* The controller's onBrokerStartup procedure initiates a replica state change for all replicas on b1 to become online. As part of this replica state change it gets the last known leader and ISR and sends a LeaderAndIsrRequest to b1 (for p0 and p1). This LeaderAndIsr request contains: {p0: leader=b0; p1: leader=b1; leaders={b1}}. b0 is indicated as the leader of p0, but it is not included in the leaders field because b0 is down.
* On receiving the LeaderAndIsrRequest, b1's replica manager will successfully make b1 the leader for p1 (and create the local replica object corresponding to p1). It will however abort the become-follower transition for p0 because the designated leader b0 is offline. So it will not create the local replica object for p0.
* It will then start the high water mark checkpoint thread. Since only p1 has a local replica object, only p1's high water mark will be checkpointed to disk. p0's previously written checkpoint, if any, will be lost.

So in summary, it seems we should always create the local replica object even if the online transition does not happen.

Possible symptoms of the above bug could be one or more of the following (we saw 2 and 3):
# Data loss; yes, on a hard-kill data loss is expected, but this can actually cause loss of nearly all data if the broker becomes follower, truncates, and soon after happens to become leader.
# High IO on brokers that lose their high water mark and then subsequently (on a successful become-follower transition) truncate their log to zero and start catching up from the beginning.
# If the offsets topic is affected, then offsets can get reset. This is because during an offset load we don't read past the high water mark. So if a water mark is missing, then we don't load anything (even if the offsets are there in the log).
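To illustrate the failure mode, a hedged sketch: the checkpoint file is rewritten from only the partitions that currently have a local replica object, so a partition whose become-follower transition was aborted silently drops out of the file. The types below are simplified stand-ins, and merging the previous file's entries is just one conservative variant of the fix (the JIRA's own suggestion is to always create the local replica):

{code}
case class TopicAndPartition(topic: String, partition: Int)

// Buggy behavior: writing only `current` loses p0's old high water mark.
// Merging `previous` keeps marks for partitions with no local replica yet.
def rewriteCheckpoint(current: Map[TopicAndPartition, Long],
                      previous: Map[TopicAndPartition, Long]): Map[TopicAndPartition, Long] =
  previous ++ current
{code}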
Build failed in Jenkins: Kafka-trunk #277
See https://builds.apache.org/job/Kafka-trunk/277/changes

Changes:
[neha.narkhede] KAFKA-404 When using chroot path, create chroot on startup if it doesn't exist; reviewed by Neha Narkhede

Started by an SCM change
Building remotely on ubuntu3 (Ubuntu ubuntu) in workspace https://builds.apache.org/job/Kafka-trunk/ws/
git rev-parse --is-inside-work-tree
Fetching changes from the remote Git repository
git config remote.origin.url https://git-wip-us.apache.org/repos/asf/kafka.git
Fetching upstream changes from https://git-wip-us.apache.org/repos/asf/kafka.git
git --version
git fetch --tags --progress https://git-wip-us.apache.org/repos/asf/kafka.git +refs/heads/*:refs/remotes/origin/*
git rev-parse origin/trunk^{commit}
Checking out Revision f750dba65f9d9552a61a0754c46fa6e294785b31 (origin/trunk)
git config core.sparsecheckout
git checkout -f f750dba65f9d9552a61a0754c46fa6e294785b31
git rev-list 084566b837ee2204b6898b82156e811d0601085f
[Kafka-trunk] $ /bin/bash -xe /tmp/hudson8706236251140632788.sh
+ gradle
/tmp/hudson8706236251140632788.sh: line 2: gradle: command not found
Build step 'Execute shell' marked build as failure
[jira] [Commented] (KAFKA-1499) Broker-side compression configuration
[ https://issues.apache.org/jira/browse/KAFKA-1499?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14148718#comment-14148718 ] Manikumar Reddy commented on KAFKA-1499:

Yes, the broker.compression.enable property does not belong in LogConfig. This config is available in KafkaConfig, and KafkaConfig is not used in Log.scala. Can I modify the constructors of the LogManager and Log classes to pass the broker.compression.enable property?
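A hedged sketch of the constructor plumbing Manikumar proposes: thread the broker-level compression settings from KafkaConfig down into each Log instead of treating them as topic-level LogConfig keys. All names here are illustrative, not the final KAFKA-1499 API:

{code}
case class BrokerCompression(enabled: Boolean, codec: String)

class Log(/* ...existing Log parameters... */ brokerCompression: BrokerCompression) {
  // If broker-side recompression is on, the broker's codec wins;
  // otherwise keep whatever codec the producer used.
  def effectiveCodec(producerCodec: String): String =
    if (brokerCompression.enabled) brokerCompression.codec
    else producerCodec
}
{code}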
[jira] [Updated] (KAFKA-589) Clean shutdown after startup connection failure
[ https://issues.apache.org/jira/browse/KAFKA-589?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Neha Narkhede updated KAFKA-589: Resolution: Fixed Status: Resolved (was: Patch Available)

Pushed to trunk.
[jira] [Updated] (KAFKA-589) Clean shutdown after startup connection failure
[ https://issues.apache.org/jira/browse/KAFKA-589?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Neha Narkhede updated KAFKA-589: Assignee: Ewen Cheslack-Postava (was: Swapnil Ghike)
[jira] [Commented] (KAFKA-589) Clean shutdown after startup connection failure
[ https://issues.apache.org/jira/browse/KAFKA-589?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14148726#comment-14148726 ] Neha Narkhede commented on KAFKA-589:

Thanks for fixing a longstanding bug! +1 on the patch.
[jira] [Commented] (KAFKA-1586) support sticky partitioning in the new producer
[ https://issues.apache.org/jira/browse/KAFKA-1586?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14148735#comment-14148735 ] Neha Narkhede commented on KAFKA-1586:

+1 on not supporting a sticky partitioning strategy in the new producer. It already offers the user the flexibility to use any partitioning strategy. If there are no objections, I'm leaning towards closing this JIRA.

support sticky partitioning in the new producer
---
Key: KAFKA-1586
URL: https://issues.apache.org/jira/browse/KAFKA-1586
Project: Kafka
Issue Type: Bug
Components: core
Affects Versions: 0.8.2
Reporter: Jun Rao
Assignee: Jun Rao
Attachments: KAFKA-1586.patch

If a message specifies neither a key nor a partition, the new producer selects a partition for each message in a round-robin way. As a result, within a window of linger.ms, messages are spread across all partitions of a topic. Compared with a strategy that assigns all messages in the same time window to a single partition, this strategy may not compress the message set as well, since each batch is smaller. Another potential problem with this strategy is that the compression ratio could be sensitive to a change in the number of partitions in a topic: if the number of partitions is increased, the produced data may not compress as well as before.
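For contrast with the round-robin behavior described above, a hedged sketch of what a sticky strategy would do: keep sending to one partition for a time window so batches are larger and compress better. This is an illustration of the idea, not the producer's actual Partitioner interface:

{code}
class StickyPartitioner(numPartitions: Int, windowMs: Long) {
  private var current = scala.util.Random.nextInt(numPartitions)
  private var windowStart = System.currentTimeMillis()

  // Return the same partition for windowMs, then move on to the next one.
  def partition(): Int = synchronized {
    val now = System.currentTimeMillis()
    if (now - windowStart >= windowMs) {
      current = (current + 1) % numPartitions
      windowStart = now
    }
    current
  }
}
{code}

The trade-off in the description follows directly: within one window all records share a batch (better compression), at the cost of short-term imbalance across partitions.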
[jira] [Updated] (KAFKA-1591) Clean-up Unnecessary stack trace in error/warn logs
[ https://issues.apache.org/jira/browse/KAFKA-1591?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Neha Narkhede updated KAFKA-1591: Reviewer: Jun Rao

[~junrao], please feel free to reassign :) Going through JIRAs that have patches available but no reviewers.

Clean-up Unnecessary stack trace in error/warn logs
---
Key: KAFKA-1591
URL: https://issues.apache.org/jira/browse/KAFKA-1591
Project: Kafka
Issue Type: Bug
Reporter: Guozhang Wang
Assignee: Abhishek Sharma
Labels: newbie
Fix For: 0.9.0
Attachments: Jira-1591- As per Guozhang latest comment.patch, Jira-1591- Complete Changes Patch.patch, Jira-1591-SocketConnection-Warning.patch, Jira1591-SendProducerRequest-Warning.patch

Some of the unnecessary stack traces in error/warning log entries can easily pollute the log files. Examples include KAFKA-1066, etc.
Re: Review Request 25886: KAFKA-1555: provide strong consistency with reasonable availability
On Sept. 26, 2014, 12:17 a.m., Jun Rao wrote: Thanks for the patch. Looks good overall. A few comments. 1. Let's add a check of min.isr in Partition.appendMessagesToLeader() so that the message is not added to the leader's log if the ISR size is less than min.isr. 2. We talked about dropping the acks > 1 support in the jira. However, I think that's easier done after KAFKA-1583 is completed. So, we don't have to do this change here. I will file a follow-up jira. 3. We can give a different exception in this case, something like RejectMessageDueToNotEnoughReplicas. - Jun --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/25886/#review54635 --- On Sept. 25, 2014, 7:41 p.m., Gwen Shapira wrote: --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/25886/ --- (Updated Sept. 25, 2014, 7:41 p.m.) Review request for kafka. Repository: kafka Description --- KAFKA-1555: provide strong consistency with reasonable availability Diffs - clients/src/main/java/org/apache/kafka/common/errors/NotEnoughReplicasException.java PRE-CREATION clients/src/main/java/org/apache/kafka/common/protocol/Errors.java d434f42 core/src/main/scala/kafka/cluster/Partition.scala ff106b4 core/src/main/scala/kafka/common/ErrorMapping.scala 3fae791 core/src/main/scala/kafka/common/NotEnoughReplicasException.scala PRE-CREATION core/src/main/scala/kafka/log/LogConfig.scala 5746ad4 core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala 39f777b core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala 24deea0 core/src/test/scala/unit/kafka/utils/TestUtils.scala 2dbdd3c Diff: https://reviews.apache.org/r/25886/diff/ Testing --- With a 3-broker cluster, created 3 topics, each with 1 partition and 3 replicas, with min.insync.replicas set to 1, 3, and 4 respectively. * min.insync.replicas=1 behaved normally (all writes succeeded as long as a broker was up) * min.insync.replicas=3 returned NotEnoughReplicas when required.acks=-1 and one broker was down * min.insync.replicas=4 returned NotEnoughReplicas when required.acks=-1 See notes about retry behavior in the JIRA. Thanks, Gwen Shapira
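A simplified model of the check Jun is asking for in comment 1. The names below only approximate Partition.scala, and the coupling to acks=-1 reflects the testing notes rather than the exact patch:

{code}
// Simplified model of the proposed check; this is a sketch, not the patch.
class NotEnoughReplicasExceptionSketch(msg: String) extends RuntimeException(msg)

class PartitionSketch(topic: String, partitionId: Int, minIsr: Int) {
  var inSyncReplicas: Set[Int] = Set()

  def appendMessagesToLeader(messages: Seq[Array[Byte]], requiredAcks: Int): Unit = {
    // Reject the append up front if the ISR has already shrunk below
    // min.insync.replicas, instead of acknowledging a write that only an
    // under-replicated partition would hold. (Per the testing notes, the
    // error surfaces when required.acks=-1.)
    if (requiredAcks == -1 && inSyncReplicas.size < minIsr)
      throw new NotEnoughReplicasExceptionSketch(
        "Number of insync replicas for partition [%s,%d] is %d, below the required minimum %d"
          .format(topic, partitionId, inSyncReplicas.size, minIsr))
    // ... append to the leader's log ...
  }
}
{code}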
[jira] [Updated] (KAFKA-1618) Exception thrown when running console producer with no port number for the broker
[ https://issues.apache.org/jira/browse/KAFKA-1618?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] BalajiSeshadri updated KAFKA-1618: -- Attachment: KAFKA-1618-CLEAN.patch Exception thrown when running console producer with no port number for the broker - Key: KAFKA-1618 URL: https://issues.apache.org/jira/browse/KAFKA-1618 Project: Kafka Issue Type: Improvement Affects Versions: 0.8.1.1 Reporter: Gwen Shapira Assignee: BalajiSeshadri Labels: newbie Fix For: 0.8.2 Attachments: KAFKA-1618-ALL.patch, KAFKA-1618-ALL.patch, KAFKA-1618-CLEAN.patch, KAFKA-1618-REBASED.patch, KAFKA-1618-REVIEW-COMMENTS-SPACE-CORRECTION.patch, KAFKA-1618-REVIEW-COMMENTS.patch, KAFKA-1618.patch When running the console producer with just localhost as the broker list, I get an ArrayIndexOutOfBoundsException. I expect either a clearer error about the arguments or for the producer to guess a default port. [root@shapira-1 bin]# ./kafka-console-producer.sh --topic rufus1 --broker-list localhost java.lang.ArrayIndexOutOfBoundsException: 1 at kafka.client.ClientUtils$$anonfun$parseBrokerList$1.apply(ClientUtils.scala:102) at kafka.client.ClientUtils$$anonfun$parseBrokerList$1.apply(ClientUtils.scala:97) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) at scala.collection.TraversableLike$class.map(TraversableLike.scala:244) at scala.collection.AbstractTraversable.map(Traversable.scala:105) at kafka.client.ClientUtils$.parseBrokerList(ClientUtils.scala:97) at kafka.producer.BrokerPartitionInfo.<init>(BrokerPartitionInfo.scala:32) at kafka.producer.async.DefaultEventHandler.<init>(DefaultEventHandler.scala:41) at kafka.producer.Producer.<init>(Producer.scala:59) at kafka.producer.ConsoleProducer$.main(ConsoleProducer.scala:158) at kafka.producer.ConsoleProducer.main(ConsoleProducer.scala) -- This message was sent by Atlassian JIRA (v6.3.4#6332)
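The underlying problem is that the broker-list parsing indexes into the result of splitting each host:port entry without checking its length. A defensive sketch (illustrative only; the 9092 default is an assumption for the example, and erroring out is the other option the report mentions):

{code}
// Invented BrokerListParser; the real fix lives in the tools and
// ClientUtils.parseBrokerList.
object BrokerListParser {
  val DefaultPort = 9092 // assumed default, not taken from the patch

  def parse(brokerList: String): Seq[(String, Int)] =
    brokerList.split(",").toSeq.map { entry =>
      entry.trim.split(":") match {
        case Array(host)       => (host, DefaultPort) // no port given
        case Array(host, port) => (host, port.toInt)
        case _ =>
          throw new IllegalArgumentException(
            "Broker entry must be host or host:port, got: " + entry)
      }
    }
}
{code}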
[jira] [Commented] (KAFKA-1618) Exception thrown when running console producer with no port number for the broker
[ https://issues.apache.org/jira/browse/KAFKA-1618?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14148755#comment-14148755 ] BalajiSeshadri commented on KAFKA-1618: --- OK, this one applies cleanly without issues. Here is the log from when I applied it. MER2AL-E0043615:kafka-trunk-1 balaji.seshadri$ git apply KAFKA-1618.patch -v Checking patch core/src/main/scala/kafka/tools/ConsoleProducer.scala... Checking patch core/src/main/scala/kafka/tools/GetOffsetShell.scala... Checking patch core/src/main/scala/kafka/tools/ProducerPerformance.scala... Checking patch core/src/main/scala/kafka/tools/ReplayLogProducer.scala... Checking patch core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala... Checking patch core/src/main/scala/kafka/tools/SimpleConsumerShell.scala... Checking patch core/src/main/scala/kafka/utils/ToolsUtils.scala... Applied patch core/src/main/scala/kafka/tools/ConsoleProducer.scala cleanly. Applied patch core/src/main/scala/kafka/tools/GetOffsetShell.scala cleanly. Applied patch core/src/main/scala/kafka/tools/ProducerPerformance.scala cleanly. Applied patch core/src/main/scala/kafka/tools/ReplayLogProducer.scala cleanly. Applied patch core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala cleanly. Applied patch core/src/main/scala/kafka/tools/SimpleConsumerShell.scala cleanly. Applied patch core/src/main/scala/kafka/utils/ToolsUtils.scala cleanly. [~nehanarkhede] Please verify. Exception thrown when running console producer with no port number for the broker - Key: KAFKA-1618 URL: https://issues.apache.org/jira/browse/KAFKA-1618 Project: Kafka Issue Type: Improvement Affects Versions: 0.8.1.1 Reporter: Gwen Shapira Assignee: BalajiSeshadri Labels: newbie Fix For: 0.8.2 Attachments: KAFKA-1618-ALL.patch, KAFKA-1618-ALL.patch, KAFKA-1618-CLEAN.patch, KAFKA-1618-REBASED.patch, KAFKA-1618-REVIEW-COMMENTS-SPACE-CORRECTION.patch, KAFKA-1618-REVIEW-COMMENTS.patch, KAFKA-1618.patch -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (KAFKA-1618) Exception thrown when running console producer with no port number for the broker
[ https://issues.apache.org/jira/browse/KAFKA-1618?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] BalajiSeshadri updated KAFKA-1618: -- Status: In Progress (was: Patch Available) Exception thrown when running console producer with no port number for the broker - Key: KAFKA-1618 URL: https://issues.apache.org/jira/browse/KAFKA-1618 Project: Kafka Issue Type: Improvement Affects Versions: 0.8.1.1 Reporter: Gwen Shapira Assignee: BalajiSeshadri Labels: newbie Fix For: 0.8.2 Attachments: KAFKA-1618-ALL.patch, KAFKA-1618-ALL.patch, KAFKA-1618-CLEAN.patch, KAFKA-1618-REBASED.patch, KAFKA-1618-REVIEW-COMMENTS-SPACE-CORRECTION.patch, KAFKA-1618-REVIEW-COMMENTS.patch, KAFKA-1618.patch -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-1652) NoSuchElementException on server shutdown
[ https://issues.apache.org/jira/browse/KAFKA-1652?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14148770#comment-14148770 ] Sriharsha Chintalapani commented on KAFKA-1652: --- [~jjkoshy] [~nehanarkhede] I am able to reproduce this and will follow up with a patch. NoSuchElementException on server shutdown - Key: KAFKA-1652 URL: https://issues.apache.org/jira/browse/KAFKA-1652 Project: Kafka Issue Type: Bug Reporter: Neha Narkhede Labels: newbie++ Observed the following exception on server shutdown. It happens pretty frequently, though not on every shutdown. Steps to reproduce: 1. Check out kafka from trunk 2. Run bin/kafka-server-start.sh config/server.properties 3. Ctrl-C {noformat} [2014-09-25 12:31:14,441] ERROR None.get (kafka.network.Processor) java.util.NoSuchElementException: None.get at scala.None$.get(Option.scala:313) at scala.None$.get(Option.scala:311) at kafka.network.ConnectionQuotas.dec(SocketServer.scala:522) at kafka.network.AbstractServerThread.close(SocketServer.scala:165) at kafka.network.AbstractServerThread.close(SocketServer.scala:157) at kafka.network.Processor.close(SocketServer.scala:372) at kafka.network.AbstractServerThread.closeAll(SocketServer.scala:178) at kafka.network.Processor$$anonfun$run$3.apply$mcV$sp(SocketServer.scala:362) at kafka.utils.Utils$.swallow(Utils.scala:172) at kafka.utils.Logging$class.swallowError(Logging.scala:106) at kafka.network.AbstractServerThread.swallowError(SocketServer.scala:108) at kafka.network.Processor.run(SocketServer.scala:362) at java.lang.Thread.run(Thread.java:695) {noformat} -- This message was sent by Atlassian JIRA (v6.3.4#6332)
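The stack trace points at ConnectionQuotas.dec hitting None.get when the counter for an address is already gone. A sketch of a more defensive version (a simplified model, not the actual fix):

{code}
import java.net.InetAddress
import scala.collection.mutable

// Simplified model of ConnectionQuotas: during shutdown a close can race
// with the bookkeeping, so a missing counter is tolerated rather than
// failing on None.get.
class ConnectionQuotasSketch {
  private val counts = mutable.Map[InetAddress, Int]()

  def inc(addr: InetAddress): Unit = counts.synchronized {
    counts(addr) = counts.getOrElse(addr, 0) + 1
  }

  def dec(addr: InetAddress): Unit = counts.synchronized {
    counts.get(addr) match {
      case Some(1) => counts.remove(addr)  // last connection from this address
      case Some(n) => counts(addr) = n - 1
      case None    => // already removed (e.g. a shutdown race); nothing to do
    }
  }
}
{code}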
[jira] [Updated] (KAFKA-1618) Exception thrown when running console producer with no port number for the broker
[ https://issues.apache.org/jira/browse/KAFKA-1618?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] BalajiSeshadri updated KAFKA-1618: -- Status: Patch Available (was: In Progress) Exception thrown when running console producer with no port number for the broker - Key: KAFKA-1618 URL: https://issues.apache.org/jira/browse/KAFKA-1618 Project: Kafka Issue Type: Improvement Affects Versions: 0.8.1.1 Reporter: Gwen Shapira Assignee: BalajiSeshadri Labels: newbie Fix For: 0.8.2 Attachments: KAFKA-1618-ALL.patch, KAFKA-1618-ALL.patch, KAFKA-1618-CLEAN.patch, KAFKA-1618-REBASED.patch, KAFKA-1618-REVIEW-COMMENTS-SPACE-CORRECTION.patch, KAFKA-1618-REVIEW-COMMENTS.patch, KAFKA-1618.patch -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-1586) support sticky partitioning in the new producer
[ https://issues.apache.org/jira/browse/KAFKA-1586?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14148771#comment-14148771 ] Jim Hoagland commented on KAFKA-1586: --- +1, agreeing with Jay and Neha. I found the sticky behavior confusing; it made me wonder what the heck was happening (I thought I was doing something wrong). support sticky partitioning in the new producer --- Key: KAFKA-1586 URL: https://issues.apache.org/jira/browse/KAFKA-1586 Project: Kafka Issue Type: Bug Components: core Affects Versions: 0.8.2 Reporter: Jun Rao Assignee: Jun Rao Attachments: KAFKA-1586.patch -- This message was sent by Atlassian JIRA (v6.3.4#6332)
Build failed in Jenkins: Kafka-trunk #278
See https://builds.apache.org/job/Kafka-trunk/278/changes Changes: [neha.narkhede] KAFKA-589 Clean shutdown after startup connection failure; reviewed by Neha Narkhede -- Started by an SCM change Building remotely on ubuntu-1 (Ubuntu ubuntu) in workspace https://builds.apache.org/job/Kafka-trunk/ws/ git rev-parse --is-inside-work-tree Fetching changes from the remote Git repository git config remote.origin.url https://git-wip-us.apache.org/repos/asf/kafka.git Fetching upstream changes from https://git-wip-us.apache.org/repos/asf/kafka.git git --version git fetch --tags --progress https://git-wip-us.apache.org/repos/asf/kafka.git +refs/heads/*:refs/remotes/origin/* git rev-parse origin/trunk^{commit} Checking out Revision 9c17747baab829adb268da28b4d943bbd6ef4e9f (origin/trunk) git config core.sparsecheckout git checkout -f 9c17747baab829adb268da28b4d943bbd6ef4e9f git rev-list f750dba65f9d9552a61a0754c46fa6e294785b31 [Kafka-trunk] $ /bin/bash -xe /tmp/hudson2924963140271174866.sh + gradle /tmp/hudson2924963140271174866.sh: line 2: gradle: command not found Build step 'Execute shell' marked build as failure
[jira] [Commented] (KAFKA-1555) provide strong consistency with reasonable availability
[ https://issues.apache.org/jira/browse/KAFKA-1555?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14148795#comment-14148795 ] Joel Koshy commented on KAFKA-1555: --- This is an implementation detail, but regarding the duplicates scenario (i.e., 4(a)): on a not-enough-replicas error, the producer implementation could just stop retrying the produce request (and pounding the broker(s)) while the ISR is below the min.isr level. It can instead switch to refreshing the topic metadata with backoff and checking the current ISR cardinality, which is included in the metadata response (although that is currently inaccurate and needs to be fixed - KAFKA-1367). As soon as the ISR returns to min.isr, it can retry the request. provide strong consistency with reasonable availability --- Key: KAFKA-1555 URL: https://issues.apache.org/jira/browse/KAFKA-1555 Project: Kafka Issue Type: Improvement Components: controller Affects Versions: 0.8.1.1 Reporter: Jiang Wu Assignee: Gwen Shapira Fix For: 0.8.2 Attachments: KAFKA-1555.0.patch, KAFKA-1555.1.patch, KAFKA-1555.2.patch, KAFKA-1555.3.patch, KAFKA-1555.4.patch, KAFKA-1555.5.patch -- This message was sent by Atlassian JIRA (v6.3.4#6332)
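A sketch of the retry strategy Joel describes; produceWithIsrBackoff and fetchIsrSize are invented stand-ins for real client calls, and the backoff values are arbitrary:

{code}
import kafka.common.NotEnoughReplicasException

// Invented client-side helper: after NotEnoughReplicas, poll the topic
// metadata with backoff until the ISR is back at min.isr, then retry the
// produce request, instead of hammering the broker with blind retries.
object IsrBackoff {
  def produceWithIsrBackoff(send: () => Unit,
                            fetchIsrSize: () => Int, // from a metadata request
                            minIsr: Int,
                            backoffMs: Long = 500,
                            maxAttempts: Int = 20): Unit = {
    try send()
    catch {
      case e: NotEnoughReplicasException =>
        var attempts = 0
        // Refresh metadata with backoff rather than re-sending the request.
        while (fetchIsrSize() < minIsr && attempts < maxAttempts) {
          Thread.sleep(backoffMs)
          attempts += 1
        }
        send() // retry once the ISR has (hopefully) recovered
    }
  }
}
{code}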
[jira] [Commented] (KAFKA-1647) Replication offset checkpoints (high water marks) can be lost on hard kills and restarts
[ https://issues.apache.org/jira/browse/KAFKA-1647?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14148802#comment-14148802 ] Joel Koshy commented on KAFKA-1647: --- Yes, that is one possibility, but not the only one. For example, suppose this is a topic with replication factor three. This typically happens when bringing up a cluster that was previously hard-killed. Suppose b1 and b2 are brought up simultaneously and lose their HW as described above. Suppose the controller then elects b1 as the leader. b2 then becomes the follower (successfully) but as part of that transition will truncate to zero. I think there are more scenarios, since I also saw this with a topic with a replication factor of two, but I have not checked the logs yet to see if it was due to a subsequent bounce or something else. Replication offset checkpoints (high water marks) can be lost on hard kills and restarts --- Key: KAFKA-1647 URL: https://issues.apache.org/jira/browse/KAFKA-1647 Project: Kafka Issue Type: Bug Reporter: Joel Koshy Priority: Critical Labels: newbie++ We ran into this scenario recently in a production environment. This can happen when enough brokers in a cluster are taken down, i.e., a rolling bounce done properly should not cause this issue. It can occur if all replicas for any partition are taken down. Here is a sample scenario: * Cluster of three brokers: b0, b1, b2 * Two partitions (of some topic) with replication factor two: p0, p1 * Initial state: ** p0: leader = b0, ISR = {b0, b1} ** p1: leader = b1, ISR = {b0, b1} * Do a parallel hard-kill of all brokers * Bring up b2, so it is the new controller * b2 initializes its controller context and populates its leader/ISR cache (i.e., controllerContext.partitionLeadershipInfo) from zookeeper. The last known leaders are b0 (for p0) and b1 (for p1) * Bring up b1 * The controller's onBrokerStartup procedure initiates a replica state change for all replicas on b1 to become online. As part of this replica state change it gets the last known leader and ISR and sends a LeaderAndIsrRequest to b1 (for p0 and p1). This LeaderAndIsr request contains: {p0: leader=b0; p1: leader=b1; leaders={b1}}. b0 is indicated as the leader of p0, but it is not included in the leaders field because b0 is down. * On receiving the LeaderAndIsrRequest, b1's replica manager will successfully make b1 the leader for p1 (and create the local replica object corresponding to p1). It will however abort the become-follower transition for p0 because the designated leader b0 is offline. So it will not create the local replica object for p0. * It will then start the high water mark checkpoint thread. Since only p1 has a local replica object, only p1's high water mark will be checkpointed to disk. p0's previously written checkpoint, if any, will be lost. So in summary, it seems we should always create the local replica object even if the online transition does not happen. Possible symptoms of the above bug could be one or more of the following (we saw 2 and 3): # Data loss; yes, on a hard-kill data loss is expected, but this can actually cause loss of nearly all data if the broker becomes follower, truncates, and soon after happens to become leader. # High IO on brokers that lose their high water mark, then subsequently (on a successful become-follower transition) truncate their log to zero and start catching up from the beginning. # If the offsets topic is affected, then offsets can get reset. 
This is because during an offset load we don't read past the high water mark. So if a high water mark is missing, then we don't load anything (even if the offsets are there in the log). -- This message was sent by Atlassian JIRA (v6.3.4#6332)
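To see why a missing local replica object loses a checkpoint, note that the checkpoint thread rewrites the whole high-watermark file from the set of local replica objects. A simplified model (invented names, not the real ReplicaManager):

{code}
import java.io.{File, PrintWriter}
import scala.collection.mutable

// Simplified model: the checkpoint pass re-emits one line per *local*
// replica object, so a partition whose replica object was never created
// (e.g. p0 after an aborted become-follower transition) silently drops
// out of the rewritten file.
class HighWatermarkCheckpointSketch(checkpointFile: File) {
  val localReplicas = mutable.Map[(String, Int), Long]() // (topic, partition) -> HW

  def checkpoint(): Unit = {
    val lines = localReplicas.map { case ((topic, partition), hw) =>
      "%s %d %d".format(topic, partition, hw)
    }
    val tmp = new File(checkpointFile.getAbsolutePath + ".tmp")
    val writer = new PrintWriter(tmp)
    try writer.write(lines.mkString("\n")) finally writer.close()
    // The swap overwrites the previous file, losing any entry that was not
    // re-emitted above.
    tmp.renameTo(checkpointFile)
  }
}
{code}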