[jira] [Commented] (KAFKA-1555) provide strong consistency with reasonable availability

2014-09-25 Thread Sriram Subramanian (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1555?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14147474#comment-14147474
 ] 

Sriram Subramanian commented on KAFKA-1555:
---

1. Great. Not supporting values of ack > 1 is a good step. We are 
essentially not using it as an integer any more. I would still love it to be 
made more explicit with an enum for clarity.

2. Also, by setting ack = -1 and min_isr = 2, we still do not avoid data loss 
when one broker goes down. The issue is the way we select a leader. When a 
request was written to the leader, the min_isr check could have succeeded and 
we would have written to min_isr - 1 replicas. However, those replicas 
could subsequently fall out of the ISR. When the leader fails after that, we 
would have an unclean leader election and could select any replica as the leader, 
including one that was lagging. 

To completely guarantee no data loss, we would need to do the following
a. Ensure logs do not diverge on unclean leader elections
b. Choose the broker with the longest log as the leader

3. We may not have documented ack > 1, but since it is an integer, there is a 
chance somebody could be using it. In that case this could be a backwards 
incompatible change. It would be worth mentioning in the release notes.

4. Long term, I think the min_isr config should be in the API. This gives 
better control per message and explicitly lets the caller know what guarantees 
they get.
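
For illustration only - neither RequiredAcks nor ProduceOptions exists in Kafka; this 
is just a sketch of what points 1 and 4 could look like (an explicit enum instead of 
a raw integer, and a per-request minimum-ISR override):

public enum RequiredAcks {
    NONE(0),      // fire and forget
    LEADER(1),    // ack once the leader has the message
    ALL_ISR(-1);  // ack once all in-sync replicas have the message

    private final int wireValue;
    RequiredAcks(int wireValue) { this.wireValue = wireValue; }
    public int wireValue() { return wireValue; }
}

// Hypothetical per-request options, letting the caller state the guarantee it needs.
final class ProduceOptions {
    final RequiredAcks acks;
    final int minIsr;  // minimum ISR size required for the ack to succeed
    ProduceOptions(RequiredAcks acks, int minIsr) {
        this.acks = acks;
        this.minIsr = minIsr;
    }
}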

 provide strong consistency with reasonable availability
 ---

 Key: KAFKA-1555
 URL: https://issues.apache.org/jira/browse/KAFKA-1555
 Project: Kafka
  Issue Type: Improvement
  Components: controller
Affects Versions: 0.8.1.1
Reporter: Jiang Wu
Assignee: Gwen Shapira
 Fix For: 0.8.2

 Attachments: KAFKA-1555.0.patch, KAFKA-1555.1.patch, 
 KAFKA-1555.2.patch, KAFKA-1555.3.patch, KAFKA-1555.4.patch


 In a mission-critical application, we expect that a kafka cluster with 3 brokers 
 can satisfy two requirements:
 1. When 1 broker is down, no message loss or service blocking happens.
 2. In worse cases, such as when two brokers are down, service can be blocked, but 
 no message loss happens.
 We found that the current kafka version (0.8.1.1) cannot achieve these requirements 
 due to three of its behaviors:
 1. when choosing a new leader from 2 followers in ISR, the one with fewer 
 messages may be chosen as the leader.
 2. even when replica.lag.max.messages=0, a follower can stay in ISR when it 
 has fewer messages than the leader.
 3. the ISR can contain only 1 broker, therefore acknowledged messages may be 
 stored in only 1 broker.
 The following is an analytical proof. 
 We consider a cluster with 3 brokers and a topic with 3 replicas, and assume 
 that at the beginning, all 3 replicas, leader A, followers B and C, are in 
 sync, i.e., they have the same messages and are all in ISR.
 According to the value of request.required.acks (acks for short), there are 
 the following cases.
 1. acks=0, 1, 3. Obviously these settings do not satisfy the requirement.
 2. acks=2. Producer sends a message m. It's acknowledged by A and B. At this 
 time, although C hasn't received m, C is still in ISR. If A is killed, C can 
 be elected as the new leader, and consumers will miss m.
 3. acks=-1. B and C restart and are removed from ISR. Producer sends a 
 message m to A, and receives an acknowledgement. Disk failure happens in A 
 before B and C replicate m. Message m is lost.
 In summary, any existing configuration cannot satisfy the requirements.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-328) Write unit test for kafka server startup and shutdown API

2014-09-25 Thread BalajiSeshadri (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-328?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14147815#comment-14147815
 ] 

BalajiSeshadri commented on KAFKA-328:
--

[~nehanarkhede] Can you please send me the code style you are using?

If you are using IntelliJ or Eclipse, please give me the formatter xml and I will 
import it into my environment.

 Write unit test for kafka server startup and shutdown API 
 --

 Key: KAFKA-328
 URL: https://issues.apache.org/jira/browse/KAFKA-328
 Project: Kafka
  Issue Type: Bug
Reporter: Neha Narkhede
Assignee: BalajiSeshadri
  Labels: newbie
 Attachments: KAFKA-328.patch


 Background discussion in KAFKA-320
 People often try to embed KafkaServer in an application that ends up calling 
 startup() and shutdown() repeatedly and sometimes in odd ways. To ensure this 
 works correctly we have to be very careful about cleaning up resources. This 
 is a good practice for making unit tests reliable anyway.
 A good first step would be to add some unit tests on startup and shutdown to 
 cover various cases:
 1. A Kafka server can start up if it is not already starting up, if it is not 
 currently being shut down, or if it hasn't already been started.
 2. A Kafka server can shut down if it is not already shutting down, if it is 
 not currently starting up, or if it hasn't already been shut down.
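
A minimal sketch of the kind of check these cases describe, assuming an embedded broker 
driven through kafka.server.KafkaServerStartable and a pre-built Properties object 
props (broker.id, port, log.dirs, zookeeper.connect); the actual tests live in Scala 
under core/src/test, so this is illustrative only:

import java.util.Properties;

import kafka.server.KafkaConfig;
import kafka.server.KafkaServerStartable;

public class StartupShutdownSketch {
    // Repeated startup()/shutdown() calls should be safe no-ops, which is what
    // the unit tests described above would assert.
    public static void exercise(Properties props) {
        KafkaServerStartable broker = new KafkaServerStartable(new KafkaConfig(props));
        broker.startup();
        broker.startup();    // starting an already-started server should not throw or leak resources
        broker.shutdown();
        broker.shutdown();   // shutting down an already-stopped server should also be safe
        broker.awaitShutdown();
    }
}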



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-1618) Exception thrown when running console producer with no port number for the broker

2014-09-25 Thread BalajiSeshadri (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1618?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

BalajiSeshadri updated KAFKA-1618:
--
Attachment: KAFKA-1618-REBASED.patch

REBASED Patch

 Exception thrown when running console producer with no port number for the 
 broker
 -

 Key: KAFKA-1618
 URL: https://issues.apache.org/jira/browse/KAFKA-1618
 Project: Kafka
  Issue Type: Improvement
Affects Versions: 0.8.1.1
Reporter: Gwen Shapira
Assignee: BalajiSeshadri
  Labels: newbie
 Fix For: 0.8.2

 Attachments: KAFKA-1618-ALL.patch, KAFKA-1618-ALL.patch, 
 KAFKA-1618-REBASED.patch, KAFKA-1618-REVIEW-COMMENTS-SPACE-CORRECTION.patch, 
 KAFKA-1618-REVIEW-COMMENTS.patch, KAFKA-1618.patch


 When running console producer with just localhost as the broker list, I get 
 ArrayIndexOutOfBounds exception.
 I expect either a clearer error about arguments or for the producer to 
 guess a default port.
 [root@shapira-1 bin]# ./kafka-console-producer.sh  --topic rufus1 
 --broker-list localhost
 java.lang.ArrayIndexOutOfBoundsException: 1
   at 
 kafka.client.ClientUtils$$anonfun$parseBrokerList$1.apply(ClientUtils.scala:102)
   at 
 kafka.client.ClientUtils$$anonfun$parseBrokerList$1.apply(ClientUtils.scala:97)
   at 
 scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
   at 
 scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
   at 
 scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
   at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
   at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
   at scala.collection.AbstractTraversable.map(Traversable.scala:105)
   at kafka.client.ClientUtils$.parseBrokerList(ClientUtils.scala:97)
   at 
  kafka.producer.BrokerPartitionInfo.<init>(BrokerPartitionInfo.scala:32)
   at 
  kafka.producer.async.DefaultEventHandler.<init>(DefaultEventHandler.scala:41)
   at kafka.producer.Producer.<init>(Producer.scala:59)
   at kafka.producer.ConsoleProducer$.main(ConsoleProducer.scala:158)
   at kafka.producer.ConsoleProducer.main(ConsoleProducer.scala)
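
For illustration only (this is a sketch, not the approach taken in the attached 
patches): a defensive parse of a broker string that either assumes a default port or 
fails with a clear message. The default of 9092 and the error wording are assumptions.

final class BrokerAddress {
    final String host;
    final int port;

    private BrokerAddress(String host, int port) {
        this.host = host;
        this.port = port;
    }

    static BrokerAddress parse(String broker) {
        String[] parts = broker.trim().split(":");
        if (parts.length == 1) {
            return new BrokerAddress(parts[0], 9092);                       // no port given: assume the default
        } else if (parts.length == 2) {
            return new BrokerAddress(parts[0], Integer.parseInt(parts[1])); // host:port
        }
        throw new IllegalArgumentException(
            "Broker must be given as host or host:port, got: " + broker);
    }
}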



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Comment Edited] (KAFKA-1618) Exception thrown when running console producer with no port number for the broker

2014-09-25 Thread BalajiSeshadri (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1618?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14147871#comment-14147871
 ] 

BalajiSeshadri edited comment on KAFKA-1618 at 9/25/14 3:39 PM:


[~nehanarkhede] Please find REBASED Patch attached.


was (Author: balaji.sesha...@dish.com):
REBASED Patch

 Exception thrown when running console producer with no port number for the 
 broker
 -

 Key: KAFKA-1618
 URL: https://issues.apache.org/jira/browse/KAFKA-1618
 Project: Kafka
  Issue Type: Improvement
Affects Versions: 0.8.1.1
Reporter: Gwen Shapira
Assignee: BalajiSeshadri
  Labels: newbie
 Fix For: 0.8.2

 Attachments: KAFKA-1618-ALL.patch, KAFKA-1618-ALL.patch, 
 KAFKA-1618-REBASED.patch, KAFKA-1618-REVIEW-COMMENTS-SPACE-CORRECTION.patch, 
 KAFKA-1618-REVIEW-COMMENTS.patch, KAFKA-1618.patch


 When running console producer with just localhost as the broker list, I get 
 ArrayIndexOutOfBounds exception.
 I expect either a clearer error about arguments or for the producer to 
 guess a default port.
 [root@shapira-1 bin]# ./kafka-console-producer.sh  --topic rufus1 
 --broker-list localhost
 java.lang.ArrayIndexOutOfBoundsException: 1
   at 
 kafka.client.ClientUtils$$anonfun$parseBrokerList$1.apply(ClientUtils.scala:102)
   at 
 kafka.client.ClientUtils$$anonfun$parseBrokerList$1.apply(ClientUtils.scala:97)
   at 
 scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
   at 
 scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
   at 
 scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
   at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
   at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
   at scala.collection.AbstractTraversable.map(Traversable.scala:105)
   at kafka.client.ClientUtils$.parseBrokerList(ClientUtils.scala:97)
   at 
  kafka.producer.BrokerPartitionInfo.<init>(BrokerPartitionInfo.scala:32)
   at 
  kafka.producer.async.DefaultEventHandler.<init>(DefaultEventHandler.scala:41)
   at kafka.producer.Producer.<init>(Producer.scala:59)
   at kafka.producer.ConsoleProducer$.main(ConsoleProducer.scala:158)
   at kafka.producer.ConsoleProducer.main(ConsoleProducer.scala)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-1555) provide strong consistency with reasonable availability

2014-09-25 Thread saurabh agarwal (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1555?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14147900#comment-14147900
 ] 

saurabh agarwal commented on KAFKA-1555:



Along with the min.isr setting, unclean leader election needs to be set to 
false in order to avoid data loss. 

Let's take a scenario: the number of replicas is 3, and one replica is falling 
behind. This replica will be moved out of the ISR. Because unclean leader election 
is turned off, it will not become a leader. Now there will be only 2 
replicas left in the ISR. If the leader goes down, then the other replica in the ISR 
will become the leader. Due to min.isr=2, writes will not succeed until 
another replica rejoins the ISR. 

I don't think there will be any data loss in this scenario.  
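
As a concrete illustration of the combination being discussed (a sketch, assuming the 
min.insync.replicas topic config added by this patch and the unclean leader election 
switch from KAFKA-1028; exact property names may differ by version):

import java.util.Properties;

public class ConsistencySettingsSketch {

    // Topic/broker side: acknowledged writes need at least 2 in-sync replicas,
    // and an out-of-sync replica is never promoted to leader.
    static Properties topicConfig() {
        Properties p = new Properties();
        p.put("min.insync.replicas", "2");
        p.put("unclean.leader.election.enable", "false");
        return p;
    }

    // Producer side (0.8 producer): only treat a write as successful once all
    // in-sync replicas have it.
    static Properties producerConfig() {
        Properties p = new Properties();
        p.put("metadata.broker.list", "broker1:9092,broker2:9092,broker3:9092");
        p.put("request.required.acks", "-1");
        return p;
    }
}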


 provide strong consistency with reasonable availability
 ---

 Key: KAFKA-1555
 URL: https://issues.apache.org/jira/browse/KAFKA-1555
 Project: Kafka
  Issue Type: Improvement
  Components: controller
Affects Versions: 0.8.1.1
Reporter: Jiang Wu
Assignee: Gwen Shapira
 Fix For: 0.8.2

 Attachments: KAFKA-1555.0.patch, KAFKA-1555.1.patch, 
 KAFKA-1555.2.patch, KAFKA-1555.3.patch, KAFKA-1555.4.patch


 In a mission-critical application, we expect that a kafka cluster with 3 brokers 
 can satisfy two requirements:
 1. When 1 broker is down, no message loss or service blocking happens.
 2. In worse cases, such as when two brokers are down, service can be blocked, but 
 no message loss happens.
 We found that the current kafka version (0.8.1.1) cannot achieve these requirements 
 due to three of its behaviors:
 1. when choosing a new leader from 2 followers in ISR, the one with fewer 
 messages may be chosen as the leader.
 2. even when replica.lag.max.messages=0, a follower can stay in ISR when it 
 has fewer messages than the leader.
 3. the ISR can contain only 1 broker, therefore acknowledged messages may be 
 stored in only 1 broker.
 The following is an analytical proof. 
 We consider a cluster with 3 brokers and a topic with 3 replicas, and assume 
 that at the beginning, all 3 replicas, leader A, followers B and C, are in 
 sync, i.e., they have the same messages and are all in ISR.
 According to the value of request.required.acks (acks for short), there are 
 the following cases.
 1. acks=0, 1, 3. Obviously these settings do not satisfy the requirement.
 2. acks=2. Producer sends a message m. It's acknowledged by A and B. At this 
 time, although C hasn't received m, C is still in ISR. If A is killed, C can 
 be elected as the new leader, and consumers will miss m.
 3. acks=-1. B and C restart and are removed from ISR. Producer sends a 
 message m to A, and receives an acknowledgement. Disk failure happens in A 
 before B and C replicate m. Message m is lost.
 In summary, any existing configuration cannot satisfy the requirements.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Read a specific number of messages using kafka

2014-09-25 Thread pankaj ojha
Hi,

My requirement is to read a specific number of messages from a kafka topic
which contains data in json format, and after reading that number of messages, I
need to write them to a file and then stop. How can I count the number of
messages read by my consumer code (either SimpleConsumer or high level)?

Please help.

-- 
Thanks,
Pankaj Ojha


Re: Read a specific number of messages using kafka

2014-09-25 Thread Gwen Shapira
Using high level consumer and assuming you already created an iterator:

while (msgCount < maxMessages && it.hasNext()) {
 bytes = it.next().message();
 eventList.add(bytes);
}

(See a complete example here:
https://github.com/apache/flume/blob/trunk/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSource.java)
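
For completeness, a self-contained sketch of the same idea against the 0.8 high-level 
consumer, reading up to a fixed number of messages and writing them to a file. The 
topic name, group id, file name and maxMessages limit are placeholders:

import java.io.BufferedWriter;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;

public class BoundedReader {
    public static void main(String[] args) throws Exception {
        int maxMessages = 1000;                         // placeholder limit

        Properties props = new Properties();
        props.put("zookeeper.connect", "localhost:2181");
        props.put("group.id", "bounded-reader");
        props.put("auto.offset.reset", "smallest");     // start from the beginning if no offsets yet

        ConsumerConnector consumer =
            Consumer.createJavaConsumerConnector(new ConsumerConfig(props));
        Map<String, List<KafkaStream<byte[], byte[]>>> streams =
            consumer.createMessageStreams(Collections.singletonMap("my-json-topic", 1));
        ConsumerIterator<byte[], byte[]> it = streams.get("my-json-topic").get(0).iterator();

        int msgCount = 0;
        try (BufferedWriter out = Files.newBufferedWriter(Paths.get("messages.json"),
                                                          StandardCharsets.UTF_8)) {
            while (msgCount < maxMessages && it.hasNext()) {
                out.write(new String(it.next().message(), StandardCharsets.UTF_8));
                out.newLine();
                msgCount++;
            }
        }
        consumer.shutdown();                            // commit offsets and stop the fetchers
    }
}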

Gwen

On Thu, Sep 25, 2014 at 9:15 AM, pankaj ojha pankajojh...@gmail.com wrote:
 Hi,

 My requirement is to read a specific number of messages from a kafka topic
 which contains data in json format, and after reading that number of messages, I
 need to write them to a file and then stop. How can I count the number of
 messages read by my consumer code (either SimpleConsumer or high level)?

 Please help.

 --
 Thanks,
 Pankaj Ojha


Re: Read a specific number of messages using kafka

2014-09-25 Thread pankaj ojha
Thank You. I will try this out.

On Thu, Sep 25, 2014 at 10:01 PM, Gwen Shapira gshap...@cloudera.com
wrote:

 Using high level consumer and assuming you already created an iterator:

 while (msgCount < maxMessages && it.hasNext()) {
  bytes = it.next().message();
  eventList.add(bytes);
 }

 (See a complete example here:

 https://github.com/apache/flume/blob/trunk/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSource.java
 )

 Gwen

 On Thu, Sep 25, 2014 at 9:15 AM, pankaj ojha pankajojh...@gmail.com
 wrote:
  Hi,
 
  My requirement is to read a specific number of messages from a kafka topic
  which contains data in json format, and after reading that number of messages, I
  need to write them to a file and then stop. How can I count the number of
  messages read by my consumer code (either SimpleConsumer or high level)?
 
  Please help.
 
  --
  Thanks,
  Pankaj Ojha




-- 
Thanks,
Pankaj Ojha


[jira] [Commented] (KAFKA-1555) provide strong consistency with reasonable availability

2014-09-25 Thread Joel Koshy (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1555?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14147943#comment-14147943
 ] 

Joel Koshy commented on KAFKA-1555:
---

I'm +1 on the concept of min.isr and acks of -1, 0, 1.

This is a very interesting and important thread - sorry I missed most of it 
until I spent a couple of hours (!) yesterday chewing on these comments. 
Sriram, with regard to your second point - I had a similar concern and I think 
we talked about it, but I'm not sure it is the same issue. I.e., at the 
point the leader responds to the producer it knows how many followers have 
received the messages, so if only min.isr - 1 replicas have been written to, then 
the leader would return a NotEnoughReplicas error code. I agree that subsequent 
data loss is possible on unclean leader elections. However, that is sort of 
expected. I think Joe provided a good interpretation of min.isr - i.e., it 
provides a balance between your tolerance for the probability of data loss for 
stored data and the need for availability of brokers to write to. For a lower 
probability of loss - i.e., a lower probability of unclean leader elections - one 
would use a higher min.isr. Avoiding (or rather reducing) data loss on unclean 
leader elections is, I think, an orthogonal issue that other jiras such as 
KAFKA-1211 touch upon.

 provide strong consistency with reasonable availability
 ---

 Key: KAFKA-1555
 URL: https://issues.apache.org/jira/browse/KAFKA-1555
 Project: Kafka
  Issue Type: Improvement
  Components: controller
Affects Versions: 0.8.1.1
Reporter: Jiang Wu
Assignee: Gwen Shapira
 Fix For: 0.8.2

 Attachments: KAFKA-1555.0.patch, KAFKA-1555.1.patch, 
 KAFKA-1555.2.patch, KAFKA-1555.3.patch, KAFKA-1555.4.patch


 In a mission-critical application, we expect that a kafka cluster with 3 brokers 
 can satisfy two requirements:
 1. When 1 broker is down, no message loss or service blocking happens.
 2. In worse cases, such as when two brokers are down, service can be blocked, but 
 no message loss happens.
 We found that the current kafka version (0.8.1.1) cannot achieve these requirements 
 due to three of its behaviors:
 1. when choosing a new leader from 2 followers in ISR, the one with fewer 
 messages may be chosen as the leader.
 2. even when replica.lag.max.messages=0, a follower can stay in ISR when it 
 has fewer messages than the leader.
 3. the ISR can contain only 1 broker, therefore acknowledged messages may be 
 stored in only 1 broker.
 The following is an analytical proof. 
 We consider a cluster with 3 brokers and a topic with 3 replicas, and assume 
 that at the beginning, all 3 replicas, leader A, followers B and C, are in 
 sync, i.e., they have the same messages and are all in ISR.
 According to the value of request.required.acks (acks for short), there are 
 the following cases.
 1. acks=0, 1, 3. Obviously these settings do not satisfy the requirement.
 2. acks=2. Producer sends a message m. It's acknowledged by A and B. At this 
 time, although C hasn't received m, C is still in ISR. If A is killed, C can 
 be elected as the new leader, and consumers will miss m.
 3. acks=-1. B and C restart and are removed from ISR. Producer sends a 
 message m to A, and receives an acknowledgement. Disk failure happens in A 
 before B and C replicate m. Message m is lost.
 In summary, any existing configuration cannot satisfy the requirements.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-1555) provide strong consistency with reasonable availability

2014-09-25 Thread Sriram Subramanian (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1555?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14147967#comment-14147967
 ] 

Sriram Subramanian commented on KAFKA-1555:
---

ack = -1 with clean leader election already prevents data loss. With min_isr=2, I 
would expect the data never to be lost when the leader fails. That should be 
the simplest guarantee the system provides. We should not add further 
clauses to this or it would be impossible to define the system. If we were to 
say "with min_isr=2 and ack=-1 you have just reduced the probability of loss, but 
data could still be lost under unclean leader election", we would lose 
credibility on these settings. 

 provide strong consistency with reasonable availability
 ---

 Key: KAFKA-1555
 URL: https://issues.apache.org/jira/browse/KAFKA-1555
 Project: Kafka
  Issue Type: Improvement
  Components: controller
Affects Versions: 0.8.1.1
Reporter: Jiang Wu
Assignee: Gwen Shapira
 Fix For: 0.8.2

 Attachments: KAFKA-1555.0.patch, KAFKA-1555.1.patch, 
 KAFKA-1555.2.patch, KAFKA-1555.3.patch, KAFKA-1555.4.patch


 In a mission-critical application, we expect that a kafka cluster with 3 brokers 
 can satisfy two requirements:
 1. When 1 broker is down, no message loss or service blocking happens.
 2. In worse cases, such as when two brokers are down, service can be blocked, but 
 no message loss happens.
 We found that the current kafka version (0.8.1.1) cannot achieve these requirements 
 due to three of its behaviors:
 1. when choosing a new leader from 2 followers in ISR, the one with fewer 
 messages may be chosen as the leader.
 2. even when replica.lag.max.messages=0, a follower can stay in ISR when it 
 has fewer messages than the leader.
 3. the ISR can contain only 1 broker, therefore acknowledged messages may be 
 stored in only 1 broker.
 The following is an analytical proof. 
 We consider a cluster with 3 brokers and a topic with 3 replicas, and assume 
 that at the beginning, all 3 replicas, leader A, followers B and C, are in 
 sync, i.e., they have the same messages and are all in ISR.
 According to the value of request.required.acks (acks for short), there are 
 the following cases.
 1. acks=0, 1, 3. Obviously these settings do not satisfy the requirement.
 2. acks=2. Producer sends a message m. It's acknowledged by A and B. At this 
 time, although C hasn't received m, C is still in ISR. If A is killed, C can 
 be elected as the new leader, and consumers will miss m.
 3. acks=-1. B and C restart and are removed from ISR. Producer sends a 
 message m to A, and receives an acknowledgement. Disk failure happens in A 
 before B and C replicate m. Message m is lost.
 In summary, any existing configuration cannot satisfy the requirements.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: KAFKA-404 and KAFKA-1651

2014-09-25 Thread Neha Narkhede
We appreciate new contributions and don't mind going through a
contributor onboarding process. In fact, I'd encourage you to give
feedback on how we could improve your experience as a contributor :-)

We are trying to improve the turnaround time on patch reviews. Hopefully
assigning a Reviewer on the JIRA is helping improve that.

Thanks,
Neha

On Wed, Sep 24, 2014 at 10:53 PM, Jonathan Creasy jonathan.cre...@turn.com
wrote:

 Sorry for all the extra ReviewBoard and Jira email this evening. I’ve just
 recently gotten back to a place where I’ll be using and supporting Kafka in
 production and haven’t used these tools in a while, just getting
 reacquainted.

 I hope no one minds, thanks for allowing me to contribute!

 -Jonathan



[jira] [Commented] (KAFKA-1618) Exception thrown when running console producer with no port number for the broker

2014-09-25 Thread Neha Narkhede (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1618?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14148024#comment-14148024
 ] 

Neha Narkhede commented on KAFKA-1618:
--

[~balaji.sesha...@dish.com] I ran into issues applying the patch again. Would 
you mind following the Contributor workflow in our wiki - 
https://cwiki.apache.org/confluence/display/KAFKA/Patch+submission+and+review#Patchsubmissionandreview-Simplecontributorworkflow

{noformat}
nnarkhed-mn1:kafka-git-idea nnarkhed$ git apply --check 
KAFKA-1618-REBASED.patch 
error: src/main/scala/kafka/tools/ConsoleProducer.scala: No such file or 
directory
error: src/main/scala/kafka/tools/GetOffsetShell.scala: No such file or 
directory
error: src/main/scala/kafka/tools/ProducerPerformance.scala: No such file or 
directory
error: src/main/scala/kafka/tools/ReplayLogProducer.scala: No such file or 
directory
error: src/main/scala/kafka/tools/ReplicaVerificationTool.scala: No such file 
or directory
error: src/main/scala/kafka/tools/SimpleConsumerShell.scala: No such file or 
directory
error: src/main/scala/kafka/tools/ToolsUtils.scala: No such file or directory
{noformat}

 Exception thrown when running console producer with no port number for the 
 broker
 -

 Key: KAFKA-1618
 URL: https://issues.apache.org/jira/browse/KAFKA-1618
 Project: Kafka
  Issue Type: Improvement
Affects Versions: 0.8.1.1
Reporter: Gwen Shapira
Assignee: BalajiSeshadri
  Labels: newbie
 Fix For: 0.8.2

 Attachments: KAFKA-1618-ALL.patch, KAFKA-1618-ALL.patch, 
 KAFKA-1618-REBASED.patch, KAFKA-1618-REVIEW-COMMENTS-SPACE-CORRECTION.patch, 
 KAFKA-1618-REVIEW-COMMENTS.patch, KAFKA-1618.patch


 When running console producer with just localhost as the broker list, I get 
 ArrayIndexOutOfBounds exception.
 I expect either a clearer error about arguments or for the producer to 
 guess a default port.
 [root@shapira-1 bin]# ./kafka-console-producer.sh  --topic rufus1 
 --broker-list localhost
 java.lang.ArrayIndexOutOfBoundsException: 1
   at 
 kafka.client.ClientUtils$$anonfun$parseBrokerList$1.apply(ClientUtils.scala:102)
   at 
 kafka.client.ClientUtils$$anonfun$parseBrokerList$1.apply(ClientUtils.scala:97)
   at 
 scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
   at 
 scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
   at 
 scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
   at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
   at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
   at scala.collection.AbstractTraversable.map(Traversable.scala:105)
   at kafka.client.ClientUtils$.parseBrokerList(ClientUtils.scala:97)
   at 
 kafka.producer.BrokerPartitionInfo.<init>(BrokerPartitionInfo.scala:32)
   at 
 kafka.producer.async.DefaultEventHandler.<init>(DefaultEventHandler.scala:41)
   at kafka.producer.Producer.<init>(Producer.scala:59)
   at kafka.producer.ConsoleProducer$.main(ConsoleProducer.scala:158)
   at kafka.producer.ConsoleProducer.main(ConsoleProducer.scala)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: Review Request 26018: Patch for KAFKA-404

2014-09-25 Thread Neha Narkhede

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/26018/#review54559
---



core/src/test/scala/unit/kafka/server/CreateZKChrootTest.scala
https://reviews.apache.org/r/26018/#comment94745

Thanks for adding this test. It's probably worth consolidating some of our 
tests on server startup and shutdown. Can we rename ServerShutdownTest to 
ServerStartupAndShutdownTest and move this test there?


- Neha Narkhede


On Sept. 25, 2014, 5:31 a.m., Jonathan Creasy wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/26018/
 ---
 
 (Updated Sept. 25, 2014, 5:31 a.m.)
 
 
 Review request for kafka.
 
 
 Bugs: KAFKA-404
 https://issues.apache.org/jira/browse/KAFKA-404
 
 
 Repository: kafka
 
 
 Description
 ---
 
 KAFKA-404 auto-create Zookeeper CHROOT on Startup
 
 
 Diffs
 -
 
   core/src/main/scala/kafka/server/KafkaServer.scala 
 390fef500d7e0027e698c259d777454ba5a0f5e8 
   core/src/test/scala/unit/kafka/server/CreateZKChrootTest.scala PRE-CREATION 
 
 Diff: https://reviews.apache.org/r/26018/diff/
 
 
 Testing
 ---
 
 
 Thanks,
 
 Jonathan Creasy
 




[jira] [Commented] (KAFKA-1367) Broker topic metadata not kept in sync with ZooKeeper

2014-09-25 Thread Magnus Edenhill (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1367?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14148054#comment-14148054
 ] 

Magnus Edenhill commented on KAFKA-1367:


[~guozhang] Yes, KAFKA-1555 looks like a good match.

 Broker topic metadata not kept in sync with ZooKeeper
 -

 Key: KAFKA-1367
 URL: https://issues.apache.org/jira/browse/KAFKA-1367
 Project: Kafka
  Issue Type: Bug
Affects Versions: 0.8.0, 0.8.1
Reporter: Ryan Berdeen
  Labels: newbie++
 Attachments: KAFKA-1367.txt


 When a broker is restarted, the topic metadata responses from the brokers 
 will be incorrect (different from ZooKeeper) until a preferred replica leader 
 election.
 In the metadata, it looks like leaders are correctly removed from the ISR 
 when a broker disappears, but followers are not. Then, when a broker 
 reappears, the ISR is never updated.
 I used a variation of the Vagrant setup created by Joe Stein to reproduce 
 this with latest from the 0.8.1 branch: 
 https://github.com/also/kafka/commit/dba36a503a5e22ea039df0f9852560b4fb1e067c



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: Review Request 26018: Patch for KAFKA-404

2014-09-25 Thread Jonathan Creasy
I went and looked at the ServerShutdownTest to implement your suggestion,
and I don't think I like this particular consolidation.

How about I go and change this test to ServerStartupTest, and future
startup checks can go there and we leave the Shutdown checks separate?

On 9/25/14, 12:55 PM, Neha Narkhede neha.narkh...@gmail.com wrote:


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/26018/#review54559
---



core/src/test/scala/unit/kafka/server/CreateZKChrootTest.scala
https://reviews.apache.org/r/26018/#comment94745

Thanks for adding this test. It's probably worth consolidating some
of our tests on server startup and shutdown. Can we rename
ServerShutdownTest to ServerStartupAndShutdownTest and move this test
there?


- Neha Narkhede


On Sept. 25, 2014, 5:31 a.m., Jonathan Creasy wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/26018/
 ---
 
 (Updated Sept. 25, 2014, 5:31 a.m.)
 
 
 Review request for kafka.
 
 
 Bugs: KAFKA-404
 https://issues.apache.org/jira/browse/KAFKA-404
 
 
 Repository: kafka
 
 
 Description
 ---
 
 KAFKA-404 auto-create Zookeeper CHROOT on Startup
 
 
 Diffs
 -
 
   core/src/main/scala/kafka/server/KafkaServer.scala
390fef500d7e0027e698c259d777454ba5a0f5e8
   core/src/test/scala/unit/kafka/server/CreateZKChrootTest.scala
PRE-CREATION 
 
 Diff: https://reviews.apache.org/r/26018/diff/
 
 
 Testing
 ---
 
 
 Thanks,
 
 Jonathan Creasy
 





[jira] [Commented] (KAFKA-404) When using chroot path, create chroot on startup if it doesn't exist

2014-09-25 Thread Jonathan Creasy (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-404?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14148057#comment-14148057
 ] 

Jonathan Creasy commented on KAFKA-404:
---

Updated reviewboard https://reviews.apache.org/r/26019/diff/
 against branch trunk

 When using chroot path, create chroot on startup if it doesn't exist
 

 Key: KAFKA-404
 URL: https://issues.apache.org/jira/browse/KAFKA-404
 Project: Kafka
  Issue Type: Improvement
Affects Versions: 0.8.1
 Environment: CentOS 5.5, Linux 2.6.18-194.32.1.el5 x86_64 GNU/Linux
Reporter: Jonathan Creasy
  Labels: newbie, patch
 Fix For: 0.8.2

 Attachments: KAFKA-404-0.7.1.patch, KAFKA-404-0.8.patch, 
 KAFKA-404-auto-create-zookeeper-chroot-on-start-up-i.patch, 
 KAFKA-404-auto-create-zookeeper-chroot-on-start-up-v2.patch, 
 KAFKA-404-auto-create-zookeeper-chroot-on-start-up-v3.patch, 
 KAFKA-404-auto-create-zookeeper-chroot-on-start-up-v4.patch, 
 KAFKA-404-auto-create-zookeeper-chroot-on-start-up-v5.patch, KAFKA-404.patch, 
 KAFKA-404_2014-09-25_13:11:02.patch






--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-404) When using chroot path, create chroot on startup if it doesn't exist

2014-09-25 Thread Jonathan Creasy (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-404?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jonathan Creasy updated KAFKA-404:
--
Attachment: KAFKA-404_2014-09-25_13:11:02.patch

 When using chroot path, create chroot on startup if it doesn't exist
 

 Key: KAFKA-404
 URL: https://issues.apache.org/jira/browse/KAFKA-404
 Project: Kafka
  Issue Type: Improvement
Affects Versions: 0.8.1
 Environment: CentOS 5.5, Linux 2.6.18-194.32.1.el5 x86_64 GNU/Linux
Reporter: Jonathan Creasy
  Labels: newbie, patch
 Fix For: 0.8.2

 Attachments: KAFKA-404-0.7.1.patch, KAFKA-404-0.8.patch, 
 KAFKA-404-auto-create-zookeeper-chroot-on-start-up-i.patch, 
 KAFKA-404-auto-create-zookeeper-chroot-on-start-up-v2.patch, 
 KAFKA-404-auto-create-zookeeper-chroot-on-start-up-v3.patch, 
 KAFKA-404-auto-create-zookeeper-chroot-on-start-up-v4.patch, 
 KAFKA-404-auto-create-zookeeper-chroot-on-start-up-v5.patch, KAFKA-404.patch, 
 KAFKA-404_2014-09-25_13:11:02.patch






--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: Review Request 26019: Patch for KAFKA-404

2014-09-25 Thread Jonathan Creasy

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/26019/
---

(Updated Sept. 25, 2014, 6:11 p.m.)


Review request for kafka.


Bugs: KAFKA-404
https://issues.apache.org/jira/browse/KAFKA-404


Repository: kafka


Description
---

KAFKA-404 auto-create Zookeeper CHROOT on Startup


Diffs (updated)
-

  core/src/main/scala/kafka/server/KafkaServer.scala 
390fef500d7e0027e698c259d777454ba5a0f5e8 
  core/src/test/scala/unit/kafka/server/ServerStartupTest.scala PRE-CREATION 

Diff: https://reviews.apache.org/r/26019/diff/


Testing
---

Ran Kafka against a brand new Zookeeper, chroot exists after Kafka starts.

Also: 

Added Unit test, fails prior to patch. 

$ ./gradlew -Dtest.single=CreateZKChrootTest core:test
Building project 'core' with Scala version 2.10.1
:clients:compileJava UP-TO-DATE
:clients:processResources UP-TO-DATE
:clients:classes UP-TO-DATE
:clients:jar UP-TO-DATE
:core:compileJava UP-TO-DATE
:core:compileScala UP-TO-DATE
:core:processResources UP-TO-DATE
:core:classes UP-TO-DATE
:core:compileTestJava UP-TO-DATE
:core:compileTestScala
:core:processTestResources UP-TO-DATE
:core:testClasses
:core:test

kafka.server.CreateZKChrootTest > testBrokerCreatesZKChroot FAILED
java.lang.IllegalArgumentException: Path length must be > 0
at org.apache.zookeeper.common.PathUtils.validatePath(PathUtils.java:48)
at org.apache.zookeeper.common.PathUtils.validatePath(PathUtils.java:35)
at org.apache.zookeeper.ZooKeeper.create(ZooKeeper.java:766)
at org.I0Itec.zkclient.ZkConnection.create(ZkConnection.java:87)
at org.I0Itec.zkclient.ZkClient$1.call(ZkClient.java:308)
at org.I0Itec.zkclient.ZkClient$1.call(ZkClient.java:304)
at org.I0Itec.zkclient.ZkClient.retryUntilConnected(ZkClient.java:675)
at org.I0Itec.zkclient.ZkClient.create(ZkClient.java:304)
at org.I0Itec.zkclient.ZkClient.createPersistent(ZkClient.java:213)
at org.I0Itec.zkclient.ZkClient.createPersistent(ZkClient.java:223)
at kafka.utils.ZkUtils$.makeSurePersistentPathExists(ZkUtils.scala:211)
at 
kafka.utils.ZkUtils$$anonfun$setupCommonPaths$1.apply(ZkUtils.scala:91)
at 
kafka.utils.ZkUtils$$anonfun$setupCommonPaths$1.apply(ZkUtils.scala:90)
at scala.collection.immutable.List.foreach(List.scala:318)
at kafka.utils.ZkUtils$.setupCommonPaths(ZkUtils.scala:90)
at kafka.server.KafkaServer.initZk(KafkaServer.scala:133)
at kafka.server.KafkaServer.startup(KafkaServer.scala:81)
at kafka.utils.TestUtils$.createServer(TestUtils.scala:133)
at kafka.server.CreateZKChrootTest.setUp(CreateZKChrootTest.scala:40)

1 test completed, 1 failed
:core:test FAILED

FAILURE: Build failed with an exception.

* What went wrong:
Execution failed for task ':core:test'.
 There were failing tests. See the report at: 
 file:///Users/jcreasy/code/kafka/core/build/reports/tests/index.html

* Try:
Run with --stacktrace option to get the stack trace. Run with --info or --debug 
option to get more log output.

BUILD FAILED

Total time: 27.926 secs

Passes once patch applied:

$ ./gradlew -Dtest.single=CreateZKChrootTest core:test
Building project 'core' with Scala version 2.10.1
:clients:compileJava UP-TO-DATE
:clients:processResources UP-TO-DATE
:clients:classes UP-TO-DATE
:clients:jar UP-TO-DATE
:core:compileJava UP-TO-DATE
:core:compileScala
/Users/jcreasy/code/kafka/core/src/main/scala/kafka/server/KafkaServer.scala:160:
 a pure expression does nothing in statement position; you may be omitting 
necessary parentheses
ControllerStats.uncleanLeaderElectionRate
^
/Users/jcreasy/code/kafka/core/src/main/scala/kafka/server/KafkaServer.scala:161:
 a pure expression does nothing in statement position; you may be omitting 
necessary parentheses
ControllerStats.leaderElectionTimer
^
two warnings found
:core:processResources UP-TO-DATE
:core:classes
:core:compileTestJava UP-TO-DATE
 Building 69%  :core:compileTestScala^Cjcreasy at C02MR0K3FD58 in 
 ~/code/kafka on KAFKA-404*


Thanks,

Jonathan Creasy



Can Fetch Request cause delay of Produce Request?

2014-09-25 Thread Vadim Chekan
Hi all,

I'm working on my own kafka client implementation and I noticed a strange
situation.

Time(s) | Action
0.0     MetadataRequest
0.0     MetadataResponse
0.2     OffsetRequest
0.2     OffsetResponse
0.3     FetchRequest(MaxWaitTime=20sec)
6.0     ProduceRequest
31.0    FetchResponse
        <-- notice 25sec gap!
31.0    ProduceResponse

What is important: it seems to me that a Fetch Request with a long poll will
block processing of all following requests for the duration of the timeout,
given that there is no new data. But new data *are* flowing in, and the Produce
Request is waiting right behind the Fetch one in the server-side processing
queue.

If what I describe is correct, then I see the following problems.
1. From the client's point of view, if they have a listener and a sender, then
sending data should produce an immediate event on the listener.

2. If I want a clean shutdown of my application within 5sec, but my listeners
are configured with a 20sec timeout, then I am risking losing data. My
application shutdown time now becomes at least as long as the listener's
polling time, which is unreasonable.

I do understand why it was implemented that way, probably because of the
specification saying

The server guarantees that on a single TCP connection, requests will be
processed in the order they are sent and responses will return in that
order as well

But to achieve proper functioning of the client, I need to allocate another
tcp connection for listeners now?

Also, I'm aware about new protocol proposal for the next version of Kafka,
would this issue be resolved there?

Thanks,
Vadim.

-- 
From RFC 2631: In ASN.1, EXPLICIT tagging is implicit unless IMPLICIT is
explicitly specified


[jira] [Comment Edited] (KAFKA-1642) [Java New Producer Kafka Trunk] CPU Usage Spike to 100% when network connection is lost

2014-09-25 Thread Bhavesh Mistry (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14148121#comment-14148121
 ] 

Bhavesh Mistry edited comment on KAFKA-1642 at 9/25/14 6:42 PM:


Hi [~jkreps],

I will work on the sample program. We are not setting the reconnect.backoff.ms and 
retry.backoff.ms configurations, so they would be at their default values. The only 
thing I can tell you is that I have 4 Producer instances per JVM, so this 
might amplify the issue. 

Thanks,

Bhavesh 


was (Author: bmis13):
HI [~jkreps],

I will work on the sample program. We are not setting reconnect.backoff.ms and 
retry.backoff.ms configuration so it would be default configuration.  Only 
thing I can tell you is that I have 4 Producer instance per JVM.  So this might 
amplify issue. 

Thanks,

Bhavesh 

 [Java New Producer Kafka Trunk] CPU Usage Spike to 100% when network 
 connection is lost
 ---

 Key: KAFKA-1642
 URL: https://issues.apache.org/jira/browse/KAFKA-1642
 Project: Kafka
  Issue Type: Bug
  Components: producer 
Affects Versions: 0.8.2
Reporter: Bhavesh Mistry
Assignee: Jun Rao

 I see my CPU spike to 100% when the network connection is lost for a while. It 
 seems the network IO threads are very busy logging the following error message. Is 
 this expected behavior?
 2014-09-17 14:06:16.830 [kafka-producer-network-thread] ERROR 
 org.apache.kafka.clients.producer.internals.Sender - Uncaught error in kafka 
 producer I/O thread: 
 java.lang.IllegalStateException: No entry found for node -2
 at 
 org.apache.kafka.clients.ClusterConnectionStates.nodeState(ClusterConnectionStates.java:110)
 at 
 org.apache.kafka.clients.ClusterConnectionStates.disconnected(ClusterConnectionStates.java:99)
 at 
 org.apache.kafka.clients.NetworkClient.initiateConnect(NetworkClient.java:394)
 at 
 org.apache.kafka.clients.NetworkClient.maybeUpdateMetadata(NetworkClient.java:380)
 at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:174)
 at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:175)
 at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:115)
 at java.lang.Thread.run(Thread.java:744)
 Thanks,
 Bhavesh



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-1642) [Java New Producer Kafka Trunk] CPU Usage Spike to 100% when network connection is lost

2014-09-25 Thread Bhavesh Mistry (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14148121#comment-14148121
 ] 

Bhavesh Mistry commented on KAFKA-1642:
---

Hi [~jkreps],

I will work on the sample program. We are not setting the reconnect.backoff.ms and 
retry.backoff.ms configurations, so they would be at their default values. The only 
thing I can tell you is that I have 4 Producer instances per JVM, so this might 
amplify the issue. 

Thanks,

Bhavesh 
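
For reference, a sketch of pinning the two backoff settings mentioned above so a dead 
connection is not retried in a tight loop. The values are illustrative only, not 
recommendations, and the exact set of required properties depends on the producer 
version; the keys are passed to the new org.apache.kafka.clients.producer.KafkaProducer 
configuration.

import java.util.Properties;

final class ProducerBackoffSketch {
    static Properties producerProps() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "broker1:9092,broker2:9092");
        props.put("reconnect.backoff.ms", "1000");  // wait longer before re-dialing a dead broker
        props.put("retry.backoff.ms", "500");       // back off between send retries
        return props;
    }
}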

 [Java New Producer Kafka Trunk] CPU Usage Spike to 100% when network 
 connection is lost
 ---

 Key: KAFKA-1642
 URL: https://issues.apache.org/jira/browse/KAFKA-1642
 Project: Kafka
  Issue Type: Bug
  Components: producer 
Affects Versions: 0.8.2
Reporter: Bhavesh Mistry
Assignee: Jun Rao

 I see my CPU spike to 100% when the network connection is lost for a while. It 
 seems the network IO threads are very busy logging the following error message. Is 
 this expected behavior?
 2014-09-17 14:06:16.830 [kafka-producer-network-thread] ERROR 
 org.apache.kafka.clients.producer.internals.Sender - Uncaught error in kafka 
 producer I/O thread: 
 java.lang.IllegalStateException: No entry found for node -2
 at 
 org.apache.kafka.clients.ClusterConnectionStates.nodeState(ClusterConnectionStates.java:110)
 at 
 org.apache.kafka.clients.ClusterConnectionStates.disconnected(ClusterConnectionStates.java:99)
 at 
 org.apache.kafka.clients.NetworkClient.initiateConnect(NetworkClient.java:394)
 at 
 org.apache.kafka.clients.NetworkClient.maybeUpdateMetadata(NetworkClient.java:380)
 at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:174)
 at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:175)
 at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:115)
 at java.lang.Thread.run(Thread.java:744)
 Thanks,
 Bhavesh



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-404) When using chroot path, create chroot on startup if it doesn't exist

2014-09-25 Thread Neha Narkhede (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-404?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14148140#comment-14148140
 ] 

Neha Narkhede commented on KAFKA-404:
-

Your suggestion about consolidation is also fine. 

 When using chroot path, create chroot on startup if it doesn't exist
 

 Key: KAFKA-404
 URL: https://issues.apache.org/jira/browse/KAFKA-404
 Project: Kafka
  Issue Type: Improvement
Affects Versions: 0.8.1
 Environment: CentOS 5.5, Linux 2.6.18-194.32.1.el5 x86_64 GNU/Linux
Reporter: Jonathan Creasy
  Labels: newbie, patch
 Fix For: 0.8.2

 Attachments: KAFKA-404-0.7.1.patch, KAFKA-404-0.8.patch, 
 KAFKA-404-auto-create-zookeeper-chroot-on-start-up-i.patch, 
 KAFKA-404-auto-create-zookeeper-chroot-on-start-up-v2.patch, 
 KAFKA-404-auto-create-zookeeper-chroot-on-start-up-v3.patch, 
 KAFKA-404-auto-create-zookeeper-chroot-on-start-up-v4.patch, 
 KAFKA-404-auto-create-zookeeper-chroot-on-start-up-v5.patch, KAFKA-404.patch, 
 KAFKA-404_2014-09-25_13:11:02.patch






--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: Can Fetch Request cause delay of Produce Request?

2014-09-25 Thread Neha Narkhede
The server guarantees that on a single TCP connection, requests will be
processed in the order they are sent and responses will return in that
order as well

But to achieve proper functioning of the client, I need to allocate another
tcp connection for listeners now?

Yes. The easiest way to send and receive is to use a producer client and a
consumer client, each of which will have its own TCP connection to the
server.
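
A sketch of that separation using the standard 0.8 Java clients (broker/ZooKeeper 
addresses, group id and topic name are placeholders): the producer and the consumer 
connector each maintain their own TCP connections, so a long-poll fetch cannot hold up 
produce requests.

import java.util.Properties;

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

public class SplitConnections {
    public static void main(String[] args) {
        // Producer: opens its own connections to the brokers.
        Properties pp = new Properties();
        pp.put("metadata.broker.list", "localhost:9092");
        pp.put("serializer.class", "kafka.serializer.StringEncoder");
        Producer<String, String> producer = new Producer<>(new ProducerConfig(pp));

        // Consumer: a separate connector, so its long-poll fetches ride on different sockets.
        Properties cp = new Properties();
        cp.put("zookeeper.connect", "localhost:2181");
        cp.put("group.id", "listener");
        ConsumerConnector consumer = Consumer.createJavaConsumerConnector(new ConsumerConfig(cp));

        producer.send(new KeyedMessage<String, String>("my-topic", "hello"));

        producer.close();
        consumer.shutdown();
    }
}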

On Thu, Sep 25, 2014 at 11:40 AM, Vadim Chekan kot.bege...@gmail.com
wrote:

 Hi all,

 I'm working on my own kafka client implementation and I noticed strange
 situation.

 Time(s) | Action
 0.0     MetadataRequest
 0.0     MetadataResponse
 0.2     OffsetRequest
 0.2     OffsetResponse
 0.3     FetchRequest(MaxWaitTime=20sec)
 6.0     ProduceRequest
 31.0    FetchResponse
         <-- notice 25sec gap!
 31.0    ProduceResponse

 What is important, it seems to me that Fetch Request with long poll will
 block processing of all following requests for the duration of timeout,
 given that there is no new data. But new data *are* flowing in and Produce
 Request is waiting right behind the Fetch one in server side processing
 queue.

 If what I describe is correct then I see the following problems.
 1. From client's point of view, if they have a listener and a sender, then
 sending data should produce immediate event on listener.

 2. If I want a clean shutdown of my application within 5sec, but my listeners
 are configured with a 20sec timeout, then I am risking losing data. My
 application shutdown time now becomes at least as long as the listener's
 polling time, which is unreasonable.

 I do understand why it was implemented that way, probably because of the
 specification saying
 
 The server guarantees that on a single TCP connection, requests will be
 processed in the order they are sent and responses will return in that
 order as well
 
 But to achieve proper functioning of the client, I need to allocate another
 tcp connection for listeners now?

 Also, I'm aware about new protocol proposal for the next version of Kafka,
 would this issue be resolved there?

 Thanks,
 Vadim.

 --
 From RFC 2631: In ASN.1, EXPLICIT tagging is implicit unless IMPLICIT is
 explicitly specified



Re: Review Request 26020: Patch for KAFKA-1651

2014-09-25 Thread Neha Narkhede

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/26020/#review54579
---

Ship it!


Ship It!

- Neha Narkhede


On Sept. 25, 2014, 5:50 a.m., Jonathan Creasy wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/26020/
 ---
 
 (Updated Sept. 25, 2014, 5:50 a.m.)
 
 
 Review request for kafka.
 
 
 Bugs: KAFKA-1651
 https://issues.apache.org/jira/browse/KAFKA-1651
 
 
 Repository: kafka
 
 
 Description
 ---
 
 Removed some extra whitespace
 
 
 Diffs
 -
 
   core/src/main/scala/kafka/server/KafkaServer.scala 
 390fef500d7e0027e698c259d777454ba5a0f5e8 
 
 Diff: https://reviews.apache.org/r/26020/diff/
 
 
 Testing
 ---
 
 
 Thanks,
 
 Jonathan Creasy
 




[jira] [Commented] (KAFKA-1651) Removed some extra whitespace in KafkaServer.scala

2014-09-25 Thread Neha Narkhede (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1651?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14148174#comment-14148174
 ] 

Neha Narkhede commented on KAFKA-1651:
--

[~jcreasy] How did you come across these whitespaces? Curious to know if there 
is something we can do in terms of Intellij/Eclipse auto formatting to prevent 
such whitespace issues.

 Removed some extra whitespace in KafkaServer.scala
 --

 Key: KAFKA-1651
 URL: https://issues.apache.org/jira/browse/KAFKA-1651
 Project: Kafka
  Issue Type: Improvement
  Components: core
Affects Versions: 0.8.2
Reporter: Jonathan Creasy
Priority: Trivial
  Labels: newbie
 Fix For: 0.8.2

 Attachments: KAFKA-1651.patch, KAFKA-1651_2014-09-25_00:49:36.patch, 
 KAFKA-1651_2014-09-25_00:50:11.patch






--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-1651) Removed some extra whitespace in KafkaServer.scala

2014-09-25 Thread Neha Narkhede (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1651?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Neha Narkhede updated KAFKA-1651:
-
Reviewer: Neha Narkhede

 Removed some extra whitespace in KafkaServer.scala
 --

 Key: KAFKA-1651
 URL: https://issues.apache.org/jira/browse/KAFKA-1651
 Project: Kafka
  Issue Type: Improvement
  Components: core
Affects Versions: 0.8.2
Reporter: Jonathan Creasy
Priority: Trivial
  Labels: newbie
 Fix For: 0.8.2

 Attachments: KAFKA-1651.patch, KAFKA-1651_2014-09-25_00:49:36.patch, 
 KAFKA-1651_2014-09-25_00:50:11.patch






--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-1651) Removed some extra whitespace in KafkaServer.scala

2014-09-25 Thread Jonathan Creasy (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1651?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14148185#comment-14148185
 ] 

Jonathan Creasy commented on KAFKA-1651:


I noticed them in the RB diff when I was working on KAFKA-404; there are quite 
a few throughout. I noticed some on 
https://reviews.apache.org/r/25886/diff/?expand=1 as well, for example. They show up as 
bright red blocks when 'show extra whitespace' is on.

 Removed some extra whitespace in KafkaServer.scala
 --

 Key: KAFKA-1651
 URL: https://issues.apache.org/jira/browse/KAFKA-1651
 Project: Kafka
  Issue Type: Improvement
  Components: core
Affects Versions: 0.8.2
Reporter: Jonathan Creasy
Priority: Trivial
  Labels: newbie
 Fix For: 0.8.2

 Attachments: KAFKA-1651.patch, KAFKA-1651_2014-09-25_00:49:36.patch, 
 KAFKA-1651_2014-09-25_00:50:11.patch






--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-404) When using chroot path, create chroot on startup if it doesn't exist

2014-09-25 Thread Jonathan Creasy (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-404?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14148189#comment-14148189
 ] 

Jonathan Creasy commented on KAFKA-404:
---

RB Updated, let's merge this! :) (it's only 2 years old...)

 When using chroot path, create chroot on startup if it doesn't exist
 

 Key: KAFKA-404
 URL: https://issues.apache.org/jira/browse/KAFKA-404
 Project: Kafka
  Issue Type: Improvement
Affects Versions: 0.8.1
 Environment: CentOS 5.5, Linux 2.6.18-194.32.1.el5 x86_64 GNU/Linux
Reporter: Jonathan Creasy
  Labels: newbie, patch
 Fix For: 0.8.2

 Attachments: KAFKA-404-0.7.1.patch, KAFKA-404-0.8.patch, 
 KAFKA-404-auto-create-zookeeper-chroot-on-start-up-i.patch, 
 KAFKA-404-auto-create-zookeeper-chroot-on-start-up-v2.patch, 
 KAFKA-404-auto-create-zookeeper-chroot-on-start-up-v3.patch, 
 KAFKA-404-auto-create-zookeeper-chroot-on-start-up-v4.patch, 
 KAFKA-404-auto-create-zookeeper-chroot-on-start-up-v5.patch, KAFKA-404.patch, 
 KAFKA-404_2014-09-25_13:11:02.patch






--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: Review Request 26019: Patch for KAFKA-404

2014-09-25 Thread Neha Narkhede

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/26019/#review54581
---

Ship it!


Ship It!

- Neha Narkhede


On Sept. 25, 2014, 6:11 p.m., Jonathan Creasy wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/26019/
 ---
 
 (Updated Sept. 25, 2014, 6:11 p.m.)
 
 
 Review request for kafka.
 
 
 Bugs: KAFKA-404
 https://issues.apache.org/jira/browse/KAFKA-404
 
 
 Repository: kafka
 
 
 Description
 ---
 
 KAFKA-404 auto-create Zookeeper CHROOT on Startup
 
 
 Diffs
 -
 
   core/src/main/scala/kafka/server/KafkaServer.scala 
 390fef500d7e0027e698c259d777454ba5a0f5e8 
   core/src/test/scala/unit/kafka/server/ServerStartupTest.scala PRE-CREATION 
 
 Diff: https://reviews.apache.org/r/26019/diff/
 
 
 Testing
 ---
 
 Ran Kafka against a brand new Zookeeper, chroot exists after Kafka starts.
 
 Also: 
 
 Added Unit test, fails prior to patch. 
 
 $ ./gradlew -Dtest.single=CreateZKChrootTest core:test
 Building project 'core' with Scala version 2.10.1
 :clients:compileJava UP-TO-DATE
 :clients:processResources UP-TO-DATE
 :clients:classes UP-TO-DATE
 :clients:jar UP-TO-DATE
 :core:compileJava UP-TO-DATE
 :core:compileScala UP-TO-DATE
 :core:processResources UP-TO-DATE
 :core:classes UP-TO-DATE
 :core:compileTestJava UP-TO-DATE
 :core:compileTestScala
 :core:processTestResources UP-TO-DATE
 :core:testClasses
 :core:test
 
  kafka.server.CreateZKChrootTest > testBrokerCreatesZKChroot FAILED
  java.lang.IllegalArgumentException: Path length must be > 0
 at 
 org.apache.zookeeper.common.PathUtils.validatePath(PathUtils.java:48)
 at 
 org.apache.zookeeper.common.PathUtils.validatePath(PathUtils.java:35)
 at org.apache.zookeeper.ZooKeeper.create(ZooKeeper.java:766)
 at org.I0Itec.zkclient.ZkConnection.create(ZkConnection.java:87)
 at org.I0Itec.zkclient.ZkClient$1.call(ZkClient.java:308)
 at org.I0Itec.zkclient.ZkClient$1.call(ZkClient.java:304)
 at org.I0Itec.zkclient.ZkClient.retryUntilConnected(ZkClient.java:675)
 at org.I0Itec.zkclient.ZkClient.create(ZkClient.java:304)
 at org.I0Itec.zkclient.ZkClient.createPersistent(ZkClient.java:213)
 at org.I0Itec.zkclient.ZkClient.createPersistent(ZkClient.java:223)
 at 
 kafka.utils.ZkUtils$.makeSurePersistentPathExists(ZkUtils.scala:211)
 at 
 kafka.utils.ZkUtils$$anonfun$setupCommonPaths$1.apply(ZkUtils.scala:91)
 at 
 kafka.utils.ZkUtils$$anonfun$setupCommonPaths$1.apply(ZkUtils.scala:90)
 at scala.collection.immutable.List.foreach(List.scala:318)
 at kafka.utils.ZkUtils$.setupCommonPaths(ZkUtils.scala:90)
 at kafka.server.KafkaServer.initZk(KafkaServer.scala:133)
 at kafka.server.KafkaServer.startup(KafkaServer.scala:81)
 at kafka.utils.TestUtils$.createServer(TestUtils.scala:133)
 at kafka.server.CreateZKChrootTest.setUp(CreateZKChrootTest.scala:40)
 
 1 test completed, 1 failed
 :core:test FAILED
 
 FAILURE: Build failed with an exception.
 
 * What went wrong:
 Execution failed for task ':core:test'.
  There were failing tests. See the report at: 
  file:///Users/jcreasy/code/kafka/core/build/reports/tests/index.html
 
 * Try:
 Run with --stacktrace option to get the stack trace. Run with --info or 
 --debug option to get more log output.
 
 BUILD FAILED
 
 Total time: 27.926 secs
 
 Passes once patch applied:
 
 $ ./gradlew -Dtest.single=CreateZKChrootTest core:test
 Building project 'core' with Scala version 2.10.1
 :clients:compileJava UP-TO-DATE
 :clients:processResources UP-TO-DATE
 :clients:classes UP-TO-DATE
 :clients:jar UP-TO-DATE
 :core:compileJava UP-TO-DATE
 :core:compileScala
 /Users/jcreasy/code/kafka/core/src/main/scala/kafka/server/KafkaServer.scala:160:
  a pure expression does nothing in statement position; you may be omitting 
 necessary parentheses
 ControllerStats.uncleanLeaderElectionRate
 ^
 /Users/jcreasy/code/kafka/core/src/main/scala/kafka/server/KafkaServer.scala:161:
  a pure expression does nothing in statement position; you may be omitting 
 necessary parentheses
 ControllerStats.leaderElectionTimer
 ^
 two warnings found
 :core:processResources UP-TO-DATE
 :core:classes
 :core:compileTestJava UP-TO-DATE
  Building 69%  :core:compileTestScala^Cjcreasy at C02MR0K3FD58 in 
  ~/code/kafka on KAFKA-404*
 
 
 Thanks,
 
 Jonathan Creasy
 




[jira] [Updated] (KAFKA-1555) provide strong consistency with reasonable availability

2014-09-25 Thread Gwen Shapira (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1555?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gwen Shapira updated KAFKA-1555:

Attachment: KAFKA-1555.5.patch

Noticed that prev patch accidentally included my slightly modified 
build.gradle. Removed that.

 provide strong consistency with reasonable availability
 ---

 Key: KAFKA-1555
 URL: https://issues.apache.org/jira/browse/KAFKA-1555
 Project: Kafka
  Issue Type: Improvement
  Components: controller
Affects Versions: 0.8.1.1
Reporter: Jiang Wu
Assignee: Gwen Shapira
 Fix For: 0.8.2

 Attachments: KAFKA-1555.0.patch, KAFKA-1555.1.patch, 
 KAFKA-1555.2.patch, KAFKA-1555.3.patch, KAFKA-1555.4.patch, KAFKA-1555.5.patch


 In a mission critical application, we expect a kafka cluster with 3 brokers 
 can satisfy two requirements:
 1. When 1 broker is down, no message loss or service blocking happens.
 2. In worse cases such as two brokers are down, service can be blocked, but 
 no message loss happens.
 We found that the current kafka version (0.8.1.1) cannot achieve the requirements 
 due to three of its behaviors:
 1. when choosing a new leader from 2 followers in ISR, the one with fewer 
 messages may be chosen as the leader.
 2. even when replica.lag.max.messages=0, a follower can stay in ISR when it 
 has fewer messages than the leader.
 3. ISR can contain only 1 broker, therefore acknowledged messages may be 
 stored in only 1 broker.
 The following is an analytical proof. 
 We consider a cluster with 3 brokers and a topic with 3 replicas, and assume 
 that at the beginning, all 3 replicas, leader A, followers B and C, are in 
 sync, i.e., they have the same messages and are all in ISR.
 According to the value of request.required.acks (acks for short), there are 
 the following cases.
 1. acks=0, 1, 3. Obviously these settings do not satisfy the requirement.
 2. acks=2. Producer sends a message m. It's acknowledged by A and B. At this 
 time, although C hasn't received m, C is still in ISR. If A is killed, C can 
 be elected as the new leader, and consumers will miss m.
 3. acks=-1. B and C restart and are removed from ISR. Producer sends a 
 message m to A, and receives an acknowledgement. Disk failure happens in A 
 before B and C replicate m. Message m is lost.
 In summary, any existing configuration cannot satisfy the requirements.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: Review Request 25886: KAFKA-1555: provide strong consistency with reasonable availability

2014-09-25 Thread Gwen Shapira

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/25886/
---

(Updated Sept. 25, 2014, 7:41 p.m.)


Review request for kafka.


Changes
---

Removed accidental build.gradle


Repository: kafka


Description
---

KAFKA-1555: provide strong consistency with reasonable availability


Diffs (updated)
-

  
clients/src/main/java/org/apache/kafka/common/errors/NotEnoughReplicasException.java
 PRE-CREATION 
  clients/src/main/java/org/apache/kafka/common/protocol/Errors.java d434f42 
  core/src/main/scala/kafka/cluster/Partition.scala ff106b4 
  core/src/main/scala/kafka/common/ErrorMapping.scala 3fae791 
  core/src/main/scala/kafka/common/NotEnoughReplicasException.scala 
PRE-CREATION 
  core/src/main/scala/kafka/log/LogConfig.scala 5746ad4 
  core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala 
39f777b 
  core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala 24deea0 
  core/src/test/scala/unit/kafka/utils/TestUtils.scala 2dbdd3c 

Diff: https://reviews.apache.org/r/25886/diff/


Testing
---

With a 3-broker cluster, created 3 topics, each with 1 partition and 3 replicas, 
with min.insync.replicas set to 1, 3, and 4 respectively.
* min.insync.replicas=1 behaved normally (all writes succeeded as long as a 
broker was up)
* min.insync.replicas=3 returned NotEnoughReplicas when required.acks=-1 and 
one broker was down
* min.insync.replicas=4 returned NotEnoughReplicas when required.acks=-1
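
For reference, the topics above can be created roughly like this (a sketch; the 
topic name is made up, and the console producer's acks flag is one way to get 
required.acks=-1):

    bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic min-isr-3 \
        --partitions 1 --replication-factor 3 --config min.insync.replicas=3
    bin/kafka-console-producer.sh --broker-list localhost:9092 --topic min-isr-3 \
        --request-required-acks -1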

See notes about retry behavior in the JIRA.


Thanks,

Gwen Shapira



[jira] [Commented] (KAFKA-1419) cross build for scala 2.11

2014-09-25 Thread Jonathan Herriott (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1419?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14148228#comment-14148228
 ] 

Jonathan Herriott commented on KAFKA-1419:
--

Hi,  I need to integrate the kafka 0.8.1 java libraries built for scala 2.11.1 
into a project.  It appears that neither the 0.8 nor the 0.8.1 branch has 
these patches applied.  Which patches above are necessary in order to get this 
working?

 cross build for scala 2.11
 --

 Key: KAFKA-1419
 URL: https://issues.apache.org/jira/browse/KAFKA-1419
 Project: Kafka
  Issue Type: Improvement
  Components: clients
Affects Versions: 0.8.1
Reporter: Scott Clasen
Assignee: Ivan Lyutov
Priority: Blocker
 Fix For: 0.8.2, 0.8.1.2

 Attachments: KAFKA-1419-scalaBinaryVersion.patch, 
 KAFKA-1419-scalaBinaryVersion.patch, KAFKA-1419.patch, KAFKA-1419.patch, 
 KAFKA-1419_2014-07-28_15:05:16.patch, KAFKA-1419_2014-07-29_15:13:43.patch, 
 KAFKA-1419_2014-08-04_14:43:26.patch, KAFKA-1419_2014-08-05_12:51:16.patch, 
 KAFKA-1419_2014-08-07_10:17:34.patch, KAFKA-1419_2014-08-07_10:52:18.patch, 
 KAFKA-1419_cross_build_for_scala_2_11_for_0_8_1_branch.patch


 Please publish builds for scala 2.11, hopefully just needs a small tweak to 
 the gradle conf?



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (KAFKA-1652) NoSuchElementException on server shutdown

2014-09-25 Thread Neha Narkhede (JIRA)
Neha Narkhede created KAFKA-1652:


 Summary: NoSuchElementException on server shutdown
 Key: KAFKA-1652
 URL: https://issues.apache.org/jira/browse/KAFKA-1652
 Project: Kafka
  Issue Type: Bug
Reporter: Neha Narkhede


Observed the following exception on server shutdown. Happens pretty frequently 
though not on every shutdown. Ways to reproduce -
1. Checkout kafka from trunk
2. Run bin/kafka-server-start.sh config/server.properties
3. Ctrl C

{noformat}
[2014-09-25 12:31:14,441] ERROR None.get (kafka.network.Processor)
java.util.NoSuchElementException: None.get
at scala.None$.get(Option.scala:313)
at scala.None$.get(Option.scala:311)
at kafka.network.ConnectionQuotas.dec(SocketServer.scala:522)
at kafka.network.AbstractServerThread.close(SocketServer.scala:165)
at kafka.network.AbstractServerThread.close(SocketServer.scala:157)
at kafka.network.Processor.close(SocketServer.scala:372)
at kafka.network.AbstractServerThread.closeAll(SocketServer.scala:178)
at 
kafka.network.Processor$$anonfun$run$3.apply$mcV$sp(SocketServer.scala:362)
at kafka.utils.Utils$.swallow(Utils.scala:172)
at kafka.utils.Logging$class.swallowError(Logging.scala:106)
at 
kafka.network.AbstractServerThread.swallowError(SocketServer.scala:108)
at kafka.network.Processor.run(SocketServer.scala:362)
at java.lang.Thread.run(Thread.java:695)
{noformat}
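
The None.get comes from calling .get on an empty Option. A minimal standalone 
sketch of that failure mode (not the actual ConnectionQuotas code):

{code}
// Looking up a key that was never added (or was already removed) yields None;
// calling .get on that Option throws java.util.NoSuchElementException: None.get
val counts = scala.collection.mutable.Map[java.net.InetAddress, Int]()
val addr = java.net.InetAddress.getByName("127.0.0.1")
counts.get(addr).get
{code}

which would be consistent with the processor closing a connection whose count was 
already removed from the quota map during shutdown.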



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-404) When using chroot path, create chroot on startup if it doesn't exist

2014-09-25 Thread Neha Narkhede (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-404?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Neha Narkhede updated KAFKA-404:

Resolution: Fixed
Status: Resolved  (was: Patch Available)

Thanks for the patches, pushed to trunk

 When using chroot path, create chroot on startup if it doesn't exist
 

 Key: KAFKA-404
 URL: https://issues.apache.org/jira/browse/KAFKA-404
 Project: Kafka
  Issue Type: Improvement
Affects Versions: 0.8.1
 Environment: CentOS 5.5, Linux 2.6.18-194.32.1.el5 x86_64 GNU/Linux
Reporter: Jonathan Creasy
Assignee: Jonathan Creasy
  Labels: newbie, patch
 Fix For: 0.8.2

 Attachments: KAFKA-404-0.7.1.patch, KAFKA-404-0.8.patch, 
 KAFKA-404-auto-create-zookeeper-chroot-on-start-up-i.patch, 
 KAFKA-404-auto-create-zookeeper-chroot-on-start-up-v2.patch, 
 KAFKA-404-auto-create-zookeeper-chroot-on-start-up-v3.patch, 
 KAFKA-404-auto-create-zookeeper-chroot-on-start-up-v4.patch, 
 KAFKA-404-auto-create-zookeeper-chroot-on-start-up-v5.patch, KAFKA-404.patch, 
 KAFKA-404_2014-09-25_13:11:02.patch






--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Closed] (KAFKA-404) When using chroot path, create chroot on startup if it doesn't exist

2014-09-25 Thread Neha Narkhede (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-404?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Neha Narkhede closed KAFKA-404.
---

 When using chroot path, create chroot on startup if it doesn't exist
 

 Key: KAFKA-404
 URL: https://issues.apache.org/jira/browse/KAFKA-404
 Project: Kafka
  Issue Type: Improvement
Affects Versions: 0.8.1
 Environment: CentOS 5.5, Linux 2.6.18-194.32.1.el5 x86_64 GNU/Linux
Reporter: Jonathan Creasy
Assignee: Jonathan Creasy
  Labels: newbie, patch
 Fix For: 0.8.2

 Attachments: KAFKA-404-0.7.1.patch, KAFKA-404-0.8.patch, 
 KAFKA-404-auto-create-zookeeper-chroot-on-start-up-i.patch, 
 KAFKA-404-auto-create-zookeeper-chroot-on-start-up-v2.patch, 
 KAFKA-404-auto-create-zookeeper-chroot-on-start-up-v3.patch, 
 KAFKA-404-auto-create-zookeeper-chroot-on-start-up-v4.patch, 
 KAFKA-404-auto-create-zookeeper-chroot-on-start-up-v5.patch, KAFKA-404.patch, 
 KAFKA-404_2014-09-25_13:11:02.patch






--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-404) When using chroot path, create chroot on startup if it doesn't exist

2014-09-25 Thread Neha Narkhede (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-404?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Neha Narkhede updated KAFKA-404:

Assignee: Jonathan Creasy

 When using chroot path, create chroot on startup if it doesn't exist
 

 Key: KAFKA-404
 URL: https://issues.apache.org/jira/browse/KAFKA-404
 Project: Kafka
  Issue Type: Improvement
Affects Versions: 0.8.1
 Environment: CentOS 5.5, Linux 2.6.18-194.32.1.el5 x86_64 GNU/Linux
Reporter: Jonathan Creasy
Assignee: Jonathan Creasy
  Labels: newbie, patch
 Fix For: 0.8.2

 Attachments: KAFKA-404-0.7.1.patch, KAFKA-404-0.8.patch, 
 KAFKA-404-auto-create-zookeeper-chroot-on-start-up-i.patch, 
 KAFKA-404-auto-create-zookeeper-chroot-on-start-up-v2.patch, 
 KAFKA-404-auto-create-zookeeper-chroot-on-start-up-v3.patch, 
 KAFKA-404-auto-create-zookeeper-chroot-on-start-up-v4.patch, 
 KAFKA-404-auto-create-zookeeper-chroot-on-start-up-v5.patch, KAFKA-404.patch, 
 KAFKA-404_2014-09-25_13:11:02.patch






--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-1652) NoSuchElementException on server shutdown

2014-09-25 Thread Joel Koshy (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1652?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14148263#comment-14148263
 ] 

Joel Koshy commented on KAFKA-1652:
---

Are you sure you have the fix in KAFKA-1577? If so, should we just reopen that?

 NoSuchElementException on server shutdown
 -

 Key: KAFKA-1652
 URL: https://issues.apache.org/jira/browse/KAFKA-1652
 Project: Kafka
  Issue Type: Bug
Reporter: Neha Narkhede
  Labels: newbie++

 Observed the following exception on server shutdown. Happens pretty 
 frequently though not on every shutdown. Ways to reproduce -
 1. Checkout kafka from trunk
 2. Run bin/kafka-server-start.sh config/server.properties
 3. Ctrl C
 {noformat}
 [2014-09-25 12:31:14,441] ERROR None.get (kafka.network.Processor)
 java.util.NoSuchElementException: None.get
   at scala.None$.get(Option.scala:313)
   at scala.None$.get(Option.scala:311)
   at kafka.network.ConnectionQuotas.dec(SocketServer.scala:522)
   at kafka.network.AbstractServerThread.close(SocketServer.scala:165)
   at kafka.network.AbstractServerThread.close(SocketServer.scala:157)
   at kafka.network.Processor.close(SocketServer.scala:372)
   at kafka.network.AbstractServerThread.closeAll(SocketServer.scala:178)
   at 
 kafka.network.Processor$$anonfun$run$3.apply$mcV$sp(SocketServer.scala:362)
   at kafka.utils.Utils$.swallow(Utils.scala:172)
   at kafka.utils.Logging$class.swallowError(Logging.scala:106)
   at 
 kafka.network.AbstractServerThread.swallowError(SocketServer.scala:108)
   at kafka.network.Processor.run(SocketServer.scala:362)
   at java.lang.Thread.run(Thread.java:695)
 {noformat}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (KAFKA-1653) Duplicate broker ids allowed in replica assignment

2014-09-25 Thread Ryan Berdeen (JIRA)
Ryan Berdeen created KAFKA-1653:
---

 Summary: Duplicate broker ids allowed in replica assignment
 Key: KAFKA-1653
 URL: https://issues.apache.org/jira/browse/KAFKA-1653
 Project: Kafka
  Issue Type: Bug
  Components: tools
Affects Versions: 0.8.1.1
Reporter: Ryan Berdeen


The reassign partitions command and the controller do not ensure that all 
replicas for a partition are on different brokers. For example, you could set 
1,2,2 as the list of brokers for the replicas.

kafka-topics.sh --describe --under-replicated will list these partitions as 
under-replicated, but I can't see a reason why the controller should allow this 
state.
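
For illustration, a reassignment JSON along these lines (topic name made up) is 
currently accepted by kafka-reassign-partitions.sh even though broker 2 appears twice:

{code}
{"version": 1,
 "partitions": [
   {"topic": "my-topic", "partition": 0, "replicas": [1, 2, 2]}
 ]}
{code}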



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-1647) Replication offset checkpoints (high water marks) can be lost on hard kills and restarts

2014-09-25 Thread Neha Narkhede (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1647?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Neha Narkhede updated KAFKA-1647:
-
Priority: Critical  (was: Major)

 Replication offset checkpoints (high water marks) can be lost on hard kills 
 and restarts
 

 Key: KAFKA-1647
 URL: https://issues.apache.org/jira/browse/KAFKA-1647
 Project: Kafka
  Issue Type: Bug
Reporter: Joel Koshy
Priority: Critical

 We ran into this scenario recently in a production environment. This can 
 happen when enough brokers in a cluster are taken down. i.e., a rolling 
 bounce done properly should not cause this issue. It can occur if all 
 replicas for any partition are taken down.
 Here is a sample scenario:
 * Cluster of three brokers: b0, b1, b2
 * Two partitions (of some topic) with replication factor two: p0, p1
 * Initial state:
 ** p0: leader = b0, ISR = {b0, b1}
 ** p1: leader = b1, ISR = {b0, b1}
 * Do a parallel hard-kill of all brokers
 * Bring up b2, so it is the new controller
 * b2 initializes its controller context and populates its leader/ISR cache 
 (i.e., controllerContext.partitionLeadershipInfo) from zookeeper. The last 
 known leaders are b0 (for p0) and b1 (for p1)
 * Bring up b1
 * The controller's onBrokerStartup procedure initiates a replica state change 
 for all replicas on b1 to become online. As part of this replica state change 
 it gets the last known leader and ISR and sends a LeaderAndIsrRequest to b1 
 (for p0 and p1). This LeaderAndIsr request contains: {{p0: leader=b0; p1: 
 leader=b1; leaders=[b1]}}. b0 is indicated as the leader of p0 but it is not 
 included in the leaders field because b0 is down.
 * On receiving the LeaderAndIsrRequest, b1's replica manager will 
 successfully make b1 the leader for p1 (and create the local replica object 
 corresponding to p1). It will however abort the become-follower transition 
 for p0 because the designated leader b0 is offline. So it will not create the 
 local replica object for p0.
 * It will then start the high water mark checkpoint thread. Since only p1 has 
 a local replica object, only p1's high water mark will be checkpointed to 
 disk. p0's previously written checkpoint, if any, will be lost.
 So in summary it seems we should always create the local replica object even 
 if the online transition does not happen.
 Possible symptoms of the above bug could be one or more of the following (we 
 saw 2 and 3):
 # Data loss; yes on a hard-kill data loss is expected, but this can actually 
 cause loss of nearly all data if the broker becomes follower, truncates, and 
 soon after happens to become leader.
 # High IO on brokers that lose their high water mark then subsequently (on a 
 successful become follower transition) truncate their log to zero and start 
 catching up from the beginning.
 # If the offsets topic is affected, then offsets can get reset. This is 
 because during an offset load we don't read past the high water mark. So if a 
 water mark is missing then we don't load anything (even if the offsets are 
 there in the log).
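 
 For context, the checkpoint being discussed is the replication-offset-checkpoint 
 file in each log directory, a small plain-text file roughly of the form (version, 
 entry count, then one topic/partition/high-watermark entry per line; values made up):
 {noformat}
 0
 2
 my-topic 0 1423
 my-topic 1 917
 {noformat}
 A partition with no local replica object simply gets no entry when the file is 
 rewritten, so its previous high watermark is gone.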



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-1631) ReplicationFactor and under-replicated partitions incorrect during reassignment

2014-09-25 Thread Ryan Berdeen (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1631?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14148304#comment-14148304
 ] 

Ryan Berdeen commented on KAFKA-1631:
-

Not reporting partitions being reassigned seems even worse--this would lead to 
false negatives! It also doesn't address the fact that replication factor is 
reported incorrectly.

It seems like the right solution would be to store the intended replication 
factor for the topic, and alert if the size of the ISR is less than this.

 ReplicationFactor and under-replicated partitions incorrect during 
 reassignment
 ---

 Key: KAFKA-1631
 URL: https://issues.apache.org/jira/browse/KAFKA-1631
 Project: Kafka
  Issue Type: Bug
Affects Versions: 0.8.1.1
Reporter: Ryan Berdeen
  Labels: newbie

 We have a topic with a replication factor of 3. We monitor 
 UnderReplicatedPartitions as recommended by the documentation.
 During a partition reassignment, partitions being reassigned are reported as 
 under-replicated. Running a describe shows:
 {code}
 Topic:activity-wal-1  PartitionCount:15  ReplicationFactor:5  Configs:
 Topic: activity-wal-1  Partition: 0  Leader: 14  Replicas: 14,13,12,11,15  Isr: 14,12,11,13
 Topic: activity-wal-1  Partition: 1  Leader: 14  Replicas: 15,14,11        Isr: 14,11
 Topic: activity-wal-1  Partition: 2  Leader: 11  Replicas: 11,15,12        Isr: 12,11,15
 ...
 {code}
 It looks like the displayed replication factor, 5, is simply the number of 
 replicas listed for the first partition, which includes both brokers in the 
 current list and those onto which the partition is being reassigned. 
 Partition 0 is also included in the list when using the 
 `--under-replicated-partitions` option, even though it is replicated to more 
 brokers than the true replication factor.
 During a reassignment, the under-replicated partitions metric is not usable, 
 meaning that actual under-replicated partitions can go unnoticed.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-1647) Replication offset checkpoints (high water marks) can be lost on hard kills and restarts

2014-09-25 Thread Joel Koshy (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1647?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14148336#comment-14148336
 ] 

Joel Koshy commented on KAFKA-1647:
---

If b1 aborts the follower transition the high watermark checkpoint can be 
removed. It does not truncate all the data at that point. However, on a 
subsequent become-follower transition (if any and if successful) it will 
truncate the log (since on unknown HW we take HW as zero).

This shouldn't be too difficult to fix though.

 Replication offset checkpoints (high water marks) can be lost on hard kills 
 and restarts
 

 Key: KAFKA-1647
 URL: https://issues.apache.org/jira/browse/KAFKA-1647
 Project: Kafka
  Issue Type: Bug
Reporter: Joel Koshy
Priority: Critical

 We ran into this scenario recently in a production environment. This can 
 happen when enough brokers in a cluster are taken down. i.e., a rolling 
 bounce done properly should not cause this issue. It can occur if all 
 replicas for any partition are taken down.
 Here is a sample scenario:
 * Cluster of three brokers: b0, b1, b2
 * Two partitions (of some topic) with replication factor two: p0, p1
 * Initial state:
 ** p0: leader = b0, ISR = {b0, b1}
 ** p1: leader = b1, ISR = {b0, b1}
 * Do a parallel hard-kill of all brokers
 * Bring up b2, so it is the new controller
 * b2 initializes its controller context and populates its leader/ISR cache 
 (i.e., controllerContext.partitionLeadershipInfo) from zookeeper. The last 
 known leaders are b0 (for p0) and b1 (for p1)
 * Bring up b1
 * The controller's onBrokerStartup procedure initiates a replica state change 
 for all replicas on b1 to become online. As part of this replica state change 
 it gets the last known leader and ISR and sends a LeaderAndIsrRequest to b1 
 (for p0 and p1). This LeaderAndIsr request contains: {{p0: leader=b0; p1: 
 leader=b1; leaders=[b1]}}. b0 is indicated as the leader of p0 but it is not 
 included in the leaders field because b0 is down.
 * On receiving the LeaderAndIsrRequest, b1's replica manager will 
 successfully make b1 the leader for p1 (and create the local replica object 
 corresponding to p1). It will however abort the become-follower transition 
 for p0 because the designated leader b0 is offline. So it will not create the 
 local replica object for p0.
 * It will then start the high water mark checkpoint thread. Since only p1 has 
 a local replica object, only p1's high water mark will be checkpointed to 
 disk. p0's previously written checkpoint, if any, will be lost.
 So in summary it seems we should always create the local replica object even 
 if the online transition does not happen.
 Possible symptoms of the above bug could be one or more of the following (we 
 saw 2 and 3):
 # Data loss; yes on a hard-kill data loss is expected, but this can actually 
 cause loss of nearly all data if the broker becomes follower, truncates, and 
 soon after happens to become leader.
 # High IO on brokers that lose their high water mark then subsequently (on a 
 successful become follower transition) truncate their log to zero and start 
 catching up from the beginning.
 # If the offsets topic is affected, then offsets can get reset. This is 
 because during an offset load we don't read past the high water mark. So if a 
 water mark is missing then we don't load anything (even if the offsets are 
 there in the log).



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-1647) Replication offset checkpoints (high water marks) can be lost on hard kills and restarts

2014-09-25 Thread Joel Koshy (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1647?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Joel Koshy updated KAFKA-1647:
--
Labels: newbie++  (was: )

 Replication offset checkpoints (high water marks) can be lost on hard kills 
 and restarts
 

 Key: KAFKA-1647
 URL: https://issues.apache.org/jira/browse/KAFKA-1647
 Project: Kafka
  Issue Type: Bug
Reporter: Joel Koshy
Priority: Critical
  Labels: newbie++

 We ran into this scenario recently in a production environment. This can 
 happen when enough brokers in a cluster are taken down. i.e., a rolling 
 bounce done properly should not cause this issue. It can occur if all 
 replicas for any partition are taken down.
 Here is a sample scenario:
 * Cluster of three brokers: b0, b1, b2
 * Two partitions (of some topic) with replication factor two: p0, p1
 * Initial state:
 ** p0: leader = b0, ISR = {b0, b1}
 ** p1: leader = b1, ISR = {b0, b1}
 * Do a parallel hard-kill of all brokers
 * Bring up b2, so it is the new controller
 * b2 initializes its controller context and populates its leader/ISR cache 
 (i.e., controllerContext.partitionLeadershipInfo) from zookeeper. The last 
 known leaders are b0 (for p0) and b1 (for p1)
 * Bring up b1
 * The controller's onBrokerStartup procedure initiates a replica state change 
 for all replicas on b1 to become online. As part of this replica state change 
 it gets the last known leader and ISR and sends a LeaderAndIsrRequest to b1 
 (for p0 and p1). This LeaderAndIsr request contains: {{p0: leader=b0; p1: 
 leader=b1; leaders=[b1]}}. b0 is indicated as the leader of p0 but it is not 
 included in the leaders field because b0 is down.
 * On receiving the LeaderAndIsrRequest, b1's replica manager will 
 successfully make b1 the leader for p1 (and create the local replica object 
 corresponding to p1). It will however abort the become-follower transition 
 for p0 because the designated leader b0 is offline. So it will not create the 
 local replica object for p0.
 * It will then start the high water mark checkpoint thread. Since only p1 has 
 a local replica object, only p1's high water mark will be checkpointed to 
 disk. p0's previously written checkpoint, if any, will be lost.
 So in summary it seems we should always create the local replica object even 
 if the online transition does not happen.
 Possible symptoms of the above bug could be one or more of the following (we 
 saw 2 and 3):
 # Data loss; yes on a hard-kill data loss is expected, but this can actually 
 cause loss of nearly all data if the broker becomes follower, truncates, and 
 soon after happens to become leader.
 # High IO on brokers that lose their high water mark then subsequently (on a 
 successful become follower transition) truncate their log to zero and start 
 catching up from the beginning.
 # If the offsets topic is affected, then offsets can get reset. This is 
 because during an offset load we don't read past the high water mark. So if a 
 water mark is missing then we don't load anything (even if the offsets are 
 there in the log).



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-1555) provide strong consistency with reasonable availability

2014-09-25 Thread Jay Kreps (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1555?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14148371#comment-14148371
 ] 

Jay Kreps commented on KAFKA-1555:
--

This is a good discussion, I am glad we are taking the time to think this 
through carefully. Let's aim for the right end state rather than optimizing for 
what is easiest to implement now (since these features never get removed and we 
end up spending a lot of time explaining them).

[~sriramsub] it sounds like you are not totally sold on min.isr. Let me try to 
summarize a few things people have said that I think are true and see if people 
can agree on them:

1. Unclean leader election is an orthogonal issue. Regardless of settings, 
choosing a leader that is not caught up means losing data. This option covers 
catastrophic recovery (i.e. no server with complete data exists). We can give 
finer control over whether unclean election is manual or automatic but I think 
you need to have this escape hatch for the case where the authoritative copy of 
the data is destroyed.

2. Specifying a min.isr does actually make sense. I think people have one of 
two cases in mind. In one case non-availability means data loss. This is likely 
the most common case. In this case even if you are down to your last replica 
you still want to perform the write because there is still some hope the data 
will not be lost and if you refuse the write the chance of loss is 100%. In 
another case non-availability can be tolerated because something upstream 
(perhaps the client or another system) can hold onto the data and retry later. 
In this case you want to be sure that when you accept a write it is safe. In 
this case refusing a write is okay but accepting a write and then losing it is 
much worse. It's true that it is very hard to reason about the right min.isr as 
that depends on the probability of failure over time. But this criticism is 
also true of replication factor (e.g. to know an appropriate replication factor 
to yield a particular probability of data loss you need to know the joint 
probability distribution over machine failures).

3. With regard to min.isr there are three issues: (1) what are the settings 
that actually make sense, (2) what is the best way to express these in the 
protocol, and (3) what is the best way to represent this in the client 
configuration. I think we need to start by agreeing on (1).

4. I believe people are actually in agreement that the following settings make 
sense:
a. acks = 0, min.isr=0
b. acks = 1, min.isr = 1
c. acks = -1, min.isr in {1, ..., N}
Conversely no other settings make sense. Does everyone agree on this? If so the 
question is really how to expose this to the user.

4. There were several proposals for how to express these options.
a. The current proposal is to have acks remain in the protocol with its 
original meaning and values but add a topic configuration controlling min.isr. 
I personally think this is a bit weird since both are about the definition of 
success for the request, so it makes sense to send them with the request.
b. Alternately we could add a new field in the produce request specifying the 
min.isr.
c. Alternately we could add a new field in the response returning the actual 
isr size. An advantage of this is that it allows the client to distinguish 
between write failed and write succeeded but not with enough replicas.
d. Finally we could overload acks = -2, -3, etc to mean acks = -1 and min.isr = 
2, 3, etc. This sounds insane, but maybe it actually isn't. Since not all 
combinations of acks and min.isr make sense, this does actually enumerate the 
sensible cases.

5. Regardless of what we do in (4) the configuration can be somewhat simpler. 
Probably just by having the user specify min.isr and erroring out if they 
combine min.isr and acks in a non-sensical way.

6. It isn't clear to me whether the right behavior is to fail fast when the 
min.isr likely won't be met (because the current isr < min) and not attempt the 
write at all, or else to always do the write and then return an error if the 
min.isr isn't met. The latter case means that retries, which are the sane thing 
to do when you get the error, will lead to potentially many duplicates. In fact, 
in the common case where you want to block and keep trying the write until 
durability can be guaranteed, even if that takes a few minutes, this might 
mean thousands of duplicates.
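
For concreteness, the current proposal (option a above) would look roughly like 
this under the attached patch (topic name made up; min.insync.replicas per the 
LogConfig change in the patch):

{code}
# topic-level override
bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic important-events \
  --config min.insync.replicas=2

# producer config: only acks=-1 interacts with the topic-level setting
request.required.acks=-1
{code}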



 provide strong consistency with reasonable availability
 ---

 Key: KAFKA-1555
 URL: https://issues.apache.org/jira/browse/KAFKA-1555
 Project: Kafka
  Issue Type: Improvement
  Components: controller
Affects Versions: 0.8.1.1
Reporter: Jiang Wu
Assignee: Gwen Shapira
 Fix For: 0.8.2

 Attachments: KAFKA-1555.0.patch, 

Re: [jira] [Commented] (KAFKA-1555) provide strong consistency with reasonable availability

2014-09-25 Thread Robert Withers
Jay, thank you for the excellent summary. Regarding your point 6, I would just like to 
mention that with a partial-function-style interface facet for event listener 
registrations on top of FarRefs and RemotePromiseRefs, in ERights-style 
distributed promises, it is easy to think about multiple registrations for 
different future scenarios, scheduled or failure. Even if an undetected partition 
occurs, replays can be revoked after a previous attempt succeeds, or other 
client-defined strategies can apply. I do not know if this is considered for 0.9.

Thank you,
- Rob

 On Sep 25, 2014, at 3:53 PM, Jay Kreps (JIRA) j...@apache.org wrote:
 
 
[ 
 https://issues.apache.org/jira/browse/KAFKA-1555?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14148371#comment-14148371
  ] 
 
 Jay Kreps commented on KAFKA-1555:
 --
 
 This is a good discussion, I am glad we are taking the time to think this 
 through carefully. Let's aim for the right end state rather than optimizing 
 for what is easiest to implement now (since these features never get removed 
 and we end up spending a lot of time explaining them).
 
 [~sriramsub] it sounds like you are not totally sold on min.isr. Let me try 
 to summarize a few things people have said that I think are true and see if 
 people can agree on them:
 
 1. Unclean leader election is an orthogonal issue. Regardless of settings, 
 choosing a leader that is not caught up means losing data. This option covers 
 catastrophic recovery (i.e. no server with complete data exists). We can give 
 finer control over whether unclean election is manual or automatic but I 
 think you need to have this escape hatch for the case where the authoritative 
 copy of the data is destroyed.
 
 2. Specifying a min.isr does actually make sense. I think people have one of 
 two cases in mind. In one case non-availability means data loss. This is 
 likely the most common case. In this case even if you are down to your last 
 replica you still want to perform the write because there is still some hope 
 the data will not be lost and if you refuse the write the chance of loss is 
 100%. In another case non-availability can be tolerated because something 
 upstream (perhaps the client or another system) can hold onto the data and 
 retry later. In this case you want to be sure that when you accept a write it 
 is safe. In this case refusing a write is okay but accepting a write and then 
 losing it is much worse. It's true that it is very hard to reason about the 
 right min.isr as that depends on the probability of failure over time. But 
 this criticism is also true of replication factor (e.g. to know an 
 appropriate replication factor to yield a particular probability of data loss 
 you need to know the joint probability distribution over machine failures).
 
 3. With regard to min.isr there are three issues: (1) what are the settings 
 that actually make sense, (2) what is the best way to express these in the 
 protocol, and (3) what is the best way to represent this in the client 
 configuration. I think we need to start by agreeing on (1).
 
 4. I believe people are actually in agreement that the following settings 
 make sense:
 a. acks = 0, min.isr=0
 b. acks = 1, min.isr = 1
 c. acks = -1, min.isr in {1, ..., N}
 Conversely no other settings make sense. Does everyone agree on this? If so 
 the question is really how to expose this to the user.
 
 4. There were several proposals for how to express these options.
 a. The current proposal is to have acks remain in the protocol with its 
 original meaning and values but add a topic configuration controlling 
 min.isr. I personally think this is a bit weird since both are about the 
 definition of success for the request, so it makes sense to send them with the 
 request.
 b. Alternately we could add a new field in the produce request specifying the 
 min.isr.
 c. Alternately we could add a new field in the response returning the actual 
 isr size. An advantage of this is that it allows the client to distinguish 
 between write failed and write succeeded but not with enough replicas.
 d. Finally we could overload acks = -2, -3, etc to mean acks = -1 and min.isr 
 = 2, 3, etc. This sounds insane, but maybe it actually isn't. Since not all 
 combinations of acks and min.isr make sense, this does actually enumerate the 
 sensible cases.
 
 5. Regardless of what we do in (4) the configuration can be somewhat simpler. 
 Probably just by having the user specify min.isr and erroring out if they 
 combine min.isr and acks in a non-sensical way.
 
 6. It isn't clear to me whether the right behavior is to fail fast when the 
 min.isr likely won't be met (because the current isr < min) and not attempt 
 the write at all or else to always do the write and then return an error if 
 the min.isr isn't met. The latter case means that retries, which are the sane 
 thing to do when you get the error, will lead to 

[jira] [Commented] (KAFKA-1555) provide strong consistency with reasonable availability

2014-09-25 Thread Sriram Subramanian (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1555?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14148407#comment-14148407
 ] 

Sriram Subramanian commented on KAFKA-1555:
---

Thank you for summarizing all the thoughts Jay. 

1. I had issues with how ack was designed initially with the min_isr config and 
it looks a lot better now with ack = 0, ack = 1 and ack = -1. I still think ack 
should be an enum explaining what it does rather than using -1 or any arbitrary 
integers. 
2. I don't see the value of min_isr if it does not prevent data loss under 
unclean leader election. If it was a clean leader election, we would always 
have one other replica that has the data and min_isr does not add any more 
value. It is completely possible to ensure there is no data loss with unclean 
leader election using the min_isr and I think that is the real benefit of it.
3. As I said previously, I would like the sender to know what guarantees they 
get when they send the request, and would opt for min_isr being exposed at the 
API level.
4. W.r.t. your last point, I think it may not be possible to avoid duplicates by 
failing before writing to the log. The reason is that the isr could become less 
than min_isr just after the check and we could still end up failing the request 
after a timeout. Agreed, this is an edge case and we end up with far fewer 
duplicates. So I think you would need the check in both places.

 provide strong consistency with reasonable availability
 ---

 Key: KAFKA-1555
 URL: https://issues.apache.org/jira/browse/KAFKA-1555
 Project: Kafka
  Issue Type: Improvement
  Components: controller
Affects Versions: 0.8.1.1
Reporter: Jiang Wu
Assignee: Gwen Shapira
 Fix For: 0.8.2

 Attachments: KAFKA-1555.0.patch, KAFKA-1555.1.patch, 
 KAFKA-1555.2.patch, KAFKA-1555.3.patch, KAFKA-1555.4.patch, KAFKA-1555.5.patch


 In a mission critical application, we expect a kafka cluster with 3 brokers 
 can satisfy two requirements:
 1. When 1 broker is down, no message loss or service blocking happens.
 2. In worse cases such as two brokers are down, service can be blocked, but 
 no message loss happens.
 We found that the current kafka version (0.8.1.1) cannot achieve the requirements 
 due to three of its behaviors:
 1. when choosing a new leader from 2 followers in ISR, the one with fewer 
 messages may be chosen as the leader.
 2. even when replica.lag.max.messages=0, a follower can stay in ISR when it 
 has fewer messages than the leader.
 3. ISR can contain only 1 broker, therefore acknowledged messages may be 
 stored in only 1 broker.
 The following is an analytical proof. 
 We consider a cluster with 3 brokers and a topic with 3 replicas, and assume 
 that at the beginning, all 3 replicas, leader A, followers B and C, are in 
 sync, i.e., they have the same messages and are all in ISR.
 According to the value of request.required.acks (acks for short), there are 
 the following cases.
 1. acks=0, 1, 3. Obviously these settings do not satisfy the requirement.
 2. acks=2. Producer sends a message m. It's acknowledged by A and B. At this 
 time, although C hasn't received m, C is still in ISR. If A is killed, C can 
 be elected as the new leader, and consumers will miss m.
 3. acks=-1. B and C restart and are removed from ISR. Producer sends a 
 message m to A, and receives an acknowledgement. Disk failure happens in A 
 before B and C replicate m. Message m is lost.
 In summary, any existing configuration cannot satisfy the requirements.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: Review Request 25944: Patch for KAFKA-1013

2014-09-25 Thread Guozhang Wang

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/25944/#review54609
---



core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
https://reviews.apache.org/r/25944/#comment94796

This comment line is for code line 320, better move it above it.



core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
https://reviews.apache.org/r/25944/#comment94797

This comment line is for code line 320, better move it above it.



core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
https://reviews.apache.org/r/25944/#comment94799

Is there a difference between these two:

Thread.sleep()

TimeUnit.MILLISECONDS.sleep()

since we do the former everywhere in the code base.



core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
https://reviews.apache.org/r/25944/#comment94800

Do we still need this variable?



core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
https://reviews.apache.org/r/25944/#comment94802

 Can we omit collection.Seq() here and just use groupId = config.groupId?



core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
https://reviews.apache.org/r/25944/#comment94804

merge in a single line



core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
https://reviews.apache.org/r/25944/#comment94806

revert



core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
https://reviews.apache.org/r/25944/#comment94807

revert



core/src/main/scala/kafka/tools/ExportOffsets.scala
https://reviews.apache.org/r/25944/#comment94812

 Can we actually use the same format for ZK / offsetmanager? Having two 
 different formats makes them harder to parse, since the user needs to 
 know whether ZK or the offset manager is used.



core/src/main/scala/kafka/tools/ExportOffsets.scala
https://reviews.apache.org/r/25944/#comment94810

 We can make a parseBrokerList in Utils and use it there, since I have seen 
 similar parsing logic in multiple places.
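 
 A sketch of what such a helper might look like (hypothetical, not part of this patch):
 
     def parseBrokerList(brokerList: String): Seq[(String, Int)] = {
       // "host1:9092,host2:9092" -> Seq(("host1", 9092), ("host2", 9092))
       brokerList.split(",").map(_.trim).filter(_.nonEmpty).map { hostPort =>
         val Array(host, port) = hostPort.split(":")
         (host, port.toInt)
       }.toSeq
     }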



core/src/main/scala/kafka/tools/ExportOffsets.scala
https://reviews.apache.org/r/25944/#comment94811

 You can take a look at KAFKA-686's latest patch, which did some cleanup on 
 the util functions; these functions can probably be merged into Utils.



core/src/main/scala/kafka/tools/ImportOffsets.scala
https://reviews.apache.org/r/25944/#comment94813

Ditto as above, can we unify the input offset format?



core/src/main/scala/kafka/tools/ImportOffsets.scala
https://reviews.apache.org/r/25944/#comment94814

Ditto as above.



core/src/main/scala/kafka/tools/ImportOffsets.scala
https://reviews.apache.org/r/25944/#comment94815

Same as Joel suggested: we can just use config's default values.



core/src/main/scala/kafka/tools/OffsetClient.scala
https://reviews.apache.org/r/25944/#comment94816

The apache header is missing.



core/src/main/scala/kafka/tools/OffsetClient.scala
https://reviews.apache.org/r/25944/#comment94819

This could be error.



core/src/main/scala/kafka/tools/OffsetClient.scala
https://reviews.apache.org/r/25944/#comment94822

This can be trace, or we can just print the offset manager id in debug 
if it does not contain error code.



core/src/main/scala/kafka/tools/OffsetClient.scala
https://reviews.apache.org/r/25944/#comment94820

Could be error(Error while connecting to %s:%d for fetching consumer 
 metadata), since this is not a general exception.



core/src/main/scala/kafka/tools/OffsetClient.scala
https://reviews.apache.org/r/25944/#comment94821

When an exception is thrown and caught here, we should skip the rest of the 
loop.



core/src/main/scala/kafka/tools/OffsetClient.scala
https://reviews.apache.org/r/25944/#comment94823

Could be error(Error while connecting to offset manager %s)



core/src/main/scala/kafka/tools/OffsetClient.scala
https://reviews.apache.org/r/25944/#comment94824

Some logging inconsistency: 

broker [%s:%d]

broker %s:%d

%s:%d



core/src/main/scala/kafka/tools/OffsetClient.scala
https://reviews.apache.org/r/25944/#comment94825

 Why does this API need to return an Option while the previous one can directly 
 return the response?



core/src/main/scala/kafka/tools/OffsetClient.scala
https://reviews.apache.org/r/25944/#comment94826

 We do not need "Possible cause" any more; just print the exception's 
 message if necessary.



core/src/main/scala/kafka/tools/OffsetClient.scala
https://reviews.apache.org/r/25944/#comment94827

Are there any other exceptions that can be caught besides IOExceptions? We 
need to be careful about always catching Throwable and printing stack trace.


- Guozhang Wang


On Sept. 23, 2014, 5:48 p.m., Mayuresh Gharat wrote:
 
 ---
 This is an 

[jira] [Commented] (KAFKA-1555) provide strong consistency with reasonable availability

2014-09-25 Thread Gwen Shapira (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1555?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14148450#comment-14148450
 ] 

Gwen Shapira commented on KAFKA-1555:
-

Call me a control freak, but "don't lose data" is a critical requirement, and 
I'd like to control as much of it as possible in a central location that can be 
monitored and audited. I can't see how we can force everyone who produces data 
to do the right thing.



 provide strong consistency with reasonable availability
 ---

 Key: KAFKA-1555
 URL: https://issues.apache.org/jira/browse/KAFKA-1555
 Project: Kafka
  Issue Type: Improvement
  Components: controller
Affects Versions: 0.8.1.1
Reporter: Jiang Wu
Assignee: Gwen Shapira
 Fix For: 0.8.2

 Attachments: KAFKA-1555.0.patch, KAFKA-1555.1.patch, 
 KAFKA-1555.2.patch, KAFKA-1555.3.patch, KAFKA-1555.4.patch, KAFKA-1555.5.patch


 In a mission critical application, we expect a kafka cluster with 3 brokers 
 can satisfy two requirements:
 1. When 1 broker is down, no message loss or service blocking happens.
 2. In worse cases such as two brokers are down, service can be blocked, but 
 no message loss happens.
 We found that the current kafka version (0.8.1.1) cannot achieve the requirements 
 due to three of its behaviors:
 1. when choosing a new leader from 2 followers in ISR, the one with fewer 
 messages may be chosen as the leader.
 2. even when replica.lag.max.messages=0, a follower can stay in ISR when it 
 has fewer messages than the leader.
 3. ISR can contain only 1 broker, therefore acknowledged messages may be 
 stored in only 1 broker.
 The following is an analytical proof. 
 We consider a cluster with 3 brokers and a topic with 3 replicas, and assume 
 that at the beginning, all 3 replicas, leader A, followers B and C, are in 
 sync, i.e., they have the same messages and are all in ISR.
 According to the value of request.required.acks (acks for short), there are 
 the following cases.
 1. acks=0, 1, 3. Obviously these settings do not satisfy the requirement.
 2. acks=2. Producer sends a message m. It's acknowledged by A and B. At this 
 time, although C hasn't received m, C is still in ISR. If A is killed, C can 
 be elected as the new leader, and consumers will miss m.
 3. acks=-1. B and C restart and are removed from ISR. Producer sends a 
 message m to A, and receives an acknowledgement. Disk failure happens in A 
 before B and C replicate m. Message m is lost.
 In summary, any existing configuration cannot satisfy the requirements.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-1555) provide strong consistency with reasonable availability

2014-09-25 Thread Jay Kreps (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1555?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14148483#comment-14148483
 ] 

Jay Kreps commented on KAFKA-1555:
--

1. Yeah this may have been a mistake. I think integers are fine for the 
protocol. We need not expose them in this way in the client config.
2. Yeah I think what I said wasn't very clear. You are correct that the loss 
situation is always an unclean election. I think your complaint is that min.isr 
is hard to reason about. I actually agree. What I was trying to say was that 
you can't totally prevent data loss either by replication or min.isr because 
regardless of the number of writes there is some chance of more failures than 
that. So fundamentally using the number of writes/replicas is a way to increase 
the probability of durability. Both are hard to reason about but I don't know 
if min.isr is worse than replication factor in that respect.
3. Which of the four options are you saying you like?
4. Yes, I totally agree. Let me elaborate. I claim there are really only two 
common cases here (a) you have the ability to block and wait for sufficient 
available replicas holding onto whatever unwritten data in the meantime, (b) 
you don't. I think probably the majority of uses are (b), for example any data 
produced by a web application would be this way. But there are plenty of cases 
where you can block (a stream processing job reading from an upstream topic, or 
when replicating data coming out of a database, etc). min.isr only helps the 
case where you can block. So the only sane way to use min.isr is to also set 
retries to infinity and keep trying until you are sure the write has succeeded. 
But if we don't fail fast on the write imagine how this would work in practice. 
A server fails bringing you below your min.isr setting and for the hour while 
someone is fixing it your process is sitting there pumping out duplicate 
writes. This is likely not what anyone would want. But as you say you can't 
guarantee the data wasn't written because the isr could shrink after the write 
occurred. This would be rare, but possible. However, attempting to fail fast, 
even though it isn't perfect, fixes the core problem: it is possible there may 
be a duplicate, but that is true anyway just due to network timeouts, and there 
won't be a billion duplicates, which is what I consider to be the blocker issue.
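
For reference, the block-and-retry pattern in (a) maps to the existing producer 
retry knobs, roughly like this (0.8.x scala producer names; the new java producer 
has the equivalent retries / retry.backoff.ms settings):

{code}
request.required.acks=-1
message.send.max.retries=2147483647   # effectively retry until the write is accepted
retry.backoff.ms=1000
{code}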

 provide strong consistency with reasonable availability
 ---

 Key: KAFKA-1555
 URL: https://issues.apache.org/jira/browse/KAFKA-1555
 Project: Kafka
  Issue Type: Improvement
  Components: controller
Affects Versions: 0.8.1.1
Reporter: Jiang Wu
Assignee: Gwen Shapira
 Fix For: 0.8.2

 Attachments: KAFKA-1555.0.patch, KAFKA-1555.1.patch, 
 KAFKA-1555.2.patch, KAFKA-1555.3.patch, KAFKA-1555.4.patch, KAFKA-1555.5.patch


 In a mission critical application, we expect a kafka cluster with 3 brokers 
 can satisfy two requirements:
 1. When 1 broker is down, no message loss or service blocking happens.
 2. In worse cases such as two brokers are down, service can be blocked, but 
 no message loss happens.
 We found that the current kafka version (0.8.1.1) cannot achieve the requirements 
 due to three of its behaviors:
 1. when choosing a new leader from 2 followers in ISR, the one with fewer 
 messages may be chosen as the leader.
 2. even when replica.lag.max.messages=0, a follower can stay in ISR when it 
 has fewer messages than the leader.
 3. ISR can contain only 1 broker, therefore acknowledged messages may be 
 stored in only 1 broker.
 The following is an analytical proof. 
 We consider a cluster with 3 brokers and a topic with 3 replicas, and assume 
 that at the beginning, all 3 replicas, leader A, followers B and C, are in 
 sync, i.e., they have the same messages and are all in ISR.
 According to the value of request.required.acks (acks for short), there are 
 the following cases.
 1. acks=0, 1, 3. Obviously these settings do not satisfy the requirement.
 2. acks=2. Producer sends a message m. It's acknowledged by A and B. At this 
 time, although C hasn't received m, C is still in ISR. If A is killed, C can 
 be elected as the new leader, and consumers will miss m.
 3. acks=-1. B and C restart and are removed from ISR. Producer sends a 
 message m to A, and receives an acknowledgement. Disk failure happens in A 
 before B and C replicate m. Message m is lost.
 In summary, any existing configuration cannot satisfy the requirements.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: Can Fetch Request cause delay of Produce Request?

2014-09-25 Thread Vadim Chekan
Thank you Neha,

Allocating dedicated connection for fetching client worked fine and did not
require any changes to kafka client library code.

Thinking about it, the server could perhaps process Fetch and Produce requests
in parallel, while still returning results in order as the spec requires. What
needs to be prevented is simultaneous processing of multiple Produce requests
or multiple Fetch requests. I hope I'll have time to play with the server a little
bit.

Vadim.

On Thu, Sep 25, 2014 at 12:13 PM, Neha Narkhede neha.narkh...@gmail.com
wrote:

 The server guarantees that on a single TCP connection, requests will be
 processed in the order they are sent and responses will return in that
 order as well
 
 But to achieve proper functioning of the client, I need to allocate another
 tcp connection for listeners now?

 Yes. The easiest way to send and receive is to use a producer client and a
  consumer client, each of which will have its own TCP connection to the
 server.

 On Thu, Sep 25, 2014 at 11:40 AM, Vadim Chekan kot.bege...@gmail.com
 wrote:

  Hi all,
 
  I'm working on my own kafka client implementation and I noticed strange
  situation.
 
  Time(s) | Action
  0.0     MetadataRequest
  0.0     MetadataResponse
  0.2     OffsetRequest
  0.2     OffsetResponse
  0.3     FetchRequest(MaxWaitTime=20sec)
  6.0     ProduceRequest
  31.0    FetchResponse   <-- notice 25sec gap!
  31.0    ProduceResponse
 
  What is important: it seems to me that a Fetch Request with a long poll will
  block processing of all following requests for the duration of the timeout,
  given that there is no new data. But new data *are* flowing in, and the
  Produce Request is waiting right behind the Fetch one in the server-side
  processing queue.
 
  If what I describe is correct then I see the following problems.
  1. From the client's point of view, if they have a listener and a sender, then
  sending data should produce an immediate event on the listener.
 
  2. If I want a clean shutdown of my application within 5sec, but my listeners
  are configured with a 20sec timeout, then I am risking losing the data. My
  application shutdown time now becomes at least as long as the listener's
  polling time, which is unreasonable.
 
  I do understand why it was implemented that way, probably because of the
  specification saying
  
  The server guarantees that on a single TCP connection, requests will be
  processed in the order they are sent and responses will return in that
  order as well
  
  But to achieve proper functioning of the client, I need to allocate
 another
  tcp connection for listeners now?
 
  Also, I'm aware about new protocol proposal for the next version of
 Kafka,
  would this issue be resolved there?
 
  Thanks,
  Vadim.
 
  --
  From RFC 2631: In ASN.1, EXPLICIT tagging is implicit unless IMPLICIT is
  explicitly specified
 




-- 
From RFC 2631: In ASN.1, EXPLICIT tagging is implicit unless IMPLICIT is
explicitly specified


Re: Review Request 25995: Patch for KAFKA-1650

2014-09-25 Thread Guozhang Wang

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/25995/#review54620
---



core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
https://reviews.apache.org/r/25995/#comment94830

Do we need to do this check every time in the loop?



core/src/main/scala/kafka/tools/MirrorMaker.scala
https://reviews.apache.org/r/25995/#comment94831

no need empty line here.



core/src/main/scala/kafka/tools/MirrorMaker.scala
https://reviews.apache.org/r/25995/#comment94832

No need bracket



core/src/main/scala/kafka/tools/MirrorMaker.scala
https://reviews.apache.org/r/25995/#comment94833

No need bracket



core/src/main/scala/kafka/tools/MirrorMaker.scala
https://reviews.apache.org/r/25995/#comment94835

Maximum bytes that can be buffered in the data channels



core/src/main/scala/kafka/tools/MirrorMaker.scala
https://reviews.apache.org/r/25995/#comment94834

in terms of bytes



core/src/main/scala/kafka/tools/MirrorMaker.scala
https://reviews.apache.org/r/25995/#comment94836

 Inconsistent indentation.



core/src/main/scala/kafka/tools/MirrorMaker.scala
https://reviews.apache.org/r/25995/#comment94838

Capitalize: Offset commit interval in ms



core/src/main/scala/kafka/tools/MirrorMaker.scala
https://reviews.apache.org/r/25995/#comment94841

Do you need to turn off auto commit on the consumer threads here?



core/src/main/scala/kafka/tools/MirrorMaker.scala
https://reviews.apache.org/r/25995/#comment94840

 We can add some more comments here, explaining:

1) why we add the offset commit thread for new producer, but not old 
producer;

2) what risks does the old producer have (for not having offset commit 
thread).



core/src/main/scala/kafka/tools/MirrorMaker.scala
https://reviews.apache.org/r/25995/#comment94842

For clean shutdown, you need to

1) halt consumer threads first.

2) wait for producer to drain all the messages in data channel.

3) manually commit offsets on consumer threads.

4) shut down consumer threads.

Otherwise we will have data duplicates as we commit offsets based on min.



core/src/main/scala/kafka/tools/MirrorMaker.scala
https://reviews.apache.org/r/25995/#comment94844

queueId



core/src/main/scala/kafka/tools/MirrorMaker.scala
https://reviews.apache.org/r/25995/#comment94846

How about having a histogram for each queue instead of getting the sum? The 
update call would be a bit less expensive and we can monitor if some queues are 
empty while others get all the data.
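 
 A sketch of the per-queue version (names hypothetical; assumes the usual 
 KafkaMetricsGroup helpers from core):
 
     class DataChannelMetrics(numQueues: Int) extends kafka.metrics.KafkaMetricsGroup {
       // one size histogram per queue, so an empty queue is visible next to a hot one
       private val queueSizeHists =
         (0 until numQueues).map(i => newHistogram("MirrorMaker-DataChannel-queue-%d-size".format(i)))
       def record(queueId: Int, size: Int) { queueSizeHists(queueId).update(size) }
     }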



core/src/main/scala/kafka/tools/MirrorMaker.scala
https://reviews.apache.org/r/25995/#comment94847

Ditto above.



core/src/main/scala/kafka/tools/MirrorMaker.scala
https://reviews.apache.org/r/25995/#comment94849

Add comments explaining why we force an unclean shutdown with System.exit 
here.



core/src/main/scala/kafka/tools/MirrorMaker.scala
https://reviews.apache.org/r/25995/#comment94857

Unfortunately this may not be the case, as we can have multiple connectors 
which are using different consumer configs with different group ids. We need to 
either 1) change the config settings to enforce this to be true, or 2) use a 
 separate offset client that remembers which topics belong to which groups.



core/src/main/scala/kafka/tools/MirrorMaker.scala
https://reviews.apache.org/r/25995/#comment94858

Capitalize first word



core/src/main/scala/kafka/tools/MirrorMaker.scala
https://reviews.apache.org/r/25995/#comment94859

Capitalize first word



core/src/main/scala/kafka/tools/MirrorMaker.scala
https://reviews.apache.org/r/25995/#comment94863

Add a comment explaining the logic of how this works. Also a few questions:

1) is the map() call synchronized with other threads putting new offsets 
into the map?

2) after the sorting, the logic may be clearer as

var commitableOffsetIndex = 0
while (commitableOffsetIndex < offsets.length && 
offsets(commitableOffsetIndex) - offsets.head == commitableOffsetIndex) 
commitableOffsetIndex += 1

offsetToCommit = offsets(commitableOffsetIndex - 1) + 1
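
For reference, a self-contained version of the scan sketched above. Here the acked 
offsets are just a Seq[Long], an illustrative stand-in for whatever data structure 
the patch actually uses:

    object CommittableOffsetSketch {
      // Given the offsets that have been produced successfully (possibly with gaps),
      // return one past the longest consecutive run starting at the smallest offset,
      // i.e. the highest offset that is safe to commit.
      def committableOffset(acked: Seq[Long]): Option[Long] = {
        if (acked.isEmpty) None
        else {
          val offsets = acked.sorted
          var i = 0
          while (i < offsets.length && offsets(i) - offsets.head == i)
            i += 1
          Some(offsets(i - 1) + 1)
        }
      }

      def main(args: Array[String]): Unit = {
        println(committableOffset(Seq(5L, 6L, 8L)))  // Some(7): 8 is not consecutive yet
        println(committableOffset(Seq(5L, 6L, 7L)))  // Some(8)
      }
    }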



core/src/main/scala/kafka/tools/MirrorMaker.scala
https://reviews.apache.org/r/25995/#comment94855

The send().get() call is missing.



core/src/main/scala/kafka/utils/ByteBoundedBlockingQueue.scala
https://reviews.apache.org/r/25995/#comment94853

Apache header missing.


- Guozhang Wang


On Sept. 24, 2014, 4:26 p.m., Jiangjie Qin wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/25995/
 ---
 
 (Updated Sept. 24, 2014, 4:26 p.m.)
 
 
 Review request for kafka.
 
 
 Bugs: KAFKA-1650
 https://issues.apache.org/jira/browse/KAFKA-1650
 
 
 Repository: kafka
 
 
 Description
 

[jira] [Updated] (KAFKA-589) Clean shutdown after startup connection failure

2014-09-25 Thread Ewen Cheslack-Postava (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-589?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ewen Cheslack-Postava updated KAFKA-589:

Status: Patch Available  (was: Open)

This patch makes KafkaServer clean up after itself, but still rethrows any 
caught exceptions. This keeps the existing interface the same: it should still 
work if the caller does cleanup themselves by catching exceptions and calling 
shutdown, but it also cleans up if they don't, so the leftover thread won't cause a 
hang. Also adds a test of this behavior.
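
A rough sketch of the pattern being described here (clean up on a failed startup 
but still rethrow); the class and method names below are illustrative placeholders, 
not the actual KafkaServer code:

    object StartupCleanupSketch {
      class Server {
        @volatile private var schedulerStarted = false

        def startup(): Unit = {
          try {
            startScheduler()
            connectZk()            // may throw, e.g. a ZK connection timeout
          } catch {
            case e: Throwable =>
              shutdown()           // release whatever startup already created
              throw e              // keep the existing contract: the caller still sees the failure
          }
        }

        def shutdown(): Unit =
          if (schedulerStarted) { println("stopping scheduler"); schedulerStarted = false }

        private def startScheduler(): Unit = { schedulerStarted = true }
        private def connectZk(): Unit =
          throw new RuntimeException("Unable to connect to zookeeper server within timeout")
      }

      def main(args: Array[String]): Unit = {
        try new Server().startup()
        catch { case e: RuntimeException => println("startup failed but cleaned up: " + e.getMessage) }
      }
    }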

 Clean shutdown after startup connection failure
 ---

 Key: KAFKA-589
 URL: https://issues.apache.org/jira/browse/KAFKA-589
 Project: Kafka
  Issue Type: Bug
  Components: core
Affects Versions: 0.8.0, 0.7.2
Reporter: Jason Rosenberg
Assignee: Swapnil Ghike
Priority: Minor
  Labels: bugs, newbie

 Hi,
 I'm embedding the kafka server (0.7.2) in an application container.   I've 
 noticed that if I try to start the server without zookeeper being available, 
 by default it gets a zk connection timeout after 6 seconds, and then throws 
 an Exception out of KafkaServer.startup(). E.g., I see this stack trace:
 Exception in thread "main" org.I0Itec.zkclient.exception.ZkTimeoutException: 
 Unable to connect to zookeeper server within timeout: 6000
   at org.I0Itec.zkclient.ZkClient.connect(ZkClient.java:876)
   at org.I0Itec.zkclient.ZkClient.init(ZkClient.java:98)
   at org.I0Itec.zkclient.ZkClient.init(ZkClient.java:84)
   at kafka.server.KafkaZooKeeper.startup(KafkaZooKeeper.scala:44)
   at kafka.log.LogManager.init(LogManager.scala:93)
   at kafka.server.KafkaServer.startup(KafkaServer.scala:58)
 
 
 So that's ok, I can catch the exception, and then shut everything down 
 gracefully, in this case.  However, when I do this, it seems there is a 
 daemon thread still around, which doesn't quit, and so the server never 
 actually exits the jvm.  Specifically, this thread seems to hang around:
 kafka-logcleaner-0 prio=5 tid=7fd9b48b1000 nid=0x112c08000 waiting on 
 condition [112c07000]
java.lang.Thread.State: TIMED_WAITING (parking)
   at sun.misc.Unsafe.park(Native Method)
   - parking to wait for  7f40d4be8 (a 
 java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
   at 
 java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:196)
   at 
 java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2025)
   at java.util.concurrent.DelayQueue.take(DelayQueue.java:164)
   at 
 java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:609)
   at 
 java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:602)
   at 
 java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:947)
   at 
 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:907)
   at java.lang.Thread.run(Thread.java:680)
 Looking at the code in kafka.log.LogManager(), it does seem like it starts up 
 the scheduler to clean logs, before then trying to connect to zk (and in this 
 case fail):
   /* Schedule the cleanup task to delete old logs */
   if(scheduler != null) {
  info("starting log cleaner every " + logCleanupIntervalMs + " ms")
 scheduler.scheduleWithRate(cleanupLogs, 60 * 1000, logCleanupIntervalMs)
   }
 So this scheduler does not appear to be stopped if startup fails.  However, 
 if I catch the above RuntimeException, and then call KafkaServer.shutdown(), 
 then it will stop the scheduler, and all is good.
 However, it seems odd that if I get an exception when calling 
 KafkaServer.startup(), that I should still have to do a 
 KafkaServer.shutdown().  Rather, wouldn't it be better to have it internally 
 cleanup after itself if startup() gets an exception?  I'm not sure I can 
 reliably call shutdown() after a failed startup()



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-1555) provide strong consistency with reasonable availability

2014-09-25 Thread Jun Rao (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1555?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14148510#comment-14148510
 ] 

Jun Rao commented on KAFKA-1555:


a. I agree that unclean leader election is an orthogonal issue from min.isr. 
Unclean leader election only happens when all replicas in isr fail. This can 
happen independent of the value of min.isr. The higher the value of min.isr, 
the lower the probability that an unclean leader election will happen.

b. The issue with exposing min.isr in the producer config is that a producer can 
be used to send to multiple topics. Since different topics can have different 
replication factors, it may be difficult to configure a proper min.isr that works 
for all topics. We had the same issue with ack > 1 before. However, that option 
will be removed in Gwen's patch. From this perspective, I think having min.isr 
as a topic level config makes sense.

c. I agree that it's better to add a separate check to prevent the message from 
being added to the leader's log if the isr size at that moment is less than min.isr.

 provide strong consistency with reasonable availability
 ---

 Key: KAFKA-1555
 URL: https://issues.apache.org/jira/browse/KAFKA-1555
 Project: Kafka
  Issue Type: Improvement
  Components: controller
Affects Versions: 0.8.1.1
Reporter: Jiang Wu
Assignee: Gwen Shapira
 Fix For: 0.8.2

 Attachments: KAFKA-1555.0.patch, KAFKA-1555.1.patch, 
 KAFKA-1555.2.patch, KAFKA-1555.3.patch, KAFKA-1555.4.patch, KAFKA-1555.5.patch


 In a mission critical application, we expect a kafka cluster with 3 brokers 
 can satisfy two requirements:
 1. When 1 broker is down, no message loss or service blocking happens.
 2. In worse cases such as two brokers are down, service can be blocked, but 
 no message loss happens.
 We found that current kafka version (0.8.1.1) cannot achieve the requirements 
 due to its three behaviors:
 1. when choosing a new leader from 2 followers in ISR, the one with less 
 messages may be chosen as the leader.
 2. even when replica.lag.max.messages=0, a follower can stay in ISR when it 
 has less messages than the leader.
 3. ISR can contain only 1 broker, therefore acknowledged messages may be 
 stored in only 1 broker.
 The following is an analytical proof. 
 We consider a cluster with 3 brokers and a topic with 3 replicas, and assume 
 that at the beginning, all 3 replicas, leader A, followers B and C, are in 
 sync, i.e., they have the same messages and are all in ISR.
 According to the value of request.required.acks (acks for short), there are 
 the following cases.
 1. acks=0, 1, 3. Obviously these settings do not satisfy the requirement.
 2. acks=2. Producer sends a message m. It's acknowledged by A and B. At this 
 time, although C hasn't received m, C is still in ISR. If A is killed, C can 
 be elected as the new leader, and consumers will miss m.
 3. acks=-1. B and C restart and are removed from ISR. Producer sends a 
 message m to A, and receives an acknowledgement. Disk failure happens in A 
 before B and C replicate m. Message m is lost.
 In summary, any existing configuration cannot satisfy the requirements.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-589) Clean shutdown after startup connection failure

2014-09-25 Thread Ewen Cheslack-Postava (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-589?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ewen Cheslack-Postava updated KAFKA-589:

Attachment: KAFKA-589-v1.patch

 Clean shutdown after startup connection failure
 ---

 Key: KAFKA-589
 URL: https://issues.apache.org/jira/browse/KAFKA-589
 Project: Kafka
  Issue Type: Bug
  Components: core
Affects Versions: 0.7.2, 0.8.0
Reporter: Jason Rosenberg
Assignee: Swapnil Ghike
Priority: Minor
  Labels: bugs, newbie
 Attachments: KAFKA-589-v1.patch


 Hi,
 I'm embedding the kafka server (0.7.2) in an application container.   I've 
 noticed that if I try to start the server without zookeeper being available, 
 by default it gets a zk connection timeout after 6 seconds, and then throws 
 an Exception out of KafkaServer.startup(). E.g., I see this stack trace:
 Exception in thread "main" org.I0Itec.zkclient.exception.ZkTimeoutException: 
 Unable to connect to zookeeper server within timeout: 6000
   at org.I0Itec.zkclient.ZkClient.connect(ZkClient.java:876)
   at org.I0Itec.zkclient.ZkClient.init(ZkClient.java:98)
   at org.I0Itec.zkclient.ZkClient.init(ZkClient.java:84)
   at kafka.server.KafkaZooKeeper.startup(KafkaZooKeeper.scala:44)
   at kafka.log.LogManager.init(LogManager.scala:93)
   at kafka.server.KafkaServer.startup(KafkaServer.scala:58)
 
 
 So that's ok, I can catch the exception, and then shut everything down 
 gracefully, in this case.  However, when I do this, it seems there is a 
 daemon thread still around, which doesn't quit, and so the server never 
 actually exits the jvm.  Specifically, this thread seems to hang around:
 kafka-logcleaner-0 prio=5 tid=7fd9b48b1000 nid=0x112c08000 waiting on 
 condition [112c07000]
java.lang.Thread.State: TIMED_WAITING (parking)
   at sun.misc.Unsafe.park(Native Method)
   - parking to wait for  7f40d4be8 (a 
 java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
   at 
 java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:196)
   at 
 java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2025)
   at java.util.concurrent.DelayQueue.take(DelayQueue.java:164)
   at 
 java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:609)
   at 
 java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:602)
   at 
 java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:947)
   at 
 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:907)
   at java.lang.Thread.run(Thread.java:680)
 Looking at the code in kafka.log.LogManager(), it does seem like it starts up 
 the scheduler to clean logs, before then trying to connect to zk (and in this 
 case fail):
   /* Schedule the cleanup task to delete old logs */
   if(scheduler != null) {
  info("starting log cleaner every " + logCleanupIntervalMs + " ms")
 scheduler.scheduleWithRate(cleanupLogs, 60 * 1000, logCleanupIntervalMs)
   }
 So this scheduler does not appear to be stopped if startup fails.  However, 
 if I catch the above RuntimeException, and then call KafkaServer.shutdown(), 
 then it will stop the scheduler, and all is good.
 However, it seems odd that if I get an exception when calling 
 KafkaServer.startup(), that I should still have to do a 
 KafkaServer.shutdown().  Rather, wouldn't it be better to have it internally 
 cleanup after itself if startup() gets an exception?  I'm not sure I can 
 reliably call shutdown() after a failed startup()



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-1555) provide strong consistency with reasonable availability

2014-09-25 Thread Sriram Subramanian (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1555?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14148520#comment-14148520
 ] 

Sriram Subramanian commented on KAFKA-1555:
---

2. I agree. I think what min_isr helps with is having a way to specify that I don't 
want to lose my data as long as at most min_isr - 1 nodes are down. For 
example, if no_of_replicas=3, min_isr = 2 and ack=-1, we should not lose data 
as long as only one node is down, even when there is an unclean leader election. In 
this particular case, when the leader fails, it is expected that all replica 
nodes are up but could be out of the isr. Under such constraints it is 
definitely possible to prevent data loss (ignoring data loss due to system 
failures and data not flushed to disk) by making the node with the longest log 
(assuming we ensure they don't diverge) the leader.
3. I prefer b or c. d is attractive since you could use just one variable to 
define your required guarantees but it is hard to understand at the API level.
4. I totally agree. The issue is that the ISR takes a while to reflect 
reality. Assume we only failed early, before writing to the local log, and did not 
have any checks after writing. Replicas go down. It would take a while for the 
isr to reflect that those replicas are not in the isr anymore. During this time, 
we would simply write the messages to the log and lose them later.

 provide strong consistency with reasonable availability
 ---

 Key: KAFKA-1555
 URL: https://issues.apache.org/jira/browse/KAFKA-1555
 Project: Kafka
  Issue Type: Improvement
  Components: controller
Affects Versions: 0.8.1.1
Reporter: Jiang Wu
Assignee: Gwen Shapira
 Fix For: 0.8.2

 Attachments: KAFKA-1555.0.patch, KAFKA-1555.1.patch, 
 KAFKA-1555.2.patch, KAFKA-1555.3.patch, KAFKA-1555.4.patch, KAFKA-1555.5.patch


 In a mission critical application, we expect a kafka cluster with 3 brokers 
 can satisfy two requirements:
 1. When 1 broker is down, no message loss or service blocking happens.
 2. In worse cases such as two brokers are down, service can be blocked, but 
 no message loss happens.
 We found that current kafka version (0.8.1.1) cannot achieve the requirements 
 due to its three behaviors:
 1. when choosing a new leader from 2 followers in ISR, the one with less 
 messages may be chosen as the leader.
 2. even when replica.lag.max.messages=0, a follower can stay in ISR when it 
 has less messages than the leader.
 3. ISR can contain only 1 broker, therefore acknowledged messages may be 
 stored in only 1 broker.
 The following is an analytical proof. 
 We consider a cluster with 3 brokers and a topic with 3 replicas, and assume 
 that at the beginning, all 3 replicas, leader A, followers B and C, are in 
 sync, i.e., they have the same messages and are all in ISR.
 According to the value of request.required.acks (acks for short), there are 
 the following cases.
 1. acks=0, 1, 3. Obviously these settings do not satisfy the requirement.
 2. acks=2. Producer sends a message m. It's acknowledged by A and B. At this 
 time, although C hasn't received m, C is still in ISR. If A is killed, C can 
 be elected as the new leader, and consumers will miss m.
 3. acks=-1. B and C restart and are removed from ISR. Producer sends a 
 message m to A, and receives an acknowledgement. Disk failure happens in A 
 before B and C replicate m. Message m is lost.
 In summary, any existing configuration cannot satisfy the requirements.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: Review Request 25886: KAFKA-1555: provide strong consistency with reasonable availability

2014-09-25 Thread Jun Rao

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/25886/#review54635
---


Thanks for the patch. Looks good overall. A couple of comments.

1. Let's add a check of min.isr in Partition.appendMessagesToLeader() so that 
the message is not added to the leader's log if the isr size is less than min.isr.
2. We talked about dropping the ack > 1 support in the jira. However, I think 
that's easier done after KAFKA-1583 is completed. So, we don't have to do this 
change here. I will file a follow up jira.
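
A rough sketch of what such a check could look like; the parameter and field names 
below are approximations for illustration, not the actual Partition code from the 
patch:

    object MinIsrCheckSketch {
      class NotEnoughReplicasException(msg: String) extends RuntimeException(msg)

      // Simplified stand-in for Partition.appendMessagesToLeader: reject the write up
      // front if the ISR has already shrunk below min.isr, so the message never makes
      // it into the leader's log.
      def appendMessagesToLeader(inSyncReplicaCount: Int,
                                 minInSyncReplicas: Int,
                                 requiredAcks: Int): Unit = {
        if (requiredAcks == -1 && inSyncReplicaCount < minInSyncReplicas)
          throw new NotEnoughReplicasException(
            s"ISR size $inSyncReplicaCount is below the configured minimum $minInSyncReplicas")
        // ... otherwise append to the local log as usual ...
      }
    }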

- Jun Rao


On Sept. 25, 2014, 7:41 p.m., Gwen Shapira wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/25886/
 ---
 
 (Updated Sept. 25, 2014, 7:41 p.m.)
 
 
 Review request for kafka.
 
 
 Repository: kafka
 
 
 Description
 ---
 
 KAFKA-1555: provide strong consistency with reasonable availability
 
 
 Diffs
 -
 
   
 clients/src/main/java/org/apache/kafka/common/errors/NotEnoughReplicasException.java
  PRE-CREATION 
   clients/src/main/java/org/apache/kafka/common/protocol/Errors.java d434f42 
   core/src/main/scala/kafka/cluster/Partition.scala ff106b4 
   core/src/main/scala/kafka/common/ErrorMapping.scala 3fae791 
   core/src/main/scala/kafka/common/NotEnoughReplicasException.scala 
 PRE-CREATION 
   core/src/main/scala/kafka/log/LogConfig.scala 5746ad4 
   core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala 
 39f777b 
   core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala 24deea0 
   core/src/test/scala/unit/kafka/utils/TestUtils.scala 2dbdd3c 
 
 Diff: https://reviews.apache.org/r/25886/diff/
 
 
 Testing
 ---
 
 With 3 broker cluster, created 3 topics each with 1 partition and 3 replicas, 
 with 1,3 and 4 min.insync.replicas.
 * min.insync.replicas=1 behaved normally (all writes succeeded as long as a 
 broker was up)
 * min.insync.replicas=3 returned NotEnoughReplicas when required.acks=-1 and 
 one broker was down
 * min.insync.replicas=4 returned NotEnoughReplicas when required.acks=-1
 
 See notes about retry behavior in the JIRA.
 
 
 Thanks,
 
 Gwen Shapira
 




[jira] [Commented] (KAFKA-1555) provide strong consistency with reasonable availability

2014-09-25 Thread Jun Rao (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1555?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14148542#comment-14148542
 ] 

Jun Rao commented on KAFKA-1555:


4b or 4d has the same issue that the value of min.isr or ack may not work with 
all topics that can be sent through the same producer instance. 4c has the 
issue that the broker can't reject messages if the isr size is less than min.isr. 
So, personally, I think 4a is still the best option.

 provide strong consistency with reasonable availability
 ---

 Key: KAFKA-1555
 URL: https://issues.apache.org/jira/browse/KAFKA-1555
 Project: Kafka
  Issue Type: Improvement
  Components: controller
Affects Versions: 0.8.1.1
Reporter: Jiang Wu
Assignee: Gwen Shapira
 Fix For: 0.8.2

 Attachments: KAFKA-1555.0.patch, KAFKA-1555.1.patch, 
 KAFKA-1555.2.patch, KAFKA-1555.3.patch, KAFKA-1555.4.patch, KAFKA-1555.5.patch


 In a mission critical application, we expect a kafka cluster with 3 brokers 
 can satisfy two requirements:
 1. When 1 broker is down, no message loss or service blocking happens.
 2. In worse cases such as two brokers are down, service can be blocked, but 
 no message loss happens.
 We found that current kafka version (0.8.1.1) cannot achieve the requirements 
 due to its three behaviors:
 1. when choosing a new leader from 2 followers in ISR, the one with less 
 messages may be chosen as the leader.
 2. even when replica.lag.max.messages=0, a follower can stay in ISR when it 
 has less messages than the leader.
 3. ISR can contain only 1 broker, therefore acknowledged messages may be 
 stored in only 1 broker.
 The following is an analytical proof. 
 We consider a cluster with 3 brokers and a topic with 3 replicas, and assume 
 that at the beginning, all 3 replicas, leader A, followers B and C, are in 
 sync, i.e., they have the same messages and are all in ISR.
 According to the value of request.required.acks (acks for short), there are 
 the following cases.
 1. acks=0, 1, 3. Obviously these settings do not satisfy the requirement.
 2. acks=2. Producer sends a message m. It's acknowledged by A and B. At this 
 time, although C hasn't received m, C is still in ISR. If A is killed, C can 
 be elected as the new leader, and consumers will miss m.
 3. acks=-1. B and C restart and are removed from ISR. Producer sends a 
 message m to A, and receives an acknowledgement. Disk failure happens in A 
 before B and C replicate m. Message m is lost.
 In summary, any existing configuration cannot satisfy the requirements.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-1631) ReplicationFactor and under-replicated partitions incorrect during reassignment

2014-09-25 Thread Ewen Cheslack-Postava (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1631?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ewen Cheslack-Postava updated KAFKA-1631:
-
Attachment: KAFKA-1631-v1.patch

This patch fixes things in a way that I think [~rberdeen] would find sensible. 
The issue was that the test for under-replication was comparing the current set 
of assigned replicas against the number of in sync replicas. But during the 
reassignment that isn't really correct because if you, e.g., move all replicas 
to different brokers then you'll have more than the target # of replicas.

The fix is to look up the active set of reassignments and, if one is active for 
the partition, use that reassignment info to determine the correct # of 
replicas; otherwise, we can fall back on the active set. Note that this does 
mean that reassignments that increase the number of replicas will show up as 
under-replicated, which I think may be the case [~nehanarkhede] was hoping to 
fix. It's arguable which approach is correct (i.e. should the new target # of 
replicas apply as soon as the reassignment is issued or once it's completed).

As for the replication factor being reported -- that is the number of currently 
assigned replicas for the first partition and has a number of issues.
1. It can be higher than the real target number of replicas as described 
above.
2. It's also not really correct to have it on the topic summary line since it 
varies by partition.
3. Finally, it's not even just the value for partition 0 because it's just 
using the head of a Map.
If we're ok with changing the output formatting here, I can clean that part up 
as well, maybe by adding ReplicationFactor to each partition line and making it 
use the value used when determining under-replication.

 ReplicationFactor and under-replicated partitions incorrect during 
 reassignment
 ---

 Key: KAFKA-1631
 URL: https://issues.apache.org/jira/browse/KAFKA-1631
 Project: Kafka
  Issue Type: Bug
Affects Versions: 0.8.1.1
Reporter: Ryan Berdeen
  Labels: newbie
 Attachments: KAFKA-1631-v1.patch


 We have a topic with a replication factor of 3. We monitor 
 UnderReplicatedPartitions as recommended by the documentation.
 During a partition reassignment, partitions being reassigned are reported as 
 under-replicated. Running a describe shows:
 {code}
 Topic:activity-wal-1PartitionCount:15   ReplicationFactor:5 
 Configs:
 Topic: activity-wal-1   Partition: 0Leader: 14  Replicas: 
 14,13,12,11,15Isr: 14,12,11,13
 Topic: activity-wal-1   Partition: 1Leader: 14  Replicas: 
 15,14,11  Isr: 14,11
 Topic: activity-wal-1   Partition: 2Leader: 11  Replicas: 
 11,15,12  Isr: 12,11,15
 ...
 {code}
 It looks like the displayed replication factor, 5, is simply the number of 
 replicas listed for the first partition, which includes both brokers in the 
 current list and those onto which the partition is being reassigned. 
 Partition 0 is also included in the list when using the 
 `--under-replicated-partitions` option, even though it is replicated to more 
 partitions than the true replication factor.
 During a reassignment, the under-replicated partitions metric is not usable, 
 meaning that actual under-replicated partitions can go unnoticed.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-1555) provide strong consistency with reasonable availability

2014-09-25 Thread Neha Narkhede (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1555?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14148557#comment-14148557
 ] 

Neha Narkhede commented on KAFKA-1555:
--

Followed the discussion and want to share some thoughts-
1. With the new setting, it is necessary to get rid of the current semantics of 
acks > 1. 
2. Given 1, initially I liked the approach of overriding acks > 1 to be the 
semantics provided by what's described for min.isr in this JIRA. This is 
actually more intuitive compared to -2 .. -N. A downside of this approach, 
though, is that clients would need to know the replication factor of the topic, 
which is known to the admin and also changeable only by the admin. So, probably 
moving it to be a topic level config that is also changeable by the admin makes 
sense.
3. Since we are adding a topic level config for the min.isr setting, it makes a 
lot of sense to express acks as an enum. I think users would find it much 
more intuitive if the acks config was expressed as {no_ack, leader_ack, 
committed_ack} (see the sketch after this list). 
4. It is important to reject the write on the server when the requested 
min.acks is greater than the size of the current ISR. It is true that the ISR 
size could change immediately after rejecting the write, but it would lead to 
far fewer duplicates.
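
A minimal sketch of the enum idea from point 3 above; the mode names come from 
that comment and the mapping onto today's request.required.acks values is only 
illustrative:

    object AckMode extends Enumeration {
      val NoAck, LeaderAck, CommittedAck = Value

      // Illustrative mapping onto the current request.required.acks values.
      def toRequiredAcks(mode: Value): Int = mode match {
        case NoAck        => 0
        case LeaderAck    => 1
        case CommittedAck => -1
      }
    }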

 provide strong consistency with reasonable availability
 ---

 Key: KAFKA-1555
 URL: https://issues.apache.org/jira/browse/KAFKA-1555
 Project: Kafka
  Issue Type: Improvement
  Components: controller
Affects Versions: 0.8.1.1
Reporter: Jiang Wu
Assignee: Gwen Shapira
 Fix For: 0.8.2

 Attachments: KAFKA-1555.0.patch, KAFKA-1555.1.patch, 
 KAFKA-1555.2.patch, KAFKA-1555.3.patch, KAFKA-1555.4.patch, KAFKA-1555.5.patch


 In a mission critical application, we expect a kafka cluster with 3 brokers 
 can satisfy two requirements:
 1. When 1 broker is down, no message loss or service blocking happens.
 2. In worse cases such as two brokers are down, service can be blocked, but 
 no message loss happens.
 We found that current kafka version (0.8.1.1) cannot achieve the requirements 
 due to its three behaviors:
 1. when choosing a new leader from 2 followers in ISR, the one with less 
 messages may be chosen as the leader.
 2. even when replica.lag.max.messages=0, a follower can stay in ISR when it 
 has less messages than the leader.
 3. ISR can contain only 1 broker, therefore acknowledged messages may be 
 stored in only 1 broker.
 The following is an analytical proof. 
 We consider a cluster with 3 brokers and a topic with 3 replicas, and assume 
 that at the beginning, all 3 replicas, leader A, followers B and C, are in 
 sync, i.e., they have the same messages and are all in ISR.
 According to the value of request.required.acks (acks for short), there are 
 the following cases.
 1. acks=0, 1, 3. Obviously these settings do not satisfy the requirement.
 2. acks=2. Producer sends a message m. It's acknowledged by A and B. At this 
 time, although C hasn't received m, C is still in ISR. If A is killed, C can 
 be elected as the new leader, and consumers will miss m.
 3. acks=-1. B and C restart and are removed from ISR. Producer sends a 
 message m to A, and receives an acknowledgement. Disk failure happens in A 
 before B and C replicate m. Message m is lost.
 In summary, any existing configuration cannot satisfy the requirements.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-1631) ReplicationFactor and under-replicated partitions incorrect during reassignment

2014-09-25 Thread Ryan Berdeen (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1631?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14148559#comment-14148559
 ] 

Ryan Berdeen commented on KAFKA-1631:
-

The patch does look like an improvement to the {{TopicCommand}}, but doesn't 
address the number of under-replicated partitions reported by the brokers. It 
seems like there shouldn't be multiple definitions of under-replicated 
partition.

 ReplicationFactor and under-replicated partitions incorrect during 
 reassignment
 ---

 Key: KAFKA-1631
 URL: https://issues.apache.org/jira/browse/KAFKA-1631
 Project: Kafka
  Issue Type: Bug
Affects Versions: 0.8.1.1
Reporter: Ryan Berdeen
  Labels: newbie
 Attachments: KAFKA-1631-v1.patch


 We have a topic with a replication factor of 3. We monitor 
 UnderReplicatedPartitions as recommended by the documentation.
 During a partition reassignment, partitions being reassigned are reported as 
 under-replicated. Running a describe shows:
 {code}
 Topic:activity-wal-1PartitionCount:15   ReplicationFactor:5 
 Configs:
 Topic: activity-wal-1   Partition: 0Leader: 14  Replicas: 
 14,13,12,11,15Isr: 14,12,11,13
 Topic: activity-wal-1   Partition: 1Leader: 14  Replicas: 
 15,14,11  Isr: 14,11
 Topic: activity-wal-1   Partition: 2Leader: 11  Replicas: 
 11,15,12  Isr: 12,11,15
 ...
 {code}
 It looks like the displayed replication factor, 5, is simply the number of 
 replicas listed for the first partition, which includes both brokers in the 
 current list and those onto which the partition is being reassigned. 
 Partition 0 is also included in the list when using the 
 `--under-replicated-partitions` option, even though it is replicated to more 
 partitions than the true replication factor.
 During a reassignment, the under-replicated partitions metric is not usable, 
 meaning that actual under-replicated partitions can go unnoticed.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: Review Request 25995: Patch for KAFKA-1650

2014-09-25 Thread Jiangjie Qin


 On Sept. 25, 2014, 11:48 p.m., Guozhang Wang wrote:
  core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala, line 
  299
  https://reviews.apache.org/r/25995/diff/1/?file=704523#file704523line299
 
  Do we need to do this check every time in the loop?

Maybe we can put this check out of the while loop, but that would probably introduce 
more duplicate code. Since the offset commit is not that frequent and the retry 
is hopefully rare, it should not create much overhead.


 On Sept. 25, 2014, 11:48 p.m., Guozhang Wang wrote:
  core/src/main/scala/kafka/tools/MirrorMaker.scala, line 168
  https://reviews.apache.org/r/25995/diff/1/?file=704524#file704524line168
 
  Do you need to turn off auto commit on the consumer threads here?

I thought offset commit should be turned off in the consumer config. Is that 
the case?


 On Sept. 25, 2014, 11:48 p.m., Guozhang Wang wrote:
  core/src/main/scala/kafka/tools/MirrorMaker.scala, line 207
  https://reviews.apache.org/r/25995/diff/1/?file=704524#file704524line207
 
  For clean shutdown, you need to
  
  1) halt consumer threads first.
  
  2) wait for producer to drain all the messages in data channel.
  
  3) manually commit offsets on consumer threads.
  
  4) shut down consumer threads.
  
  Otherwise we will have data duplicates as we commit offsets based on 
  min.

Talked to Guozhang, changed the process to be as below:
1. shutdown consumer threads.
2. shutdown producer
3. commit offsets
4. shutdown consumer connectors


 On Sept. 25, 2014, 11:48 p.m., Guozhang Wang wrote:
  core/src/main/scala/kafka/tools/MirrorMaker.scala, line 447
  https://reviews.apache.org/r/25995/diff/1/?file=704524#file704524line447
 
  Adding comment to the logic of how this works. Also a few questions:
  
  1) is the map() call synchronized with other threads putting new 
  offsets into the map?
  
  2) after the sorting, the logic may be clearer as
  
  var commitableOffsetIndex = 0
  while (commitableOffsetIndex < offsets.length && 
  offsets(commitableOffsetIndex) - offsets.head == commitableOffsetIndex) 
  commitableOffsetIndex += 1
  
  offsetToCommit = offsets(commitableOffsetIndex - 1) + 1

We are using a concurrent map, which guarantees that a single put/get operation is 
atomic. It is possible that the offsets we get for different partitions 
reflect values from different points in time, but that should not matter much 
because later commits will pick up the updated values. The offset we commit when 
exiting is guaranteed to be taken after the producer is shut down. So I think the 
commits during running time do not need to be 100% accurate.


 On Sept. 25, 2014, 11:48 p.m., Guozhang Wang wrote:
  core/src/main/scala/kafka/tools/MirrorMaker.scala, line 483
  https://reviews.apache.org/r/25995/diff/1/?file=704524#file704524line483
 
  The send().get() call is missing.

I put it inside the put().


- Jiangjie


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/25995/#review54620
---


On Sept. 24, 2014, 4:26 p.m., Jiangjie Qin wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/25995/
 ---
 
 (Updated Sept. 24, 2014, 4:26 p.m.)
 
 
 Review request for kafka.
 
 
 Bugs: KAFKA-1650
 https://issues.apache.org/jira/browse/KAFKA-1650
 
 
 Repository: kafka
 
 
 Description
 ---
 
 mirror maker redesign; adding byte bounded blocking queue.
 
 
 Diffs
 -
 
   core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala 
 fbc680fde21b02f11285a4f4b442987356abd17b 
   core/src/main/scala/kafka/tools/MirrorMaker.scala 
 b8698ee1469c8fbc92ccc176d916eb3e28b87867 
   core/src/main/scala/kafka/utils/ByteBoundedBlockingQueue.scala PRE-CREATION 
 
 Diff: https://reviews.apache.org/r/25995/diff/
 
 
 Testing
 ---
 
 
 Thanks,
 
 Jiangjie Qin
 




Re: Review Request 25886: KAFKA-1555: provide strong consistency with reasonable availability

2014-09-25 Thread Neha Narkhede

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/25886/#review54639
---


I suggest we also make the changes required to reject the write if min.isr > 
size of current isr.


clients/src/main/java/org/apache/kafka/common/errors/NotEnoughReplicasException.java
https://reviews.apache.org/r/25886/#comment94876

Could you please add a description to this exception class that explains 
the purpose of the exception?



core/src/main/scala/kafka/cluster/Partition.scala
https://reviews.apache.org/r/25886/#comment94878

can you remove the extra whitespace in requiredOffset )?



core/src/main/scala/kafka/common/NotEnoughReplicasException.scala
https://reviews.apache.org/r/25886/#comment94879

Same here. Could you please add a description?


- Neha Narkhede


On Sept. 25, 2014, 7:41 p.m., Gwen Shapira wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/25886/
 ---
 
 (Updated Sept. 25, 2014, 7:41 p.m.)
 
 
 Review request for kafka.
 
 
 Repository: kafka
 
 
 Description
 ---
 
 KAFKA-1555: provide strong consistency with reasonable availability
 
 
 Diffs
 -
 
   
 clients/src/main/java/org/apache/kafka/common/errors/NotEnoughReplicasException.java
  PRE-CREATION 
   clients/src/main/java/org/apache/kafka/common/protocol/Errors.java d434f42 
   core/src/main/scala/kafka/cluster/Partition.scala ff106b4 
   core/src/main/scala/kafka/common/ErrorMapping.scala 3fae791 
   core/src/main/scala/kafka/common/NotEnoughReplicasException.scala 
 PRE-CREATION 
   core/src/main/scala/kafka/log/LogConfig.scala 5746ad4 
   core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala 
 39f777b 
   core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala 24deea0 
   core/src/test/scala/unit/kafka/utils/TestUtils.scala 2dbdd3c 
 
 Diff: https://reviews.apache.org/r/25886/diff/
 
 
 Testing
 ---
 
 With 3 broker cluster, created 3 topics each with 1 partition and 3 replicas, 
 with 1,3 and 4 min.insync.replicas.
 * min.insync.replicas=1 behaved normally (all writes succeeded as long as a 
 broker was up)
 * min.insync.replicas=3 returned NotEnoughReplicas when required.acks=-1 and 
 one broker was down
 * min.insync.replicas=4 returned NotEnoughReplicas when required.acks=-1
 
 See notes about retry behavior in the JIRA.
 
 
 Thanks,
 
 Gwen Shapira
 




[jira] [Commented] (KAFKA-1499) Broker-side compression configuration

2014-09-25 Thread Joel Koshy (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1499?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14148598#comment-14148598
 ] 

Joel Koshy commented on KAFKA-1499:
---

Thanks - it works now.

Sorry I had forgotten to run the unit tests. The DynamicConfigChangeTest fails 
- however, we probably should not simply add the broker.compression.enable 
property to the valid list since it is not really a valid topic-level config. 
We can follow up on this tomorrow.


 Broker-side compression configuration
 -

 Key: KAFKA-1499
 URL: https://issues.apache.org/jira/browse/KAFKA-1499
 Project: Kafka
  Issue Type: New Feature
Reporter: Joel Koshy
Assignee: Manikumar Reddy
  Labels: newbie++
 Fix For: 0.8.2

 Attachments: KAFKA-1499.patch, KAFKA-1499.patch, 
 KAFKA-1499_2014-08-15_14:20:27.patch, KAFKA-1499_2014-08-21_21:44:27.patch, 
 KAFKA-1499_2014-09-21_15:57:23.patch, KAFKA-1499_2014-09-23_14:45:38.patch, 
 KAFKA-1499_2014-09-24_14:20:33.patch, KAFKA-1499_2014-09-24_14:24:54.patch, 
 KAFKA-1499_2014-09-25_11:05:57.patch

   Original Estimate: 72h
  Remaining Estimate: 72h

 A given topic can have messages in mixed compression codecs. i.e., it can
 also have a mix of uncompressed/compressed messages.
 It will be useful to support a broker-side configuration to recompress
 messages to a specific compression codec. i.e., all messages (for all
 topics) on the broker will be compressed to this codec. We could have
 per-topic overrides as well.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: Review Request 25944: Patch for KAFKA-1013

2014-09-25 Thread Mayuresh Gharat


 On Sept. 25, 2014, 10:43 p.m., Guozhang Wang wrote:
  core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala, line 
  374
  https://reviews.apache.org/r/25944/diff/3/?file=703083#file703083line374
 
  Is there a difference between these two:
  
  Thread.sleep()
  
  TimeUnit.MILLISECONDS.sleep()
  
  since we do the former everywhere in the code base.

It's the same thing, just another way of doing it. Internally it does the same 
thing.
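
For reference, a tiny snippet showing the two equivalent calls mentioned above; 
TimeUnit.MILLISECONDS.sleep(n) just converts the duration and then delegates to 
Thread.sleep internally:

    import java.util.concurrent.TimeUnit

    object SleepComparison {
      def main(args: Array[String]): Unit = {
        // Both calls pause the current thread for one second.
        Thread.sleep(1000)
        TimeUnit.MILLISECONDS.sleep(1000)
      }
    }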


 On Sept. 25, 2014, 10:43 p.m., Guozhang Wang wrote:
  core/src/main/scala/kafka/tools/ExportOffsets.scala, line 33
  https://reviews.apache.org/r/25944/diff/3/?file=703085#file703085line33
 
  Can we actually make the same format for ZK / offsetmanager, making two 
  different formats would make it harder to be parsed since the user needs to 
  know whether ZK or offsetmanager is used.

Yeah, the offset manager format is simpler. So unless we are planning to fetch 
offsets from ZK and commit to OffsetManager this should work. What do you think?


 On Sept. 25, 2014, 10:43 p.m., Guozhang Wang wrote:
  core/src/main/scala/kafka/tools/ExportOffsets.scala, line 199
  https://reviews.apache.org/r/25944/diff/3/?file=703085#file703085line199
 
  You can take a look at KAFKA-686's latest patch which did some cleanup 
  on the util functions; these functions may probably be merged into Utils.

Would like to discuss more on this!


- Mayuresh


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/25944/#review54609
---


On Sept. 23, 2014, 5:48 p.m., Mayuresh Gharat wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/25944/
 ---
 
 (Updated Sept. 23, 2014, 5:48 p.m.)
 
 
 Review request for kafka.
 
 
 Bugs: KAFKA-1013
 https://issues.apache.org/jira/browse/KAFKA-1013
 
 
 Repository: kafka
 
 
 Description
 ---
 
 OffsetCLient Tool API. ImportZkOffsets and ExportZkOffsets replaced by 
 ImportOffsets and ExportOffsets
 
 
 Modified the comments in the headers
 
 
 Corrected a value
 
 
 Diffs
 -
 
   config/consumer.properties 83847de30d10b6e78bb8de28e0bb925d7c0e6ca2 
   core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala 
 fbc680fde21b02f11285a4f4b442987356abd17b 
   core/src/main/scala/kafka/tools/ConfigConstants.scala PRE-CREATION 
   core/src/main/scala/kafka/tools/ExportOffsets.scala PRE-CREATION 
   core/src/main/scala/kafka/tools/ImportOffsets.scala PRE-CREATION 
   core/src/main/scala/kafka/tools/OffsetClient.scala PRE-CREATION 
 
 Diff: https://reviews.apache.org/r/25944/diff/
 
 
 Testing
 ---
 
 
 Thanks,
 
 Mayuresh Gharat
 




[jira] [Commented] (KAFKA-1631) ReplicationFactor and under-replicated partitions incorrect during reassignment

2014-09-25 Thread Ewen Cheslack-Postava (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1631?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14148680#comment-14148680
 ] 

Ewen Cheslack-Postava commented on KAFKA-1631:
--

Right. Unfortunately most of the system isn't aware of the large scale change 
(reassign old set -> new set), only of each intermediate state (old set -> old 
set + new set -> new set). As it stands, the UnderReplicatedPartitions are 
computed by the Partition class, which is created by ReplicaManager. But the 
high-level reassignment is managed by KafkaController, which looks like the only 
place the necessary state is maintained. I think getting the semantics you want 
may require a much more substantial change since each partition leader will 
need to know about the partition reassignment rather than just the controller.

On the other hand, while I think it's less than ideal, the current behavior 
could certainly be argued to be reasonable -- i.e. that reassignment is not 
natively supported, it's just a higher-level operation you can build up. In 
this case, the intermediate step is expected, and the temporary reporting of 
under-replication would make sense since for a time the desired replication of 
(old set + new set) has not been achieved.

 ReplicationFactor and under-replicated partitions incorrect during 
 reassignment
 ---

 Key: KAFKA-1631
 URL: https://issues.apache.org/jira/browse/KAFKA-1631
 Project: Kafka
  Issue Type: Bug
Affects Versions: 0.8.1.1
Reporter: Ryan Berdeen
  Labels: newbie
 Attachments: KAFKA-1631-v1.patch


 We have a topic with a replication factor of 3. We monitor 
 UnderReplicatedPartitions as recommended by the documentation.
 During a partition reassignment, partitions being reassigned are reported as 
 under-replicated. Running a describe shows:
 {code}
 Topic:activity-wal-1PartitionCount:15   ReplicationFactor:5 
 Configs:
 Topic: activity-wal-1   Partition: 0Leader: 14  Replicas: 
 14,13,12,11,15Isr: 14,12,11,13
 Topic: activity-wal-1   Partition: 1Leader: 14  Replicas: 
 15,14,11  Isr: 14,11
 Topic: activity-wal-1   Partition: 2Leader: 11  Replicas: 
 11,15,12  Isr: 12,11,15
 ...
 {code}
 It looks like the displayed replication factor, 5, is simply the number of 
 replicas listed for the first partition, which includes both brokers in the 
 current list and those onto which the partition is being reassigned. 
 Partition 0 is also included in the list when using the 
 `--under-replicated-partitions` option, even though it is replicated to more 
 partitions than the true replication factor.
 During a reassignment, the under-replicated partitions metric is not usable, 
 meaning that actual under-replicated partitions can go unnoticed.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Reopened] (KAFKA-1577) Exception in ConnectionQuotas while shutting down

2014-09-25 Thread Neha Narkhede (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1577?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Neha Narkhede reopened KAFKA-1577:
--

See KAFKA-1652

 Exception in ConnectionQuotas while shutting down
 -

 Key: KAFKA-1577
 URL: https://issues.apache.org/jira/browse/KAFKA-1577
 Project: Kafka
  Issue Type: Bug
  Components: core
Reporter: Joel Koshy
Assignee: Sriharsha Chintalapani
  Labels: newbie
 Attachments: KAFKA-1577.patch, KAFKA-1577_2014-08-20_19:57:44.patch, 
 KAFKA-1577_2014-08-26_07:33:13.patch, 
 KAFKA-1577_check_counter_before_decrementing.patch


 {code}
 [2014-08-07 19:38:08,228] ERROR Uncaught exception in thread 
 'kafka-network-thread-9092-0': (kafka.utils.Utils$)
 java.util.NoSuchElementException: None.get
 at scala.None$.get(Option.scala:185)
 at scala.None$.get(Option.scala:183)
 at kafka.network.ConnectionQuotas.dec(SocketServer.scala:471)
 at kafka.network.AbstractServerThread.close(SocketServer.scala:158)
 at kafka.network.AbstractServerThread.close(SocketServer.scala:150)
 at kafka.network.AbstractServerThread.closeAll(SocketServer.scala:171)
 at kafka.network.Processor.run(SocketServer.scala:338)
 at java.lang.Thread.run(Thread.java:662)
 {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Resolved] (KAFKA-1652) NoSuchElementException on server shutdown

2014-09-25 Thread Neha Narkhede (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1652?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Neha Narkhede resolved KAFKA-1652.
--
Resolution: Duplicate

 NoSuchElementException on server shutdown
 -

 Key: KAFKA-1652
 URL: https://issues.apache.org/jira/browse/KAFKA-1652
 Project: Kafka
  Issue Type: Bug
Reporter: Neha Narkhede
  Labels: newbie++

 Observed the following exception on server shutdown. Happens pretty 
 frequently though not on every shutdown. Ways to reproduce -
 1. Checkout kafka from trunk
 2. Run bin/kafka-server-start.sh config/server.properties
 3. Ctrl C
 {noformat}
 [2014-09-25 12:31:14,441] ERROR None.get (kafka.network.Processor)
 java.util.NoSuchElementException: None.get
   at scala.None$.get(Option.scala:313)
   at scala.None$.get(Option.scala:311)
   at kafka.network.ConnectionQuotas.dec(SocketServer.scala:522)
   at kafka.network.AbstractServerThread.close(SocketServer.scala:165)
   at kafka.network.AbstractServerThread.close(SocketServer.scala:157)
   at kafka.network.Processor.close(SocketServer.scala:372)
   at kafka.network.AbstractServerThread.closeAll(SocketServer.scala:178)
   at 
 kafka.network.Processor$$anonfun$run$3.apply$mcV$sp(SocketServer.scala:362)
   at kafka.utils.Utils$.swallow(Utils.scala:172)
   at kafka.utils.Logging$class.swallowError(Logging.scala:106)
   at 
 kafka.network.AbstractServerThread.swallowError(SocketServer.scala:108)
   at kafka.network.Processor.run(SocketServer.scala:362)
   at java.lang.Thread.run(Thread.java:695)
 {noformat}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-1653) Duplicate broker ids allowed in replica assignment

2014-09-25 Thread Neha Narkhede (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1653?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Neha Narkhede updated KAFKA-1653:
-
Labels: newbie  (was: )

 Duplicate broker ids allowed in replica assignment
 --

 Key: KAFKA-1653
 URL: https://issues.apache.org/jira/browse/KAFKA-1653
 Project: Kafka
  Issue Type: Bug
  Components: tools
Affects Versions: 0.8.1.1
Reporter: Ryan Berdeen
  Labels: newbie

 The reassign partitions command and the controller do not ensure that all 
 replicas for a partition are on different brokers. For example, you could set 
 1,2,2 as the list of brokers for the replicas.
 kafka-topics.sh --describe --under-replicated will list these partitions as 
 under-replicated, but I can't see a reason why the controller should allow 
 this state.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-1647) Replication offset checkpoints (high water marks) can be lost on hard kills and restarts

2014-09-25 Thread Neha Narkhede (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1647?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14148702#comment-14148702
 ] 

Neha Narkhede commented on KAFKA-1647:
--

I see. So for this to lose all the data for the concerned partition, b1 has to 
be bounced to become a follower again and at that time b0 should be up for the 
become follower to succeed. Right? It is a pretty corner case, but should be 
fixed. 

 Replication offset checkpoints (high water marks) can be lost on hard kills 
 and restarts
 

 Key: KAFKA-1647
 URL: https://issues.apache.org/jira/browse/KAFKA-1647
 Project: Kafka
  Issue Type: Bug
Reporter: Joel Koshy
Priority: Critical
  Labels: newbie++

 We ran into this scenario recently in a production environment. This can 
 happen when enough brokers in a cluster are taken down. i.e., a rolling 
 bounce done properly should not cause this issue. It can occur if all 
 replicas for any partition are taken down.
 Here is a sample scenario:
 * Cluster of three brokers: b0, b1, b2
 * Two partitions (of some topic) with replication factor two: p0, p1
 * Initial state:
 ** p0: leader = b0, ISR = {b0, b1}
 ** p1: leader = b1, ISR = {b0, b1}
 * Do a parallel hard-kill of all brokers
 * Bring up b2, so it is the new controller
 * b2 initializes its controller context and populates its leader/ISR cache 
 (i.e., controllerContext.partitionLeadershipInfo) from zookeeper. The last 
 known leaders are b0 (for p0) and b1 (for p1)
 * Bring up b1
 * The controller's onBrokerStartup procedure initiates a replica state change 
 for all replicas on b1 to become online. As part of this replica state change 
 it gets the last known leader and ISR and sends a LeaderAndIsrRequest to b1 
 (for p0 and p1). This LeaderAndIsr request contains: {p0: leader=b0; p1: 
 leader=b1; leaders=b1}. b0 is indicated as the leader of p0 but it is not 
 included in the leaders field because b0 is down.
 * On receiving the LeaderAndIsrRequest, b1's replica manager will 
 successfully make b1 the leader for p1 (and create the local replica object 
 corresponding to p1). It will however abort the become follower transition 
 for p0 because the designated leader b0 is offline. So it will not create the 
 local replica object for p0.
 * It will then start the high water mark checkpoint thread. Since only p1 has 
 a local replica object, only p1's high water mark will be checkpointed to 
 disk. p0's previously written checkpoint, if any, will be lost.
 So in summary it seems we should always create the local replica object even 
 if the online transition does not happen.
 Possible symptoms of the above bug could be one or more of the following (we 
 saw 2 and 3):
 # Data loss; yes on a hard-kill data loss is expected, but this can actually 
 cause loss of nearly all data if the broker becomes follower, truncates, and 
 soon after happens to become leader.
 # High IO on brokers that lose their high water mark then subsequently (on a 
 successful become follower transition) truncate their log to zero and start 
 catching up from the beginning.
 # If the offsets topic is affected, then offsets can get reset. This is 
 because during an offset load we don't read past the high water mark. So if a 
 water mark is missing then we don't load anything (even if the offsets are 
 there in the log).



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Build failed in Jenkins: Kafka-trunk #277

2014-09-25 Thread Apache Jenkins Server
See https://builds.apache.org/job/Kafka-trunk/277/changes

Changes:

[neha.narkhede] KAFKA-404 When using chroot path, create chroot on startup if 
it doesn't exist; reviewed by Neha Narkhede

--
Started by an SCM change
Building remotely on ubuntu3 (Ubuntu ubuntu) in workspace 
https://builds.apache.org/job/Kafka-trunk/ws/
  git rev-parse --is-inside-work-tree
Fetching changes from the remote Git repository
  git config remote.origin.url 
  https://git-wip-us.apache.org/repos/asf/kafka.git
Fetching upstream changes from https://git-wip-us.apache.org/repos/asf/kafka.git
  git --version
  git fetch --tags --progress 
  https://git-wip-us.apache.org/repos/asf/kafka.git 
  +refs/heads/*:refs/remotes/origin/*
  git rev-parse origin/trunk^{commit}
Checking out Revision f750dba65f9d9552a61a0754c46fa6e294785b31 (origin/trunk)
  git config core.sparsecheckout
  git checkout -f f750dba65f9d9552a61a0754c46fa6e294785b31
  git rev-list 084566b837ee2204b6898b82156e811d0601085f
[Kafka-trunk] $ /bin/bash -xe /tmp/hudson8706236251140632788.sh
+ gradle
/tmp/hudson8706236251140632788.sh: line 2: gradle: command not found
Build step 'Execute shell' marked build as failure


[jira] [Commented] (KAFKA-1499) Broker-side compression configuration

2014-09-25 Thread Manikumar Reddy (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1499?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14148718#comment-14148718
 ] 

Manikumar Reddy commented on KAFKA-1499:


Yes, the broker.compression.enable property does not belong to LogConfig. 
This config is available in KafkaConfig. KafkaConfig is not used in Log.scala.

Can I modify the constructors of the LogManager and Log.scala classes to pass the 
broker.compression.enable property?

 Broker-side compression configuration
 -

 Key: KAFKA-1499
 URL: https://issues.apache.org/jira/browse/KAFKA-1499
 Project: Kafka
  Issue Type: New Feature
Reporter: Joel Koshy
Assignee: Manikumar Reddy
  Labels: newbie++
 Fix For: 0.8.2

 Attachments: KAFKA-1499.patch, KAFKA-1499.patch, 
 KAFKA-1499_2014-08-15_14:20:27.patch, KAFKA-1499_2014-08-21_21:44:27.patch, 
 KAFKA-1499_2014-09-21_15:57:23.patch, KAFKA-1499_2014-09-23_14:45:38.patch, 
 KAFKA-1499_2014-09-24_14:20:33.patch, KAFKA-1499_2014-09-24_14:24:54.patch, 
 KAFKA-1499_2014-09-25_11:05:57.patch

   Original Estimate: 72h
  Remaining Estimate: 72h

 A given topic can have messages in mixed compression codecs. i.e., it can
 also have a mix of uncompressed/compressed messages.
 It will be useful to support a broker-side configuration to recompress
 messages to a specific compression codec. i.e., all messages (for all
 topics) on the broker will be compressed to this codec. We could have
 per-topic overrides as well.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-589) Clean shutdown after startup connection failure

2014-09-25 Thread Neha Narkhede (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-589?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Neha Narkhede updated KAFKA-589:

Resolution: Fixed
Status: Resolved  (was: Patch Available)

Pushed to trunk

 Clean shutdown after startup connection failure
 ---

 Key: KAFKA-589
 URL: https://issues.apache.org/jira/browse/KAFKA-589
 Project: Kafka
  Issue Type: Bug
  Components: core
Affects Versions: 0.7.2, 0.8.0
Reporter: Jason Rosenberg
Assignee: Swapnil Ghike
Priority: Minor
  Labels: bugs, newbie
 Attachments: KAFKA-589-v1.patch


 Hi,
 I'm embedding the kafka server (0.7.2) in an application container.   I've 
 noticed that if I try to start the server without zookeeper being available, 
 by default it gets a zk connection timeout after 6 seconds, and then throws 
 an Exception out of KafkaServer.startup(). E.g., I see this stack trace:
 Exception in thread "main" org.I0Itec.zkclient.exception.ZkTimeoutException: 
 Unable to connect to zookeeper server within timeout: 6000
   at org.I0Itec.zkclient.ZkClient.connect(ZkClient.java:876)
   at org.I0Itec.zkclient.ZkClient.init(ZkClient.java:98)
   at org.I0Itec.zkclient.ZkClient.init(ZkClient.java:84)
   at kafka.server.KafkaZooKeeper.startup(KafkaZooKeeper.scala:44)
   at kafka.log.LogManager.init(LogManager.scala:93)
   at kafka.server.KafkaServer.startup(KafkaServer.scala:58)
 
 
 So that's ok, I can catch the exception, and then shut everything down 
 gracefully, in this case.  However, when I do this, it seems there is a 
 daemon thread still around, which doesn't quit, and so the server never 
 actually exits the jvm.  Specifically, this thread seems to hang around:
 kafka-logcleaner-0 prio=5 tid=7fd9b48b1000 nid=0x112c08000 waiting on 
 condition [112c07000]
java.lang.Thread.State: TIMED_WAITING (parking)
   at sun.misc.Unsafe.park(Native Method)
   - parking to wait for  7f40d4be8 (a 
 java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
   at 
 java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:196)
   at 
 java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2025)
   at java.util.concurrent.DelayQueue.take(DelayQueue.java:164)
   at 
 java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:609)
   at 
 java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:602)
   at 
 java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:947)
   at 
 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:907)
   at java.lang.Thread.run(Thread.java:680)
 Looking at the code in kafka.log.LogManager(), it does seem like it starts up 
 the scheduler to clean logs, before then trying to connect to zk (and in this 
 case fail):
   /* Schedule the cleanup task to delete old logs */
   if(scheduler != null) {
     info("starting log cleaner every " + logCleanupIntervalMs + " ms")
     scheduler.scheduleWithRate(cleanupLogs, 60 * 1000, logCleanupIntervalMs)
   }
 So this scheduler does not appear to be stopped if startup fails.  However, 
 if I catch the above RuntimeException, and then call KafkaServer.shutdown(), 
 then it will stop the scheduler, and all is good.
 However, it seems odd that if I get an exception when calling 
 KafkaServer.startup(), I should still have to call 
 KafkaServer.shutdown().  Rather, wouldn't it be better to have it internally 
 clean up after itself if startup() gets an exception?  I'm not sure I can 
 reliably call shutdown() after a failed startup().
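
A minimal sketch of the caller-side workaround described above, under the assumption of an embedded server exposing startup()/shutdown() (the Server trait below is a stand-in, not Kafka's API): catch the startup exception and invoke shutdown() so the already-started scheduler thread does not keep the JVM alive.

    // Hedged sketch: if startup() throws, invoke shutdown() so background
    // scheduler threads are stopped and the JVM can exit.
    object SafeStartupSketch {
      trait Server {
        def startup(): Unit
        def shutdown(): Unit
      }

      def startOrCleanUp(server: Server): Unit =
        try {
          server.startup()
        } catch {
          case e: RuntimeException =>
            // Mirrors the report: shutdown() stops the log-cleaner scheduler
            // that startup() launched before the ZK connection failed.
            try server.shutdown() catch { case _: Throwable => () } // best-effort cleanup
            throw e
        }

      def main(args: Array[String]): Unit = {
        val failing = new Server {
          def startup(): Unit = throw new RuntimeException("Unable to connect to zookeeper")
          def shutdown(): Unit = println("scheduler stopped, resources released")
        }
        try startOrCleanUp(failing)
        catch { case e: RuntimeException => println(s"startup failed: ${e.getMessage}") }
      }
    }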



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-589) Clean shutdown after startup connection failure

2014-09-25 Thread Neha Narkhede (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-589?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Neha Narkhede updated KAFKA-589:

Assignee: Ewen Cheslack-Postava  (was: Swapnil Ghike)

 Clean shutdown after startup connection failure
 ---

 Key: KAFKA-589
 URL: https://issues.apache.org/jira/browse/KAFKA-589
 Project: Kafka
  Issue Type: Bug
  Components: core
Affects Versions: 0.7.2, 0.8.0
Reporter: Jason Rosenberg
Assignee: Ewen Cheslack-Postava
Priority: Minor
  Labels: bugs, newbie
 Attachments: KAFKA-589-v1.patch


 Hi,
 I'm embedding the kafka server (0.7.2) in an application container.   I've 
 noticed that if I try to start the server without zookeeper being available, 
 by default it gets a zk connection timeout after 6 seconds, and then throws 
 an Exception out of KafkaServer.startup(). E.g., I see this stack trace:
 Exception in thread "main" org.I0Itec.zkclient.exception.ZkTimeoutException: 
 Unable to connect to zookeeper server within timeout: 6000
   at org.I0Itec.zkclient.ZkClient.connect(ZkClient.java:876)
   at org.I0Itec.zkclient.ZkClient.<init>(ZkClient.java:98)
   at org.I0Itec.zkclient.ZkClient.<init>(ZkClient.java:84)
   at kafka.server.KafkaZooKeeper.startup(KafkaZooKeeper.scala:44)
   at kafka.log.LogManager.<init>(LogManager.scala:93)
   at kafka.server.KafkaServer.startup(KafkaServer.scala:58)
 
 
 So that's ok, I can catch the exception, and then shut everything down 
 gracefully, in this case.  However, when I do this, it seems there is a 
 daemon thread still around, which doesn't quit, and so the server never 
 actually exits the jvm.  Specifically, this thread seems to hang around:
 "kafka-logcleaner-0" prio=5 tid=7fd9b48b1000 nid=0x112c08000 waiting on 
 condition [112c07000]
java.lang.Thread.State: TIMED_WAITING (parking)
   at sun.misc.Unsafe.park(Native Method)
   - parking to wait for <7f40d4be8> (a 
 java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
   at 
 java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:196)
   at 
 java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2025)
   at java.util.concurrent.DelayQueue.take(DelayQueue.java:164)
   at 
 java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:609)
   at 
 java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:602)
   at 
 java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:947)
   at 
 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:907)
   at java.lang.Thread.run(Thread.java:680)
 Looking at the code in kafka.log.LogManager(), it does seem like it starts up 
 the scheduler to clean logs, before then trying to connect to zk (and in this 
 case fail):
   /* Schedule the cleanup task to delete old logs */
   if(scheduler != null) {
     info("starting log cleaner every " + logCleanupIntervalMs + " ms")
     scheduler.scheduleWithRate(cleanupLogs, 60 * 1000, logCleanupIntervalMs)
   }
 So this scheduler does not appear to be stopped if startup fails.  However, 
 if I catch the above RuntimeException, and then call KafkaServer.shutdown(), 
 then it will stop the scheduler, and all is good.
 However, it seems odd that if I get an exception when calling 
 KafkaServer.startup(), I should still have to call 
 KafkaServer.shutdown().  Rather, wouldn't it be better to have it internally 
 clean up after itself if startup() gets an exception?  I'm not sure I can 
 reliably call shutdown() after a failed startup().



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-589) Clean shutdown after startup connection failure

2014-09-25 Thread Neha Narkhede (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-589?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14148726#comment-14148726
 ] 

Neha Narkhede commented on KAFKA-589:
-

Thanks for fixing a longstanding bug! +1 on the patch.

 Clean shutdown after startup connection failure
 ---

 Key: KAFKA-589
 URL: https://issues.apache.org/jira/browse/KAFKA-589
 Project: Kafka
  Issue Type: Bug
  Components: core
Affects Versions: 0.7.2, 0.8.0
Reporter: Jason Rosenberg
Assignee: Swapnil Ghike
Priority: Minor
  Labels: bugs, newbie
 Attachments: KAFKA-589-v1.patch


 Hi,
 I'm embedding the kafka server (0.7.2) in an application container.   I've 
 noticed that if I try to start the server without zookeeper being available, 
 by default it gets a zk connection timeout after 6 seconds, and then throws 
 an Exception out of KafkaServer.startup(). E.g., I see this stack trace:
 Exception in thread "main" org.I0Itec.zkclient.exception.ZkTimeoutException: 
 Unable to connect to zookeeper server within timeout: 6000
   at org.I0Itec.zkclient.ZkClient.connect(ZkClient.java:876)
   at org.I0Itec.zkclient.ZkClient.<init>(ZkClient.java:98)
   at org.I0Itec.zkclient.ZkClient.<init>(ZkClient.java:84)
   at kafka.server.KafkaZooKeeper.startup(KafkaZooKeeper.scala:44)
   at kafka.log.LogManager.<init>(LogManager.scala:93)
   at kafka.server.KafkaServer.startup(KafkaServer.scala:58)
 
 
 So that's ok, I can catch the exception, and then shut everything down 
 gracefully, in this case.  However, when I do this, it seems there is a 
 daemon thread still around, which doesn't quit, and so the server never 
 actually exits the jvm.  Specifically, this thread seems to hang around:
 "kafka-logcleaner-0" prio=5 tid=7fd9b48b1000 nid=0x112c08000 waiting on 
 condition [112c07000]
java.lang.Thread.State: TIMED_WAITING (parking)
   at sun.misc.Unsafe.park(Native Method)
   - parking to wait for <7f40d4be8> (a 
 java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
   at 
 java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:196)
   at 
 java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2025)
   at java.util.concurrent.DelayQueue.take(DelayQueue.java:164)
   at 
 java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:609)
   at 
 java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:602)
   at 
 java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:947)
   at 
 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:907)
   at java.lang.Thread.run(Thread.java:680)
 Looking at the code in kafka.log.LogManager(), it does seem like it starts up 
 the scheduler to clean logs, before then trying to connect to zk (and in this 
 case fail):
   /* Schedule the cleanup task to delete old logs */
   if(scheduler != null) {
     info("starting log cleaner every " + logCleanupIntervalMs + " ms")
     scheduler.scheduleWithRate(cleanupLogs, 60 * 1000, logCleanupIntervalMs)
   }
 So this scheduler does not appear to be stopped if startup fails.  However, 
 if I catch the above RuntimeException, and then call KafkaServer.shutdown(), 
 then it will stop the scheduler, and all is good.
 However, it seems odd that if I get an exception when calling 
 KafkaServer.startup(), I should still have to call 
 KafkaServer.shutdown().  Rather, wouldn't it be better to have it internally 
 clean up after itself if startup() gets an exception?  I'm not sure I can 
 reliably call shutdown() after a failed startup().



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-1586) support sticky partitioning in the new producer

2014-09-25 Thread Neha Narkhede (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1586?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14148735#comment-14148735
 ] 

Neha Narkhede commented on KAFKA-1586:
--

+1 on not supporting a sticky partitioning strategy in the new producer. It 
already gives the user the flexibility to use any partitioning strategy. If 
there are no objections, I'm leaning towards closing this JIRA.
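
As a hedged illustration of that flexibility (the windowing logic below is invented for the example, not an existing Kafka partitioner), an application can compute its own sticky partition and pass it explicitly per record:

    // Hypothetical "sticky" partition chooser: keeps sending to one partition
    // for a time window (e.g. roughly linger.ms) before moving on, so batches
    // stay large and compress well.
    final class StickyPartitionChooser(numPartitions: Int, windowMs: Long) {
      private var current = 0
      private var windowStart = System.currentTimeMillis()

      def choose(): Int = synchronized {
        val now = System.currentTimeMillis()
        if (now - windowStart >= windowMs) {
          current = (current + 1) % numPartitions
          windowStart = now
        }
        current
      }
    }

    object StickyPartitionChooser {
      def main(args: Array[String]): Unit = {
        val chooser = new StickyPartitionChooser(numPartitions = 6, windowMs = 100)
        (1 to 5).foreach { i =>
          println(s"message $i -> partition ${chooser.choose()}")
          Thread.sleep(40)
        }
      }
    }

A record would then carry the chosen partition explicitly, e.g. new ProducerRecord(topic, chooser.choose(), key, value).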

 support sticky partitioning in the new producer
 ---

 Key: KAFKA-1586
 URL: https://issues.apache.org/jira/browse/KAFKA-1586
 Project: Kafka
  Issue Type: Bug
  Components: core
Affects Versions: 0.8.2
Reporter: Jun Rao
Assignee: Jun Rao
 Attachments: KAFKA-1586.patch


 If a message doesn't specify a key or a partition, the new producer selects a 
 partition for each message in a round-robin way. As a result, in a window of 
 linger.ms, messages are spread around in all partitions of a topic. Compared 
 with another strategy that assigns all messages to a single partition in the 
 same time window, this strategy may not compress the message set as well 
 since the batch is smaller. Another potential problem with this strategy is 
 that the compression ratio could be sensitive to the change of # partitions 
 in a topic. If # partitions are increased in a topic, the produced data may 
 not be compressed as well as before. 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-1591) Clean-up Unnecessary stack trace in error/warn logs

2014-09-25 Thread Neha Narkhede (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1591?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Neha Narkhede updated KAFKA-1591:
-
Reviewer: Jun Rao

[~junrao], please feel free to reassign :) Going through JIRAs that have 
patches available but no reviewers. 

 Clean-up Unnecessary stack trace in error/warn logs
 ---

 Key: KAFKA-1591
 URL: https://issues.apache.org/jira/browse/KAFKA-1591
 Project: Kafka
  Issue Type: Bug
Reporter: Guozhang Wang
Assignee: Abhishek Sharma
  Labels: newbie
 Fix For: 0.9.0

 Attachments: Jira-1591- As per Guozhang latest comment.patch, 
 Jira-1591- Complete Changes Patch.patch, 
 Jira-1591-SocketConnection-Warning.patch, 
 Jira1591-SendProducerRequest-Warning.patch


 Some of the unnecessary stack traces in error / warning log entries can 
 easily pollute the log files. Examples include KAFKA-1066, etc.
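
As a small, hedged example of the kind of cleanup intended here (not the attached patch; the method and logger names are invented), expected transient failures can be logged as a one-line warning while unexpected errors keep the full stack trace:

    // Illustrative only: trim noisy stack traces for expected failures.
    import java.util.logging.{Level, Logger}

    object LogNoiseSketch {
      private val log = Logger.getLogger("kafka.sketch")

      def handleSendFailure(broker: String, e: Exception, expected: Boolean): Unit =
        if (expected)
          // Expected/transient condition: one-line warning, no stack trace.
          log.warning(s"Failed to send producer request to $broker: ${e.getMessage}")
        else
          // Unexpected condition: keep the full stack trace for debugging.
          log.log(Level.SEVERE, s"Unexpected error sending to $broker", e)

      def main(args: Array[String]): Unit = {
        handleSendFailure("broker-1:9092", new java.io.IOException("Connection reset"), expected = true)
        handleSendFailure("broker-1:9092", new IllegalStateException("bug"), expected = false)
      }
    }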



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: Review Request 25886: KAFKA-1555: provide strong consistency with reasonable availability

2014-09-25 Thread Jun Rao


 On Sept. 26, 2014, 12:17 a.m., Jun Rao wrote:
  Thanks for the patch. Looks good overall. A couple of comments.
  
  1. Let's add a check of min.isr in Partition.appendMessagesToLeader() so 
  that the message is not added to the leader's log if the ISR size is less than 
  min.isr.
  2. We talked about dropping the ack > 1 support in the JIRA. However, I 
  think that's easier done after KAFKA-1583 is completed. So, we don't have 
  to do this change here. I will file a follow-up JIRA.

1. We can give a different exception in this case, something like 
RejectMessageDueToNotEnoughReplicas.
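
A hedged sketch of the check described in point 1, with placeholder names (inSyncSize, minIsr, and the exception class are illustrative, not the committed code):

    // Illustrative min.isr gate before appending to the leader log.
    final class NotEnoughReplicasException(msg: String) extends RuntimeException(msg)

    object MinIsrCheckSketch {
      def appendToLeader(inSyncSize: Int, minIsr: Int, requiredAcks: Int, append: () => Unit): Unit = {
        // Only enforce the constraint for acks=-1 requests.
        if (requiredAcks == -1 && inSyncSize < minIsr)
          throw new NotEnoughReplicasException(
            s"ISR size $inSyncSize is below min.insync.replicas $minIsr; rejecting append")
        append()
      }

      def main(args: Array[String]): Unit = {
        appendToLeader(inSyncSize = 3, minIsr = 2, requiredAcks = -1, append = () => println("appended"))
        try appendToLeader(inSyncSize = 1, minIsr = 2, requiredAcks = -1, append = () => println("appended"))
        catch { case e: NotEnoughReplicasException => println(s"rejected: ${e.getMessage}") }
      }
    }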


- Jun


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/25886/#review54635
---


On Sept. 25, 2014, 7:41 p.m., Gwen Shapira wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/25886/
 ---
 
 (Updated Sept. 25, 2014, 7:41 p.m.)
 
 
 Review request for kafka.
 
 
 Repository: kafka
 
 
 Description
 ---
 
 KAFKA-1555: provide strong consistency with reasonable availability
 
 
 Diffs
 -
 
   
 clients/src/main/java/org/apache/kafka/common/errors/NotEnoughReplicasException.java
  PRE-CREATION 
   clients/src/main/java/org/apache/kafka/common/protocol/Errors.java d434f42 
   core/src/main/scala/kafka/cluster/Partition.scala ff106b4 
   core/src/main/scala/kafka/common/ErrorMapping.scala 3fae791 
   core/src/main/scala/kafka/common/NotEnoughReplicasException.scala 
 PRE-CREATION 
   core/src/main/scala/kafka/log/LogConfig.scala 5746ad4 
   core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala 
 39f777b 
   core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala 24deea0 
   core/src/test/scala/unit/kafka/utils/TestUtils.scala 2dbdd3c 
 
 Diff: https://reviews.apache.org/r/25886/diff/
 
 
 Testing
 ---
 
 With a 3-broker cluster, created 3 topics, each with 1 partition and 3 replicas, 
 with min.insync.replicas of 1, 3 and 4 respectively.
 * min.insync.replicas=1 behaved normally (all writes succeeded as long as a 
 broker was up)
 * min.insync.replicas=3 returned NotEnoughReplicas when required.acks=-1 and 
 one broker was down
 * min.insync.replicas=4 returned NotEnoughReplicas when required.acks=-1
 
 See notes about retry behavior in the JIRA.
 
 
 Thanks,
 
 Gwen Shapira
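
For readers reproducing the test above, a hedged example of the producer-side settings involved, assuming the 0.8.2 new producer client (the broker addresses and topic name are placeholders; min.insync.replicas itself is set on the topic or as a broker default, not on the producer):

    // Hedged example of the producer settings used with min.insync.replicas;
    // min.insync.replicas is configured on the topic, e.g. via
    // kafka-topics.sh --config min.insync.replicas=3 (placeholder example).
    import java.util.Properties
    import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

    object MinIsrProducerSketch {
      def main(args: Array[String]): Unit = {
        val props = new Properties()
        props.put("bootstrap.servers", "broker1:9092,broker2:9092,broker3:9092")
        props.put("acks", "-1") // the min.insync.replicas check only applies to acks=-1
        props.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer")
        props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer")

        val producer = new KafkaProducer[Array[Byte], Array[Byte]](props)
        try {
          // If fewer than min.insync.replicas replicas are in the ISR, the send
          // fails with a NotEnoughReplicas error instead of being acknowledged.
          producer.send(new ProducerRecord[Array[Byte], Array[Byte]]("test-min-isr", "hello".getBytes)).get()
        } finally producer.close()
      }
    }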
 




[jira] [Updated] (KAFKA-1618) Exception thrown when running console producer with no port number for the broker

2014-09-25 Thread BalajiSeshadri (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1618?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

BalajiSeshadri updated KAFKA-1618:
--
Attachment: KAFKA-1618-CLEAN.patch

 Exception thrown when running console producer with no port number for the 
 broker
 -

 Key: KAFKA-1618
 URL: https://issues.apache.org/jira/browse/KAFKA-1618
 Project: Kafka
  Issue Type: Improvement
Affects Versions: 0.8.1.1
Reporter: Gwen Shapira
Assignee: BalajiSeshadri
  Labels: newbie
 Fix For: 0.8.2

 Attachments: KAFKA-1618-ALL.patch, KAFKA-1618-ALL.patch, 
 KAFKA-1618-CLEAN.patch, KAFKA-1618-REBASED.patch, 
 KAFKA-1618-REVIEW-COMMENTS-SPACE-CORRECTION.patch, 
 KAFKA-1618-REVIEW-COMMENTS.patch, KAFKA-1618.patch


 When running console producer with just localhost as the broker list, I get 
 ArrayIndexOutOfBounds exception.
 I expect either a clearer error about arguments or for the producer to 
 guess a default port.
 [root@shapira-1 bin]# ./kafka-console-producer.sh  --topic rufus1 
 --broker-list localhost
 java.lang.ArrayIndexOutOfBoundsException: 1
   at 
 kafka.client.ClientUtils$$anonfun$parseBrokerList$1.apply(ClientUtils.scala:102)
   at 
 kafka.client.ClientUtils$$anonfun$parseBrokerList$1.apply(ClientUtils.scala:97)
   at 
 scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
   at 
 scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
   at 
 scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
   at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
   at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
   at scala.collection.AbstractTraversable.map(Traversable.scala:105)
   at kafka.client.ClientUtils$.parseBrokerList(ClientUtils.scala:97)
   at 
 kafka.producer.BrokerPartitionInfo.<init>(BrokerPartitionInfo.scala:32)
   at 
 kafka.producer.async.DefaultEventHandler.<init>(DefaultEventHandler.scala:41)
   at kafka.producer.Producer.<init>(Producer.scala:59)
   at kafka.producer.ConsoleProducer$.main(ConsoleProducer.scala:158)
   at kafka.producer.ConsoleProducer.main(ConsoleProducer.scala)
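
As a hedged sketch of the "guess a default port" option (the default of 9092 and the helper below are assumptions for illustration, not the attached patch), the parser can tolerate host-only entries instead of indexing past the split result:

    // Illustrative broker-list parsing that falls back to a default port
    // instead of throwing ArrayIndexOutOfBoundsException on "localhost".
    object BrokerListSketch {
      val DefaultPort = 9092

      def parseBrokerList(brokerList: String): Seq[(String, Int)] =
        brokerList.split(",").toSeq.map { entry =>
          entry.split(":") match {
            case Array(host, port) => (host.trim, port.trim.toInt)
            case Array(host)       => (host.trim, DefaultPort) // no port given
            case _ => throw new IllegalArgumentException(s"Invalid broker entry: $entry")
          }
        }

      def main(args: Array[String]): Unit = {
        println(parseBrokerList("localhost"))            // List((localhost,9092))
        println(parseBrokerList("broker1:9093,broker2")) // List((broker1,9093), (broker2,9092))
      }
    }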



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-1618) Exception thrown when running console producer with no port number for the broker

2014-09-25 Thread BalajiSeshadri (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1618?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14148755#comment-14148755
 ] 

BalajiSeshadri commented on KAFKA-1618:
---

Ok, this one applies cleanly without issues.

Here is the log from when I applied it:
MER2AL-E0043615:kafka-trunk-1 balaji.seshadri$ git apply  KAFKA-1618.patch -v
Checking patch core/src/main/scala/kafka/tools/ConsoleProducer.scala...
Checking patch core/src/main/scala/kafka/tools/GetOffsetShell.scala...
Checking patch core/src/main/scala/kafka/tools/ProducerPerformance.scala...
Checking patch core/src/main/scala/kafka/tools/ReplayLogProducer.scala...
Checking patch core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala...
Checking patch core/src/main/scala/kafka/tools/SimpleConsumerShell.scala...
Checking patch core/src/main/scala/kafka/utils/ToolsUtils.scala...
Applied patch core/src/main/scala/kafka/tools/ConsoleProducer.scala cleanly.
Applied patch core/src/main/scala/kafka/tools/GetOffsetShell.scala cleanly.
Applied patch core/src/main/scala/kafka/tools/ProducerPerformance.scala cleanly.
Applied patch core/src/main/scala/kafka/tools/ReplayLogProducer.scala cleanly.
Applied patch core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala 
cleanly.
Applied patch core/src/main/scala/kafka/tools/SimpleConsumerShell.scala cleanly.
Applied patch core/src/main/scala/kafka/utils/ToolsUtils.scala cleanly.

[~nehanarkhede] Please verify.



 Exception thrown when running console producer with no port number for the 
 broker
 -

 Key: KAFKA-1618
 URL: https://issues.apache.org/jira/browse/KAFKA-1618
 Project: Kafka
  Issue Type: Improvement
Affects Versions: 0.8.1.1
Reporter: Gwen Shapira
Assignee: BalajiSeshadri
  Labels: newbie
 Fix For: 0.8.2

 Attachments: KAFKA-1618-ALL.patch, KAFKA-1618-ALL.patch, 
 KAFKA-1618-CLEAN.patch, KAFKA-1618-REBASED.patch, 
 KAFKA-1618-REVIEW-COMMENTS-SPACE-CORRECTION.patch, 
 KAFKA-1618-REVIEW-COMMENTS.patch, KAFKA-1618.patch


 When running console producer with just localhost as the broker list, I get 
 ArrayIndexOutOfBounds exception.
 I expect either a clearer error about arguments or for the producer to 
 guess a default port.
 [root@shapira-1 bin]# ./kafka-console-producer.sh  --topic rufus1 
 --broker-list localhost
 java.lang.ArrayIndexOutOfBoundsException: 1
   at 
 kafka.client.ClientUtils$$anonfun$parseBrokerList$1.apply(ClientUtils.scala:102)
   at 
 kafka.client.ClientUtils$$anonfun$parseBrokerList$1.apply(ClientUtils.scala:97)
   at 
 scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
   at 
 scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
   at 
 scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
   at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
   at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
   at scala.collection.AbstractTraversable.map(Traversable.scala:105)
   at kafka.client.ClientUtils$.parseBrokerList(ClientUtils.scala:97)
   at 
 kafka.producer.BrokerPartitionInfo.<init>(BrokerPartitionInfo.scala:32)
   at 
 kafka.producer.async.DefaultEventHandler.<init>(DefaultEventHandler.scala:41)
   at kafka.producer.Producer.<init>(Producer.scala:59)
   at kafka.producer.ConsoleProducer$.main(ConsoleProducer.scala:158)
   at kafka.producer.ConsoleProducer.main(ConsoleProducer.scala)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-1618) Exception thrown when running console producer with no port number for the broker

2014-09-25 Thread BalajiSeshadri (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1618?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

BalajiSeshadri updated KAFKA-1618:
--
Status: In Progress  (was: Patch Available)

 Exception thrown when running console producer with no port number for the 
 broker
 -

 Key: KAFKA-1618
 URL: https://issues.apache.org/jira/browse/KAFKA-1618
 Project: Kafka
  Issue Type: Improvement
Affects Versions: 0.8.1.1
Reporter: Gwen Shapira
Assignee: BalajiSeshadri
  Labels: newbie
 Fix For: 0.8.2

 Attachments: KAFKA-1618-ALL.patch, KAFKA-1618-ALL.patch, 
 KAFKA-1618-CLEAN.patch, KAFKA-1618-REBASED.patch, 
 KAFKA-1618-REVIEW-COMMENTS-SPACE-CORRECTION.patch, 
 KAFKA-1618-REVIEW-COMMENTS.patch, KAFKA-1618.patch


 When running console producer with just localhost as the broker list, I get 
 ArrayIndexOutOfBounds exception.
 I expect either a clearer error about arguments or for the producer to 
 guess a default port.
 [root@shapira-1 bin]# ./kafka-console-producer.sh  --topic rufus1 
 --broker-list localhost
 java.lang.ArrayIndexOutOfBoundsException: 1
   at 
 kafka.client.ClientUtils$$anonfun$parseBrokerList$1.apply(ClientUtils.scala:102)
   at 
 kafka.client.ClientUtils$$anonfun$parseBrokerList$1.apply(ClientUtils.scala:97)
   at 
 scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
   at 
 scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
   at 
 scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
   at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
   at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
   at scala.collection.AbstractTraversable.map(Traversable.scala:105)
   at kafka.client.ClientUtils$.parseBrokerList(ClientUtils.scala:97)
   at 
 kafka.producer.BrokerPartitionInfo.<init>(BrokerPartitionInfo.scala:32)
   at 
 kafka.producer.async.DefaultEventHandler.<init>(DefaultEventHandler.scala:41)
   at kafka.producer.Producer.<init>(Producer.scala:59)
   at kafka.producer.ConsoleProducer$.main(ConsoleProducer.scala:158)
   at kafka.producer.ConsoleProducer.main(ConsoleProducer.scala)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-1652) NoSuchElementException on server shutdown

2014-09-25 Thread Sriharsha Chintalapani (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1652?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14148770#comment-14148770
 ] 

Sriharsha Chintalapani commented on KAFKA-1652:
---

[~jjkoshy] [~nehanarkhede] I am able to reproduce this and will follow up with a 
patch.

 NoSuchElementException on server shutdown
 -

 Key: KAFKA-1652
 URL: https://issues.apache.org/jira/browse/KAFKA-1652
 Project: Kafka
  Issue Type: Bug
Reporter: Neha Narkhede
  Labels: newbie++

 Observed the following exception on server shutdown. Happens pretty 
 frequently though not on every shutdown. Ways to reproduce -
 1. Checkout kafka from trunk
 2. Run bin/kafka-server-start.sh config/server.properties
 3. Ctrl C
 {noformat}
 [2014-09-25 12:31:14,441] ERROR None.get (kafka.network.Processor)
 java.util.NoSuchElementException: None.get
   at scala.None$.get(Option.scala:313)
   at scala.None$.get(Option.scala:311)
   at kafka.network.ConnectionQuotas.dec(SocketServer.scala:522)
   at kafka.network.AbstractServerThread.close(SocketServer.scala:165)
   at kafka.network.AbstractServerThread.close(SocketServer.scala:157)
   at kafka.network.Processor.close(SocketServer.scala:372)
   at kafka.network.AbstractServerThread.closeAll(SocketServer.scala:178)
   at 
 kafka.network.Processor$$anonfun$run$3.apply$mcV$sp(SocketServer.scala:362)
   at kafka.utils.Utils$.swallow(Utils.scala:172)
   at kafka.utils.Logging$class.swallowError(Logging.scala:106)
   at 
 kafka.network.AbstractServerThread.swallowError(SocketServer.scala:108)
   at kafka.network.Processor.run(SocketServer.scala:362)
   at java.lang.Thread.run(Thread.java:695)
 {noformat}
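
Speculating from the trace above (this is not the actual SocketServer/ConnectionQuotas code), the failure pattern is an unconditional Option.get on a per-address counter that may already have been removed; a defensive decrement might look like this sketch:

    // Hedged sketch of a defensive per-address connection counter; it only
    // illustrates avoiding None.get when an address is already gone during
    // shutdown races.
    import java.net.InetAddress
    import scala.collection.mutable

    final class ConnectionCounterSketch {
      private val counts = mutable.Map.empty[InetAddress, Int]

      def inc(addr: InetAddress): Unit = synchronized {
        counts(addr) = counts.getOrElse(addr, 0) + 1
      }

      def dec(addr: InetAddress): Unit = synchronized {
        counts.get(addr) match {
          case Some(n) if n > 1 => counts(addr) = n - 1
          case Some(_)          => counts.remove(addr) // last connection for this address
          case None             => ()                  // already gone; don't throw during shutdown
        }
      }
    }

    object ConnectionCounterSketch {
      def main(args: Array[String]): Unit = {
        val quotas = new ConnectionCounterSketch
        val addr = InetAddress.getLoopbackAddress
        quotas.inc(addr); quotas.dec(addr)
        quotas.dec(addr) // previously a None.get crash; now a no-op
        println("no NoSuchElementException")
      }
    }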



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-1618) Exception thrown when running console producer with no port number for the broker

2014-09-25 Thread BalajiSeshadri (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1618?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

BalajiSeshadri updated KAFKA-1618:
--
Status: Patch Available  (was: In Progress)

 Exception thrown when running console producer with no port number for the 
 broker
 -

 Key: KAFKA-1618
 URL: https://issues.apache.org/jira/browse/KAFKA-1618
 Project: Kafka
  Issue Type: Improvement
Affects Versions: 0.8.1.1
Reporter: Gwen Shapira
Assignee: BalajiSeshadri
  Labels: newbie
 Fix For: 0.8.2

 Attachments: KAFKA-1618-ALL.patch, KAFKA-1618-ALL.patch, 
 KAFKA-1618-CLEAN.patch, KAFKA-1618-REBASED.patch, 
 KAFKA-1618-REVIEW-COMMENTS-SPACE-CORRECTION.patch, 
 KAFKA-1618-REVIEW-COMMENTS.patch, KAFKA-1618.patch


 When running console producer with just localhost as the broker list, I get 
 ArrayIndexOutOfBounds exception.
 I expect either a clearer error about arguments or for the producer to 
 guess a default port.
 [root@shapira-1 bin]# ./kafka-console-producer.sh  --topic rufus1 
 --broker-list localhost
 java.lang.ArrayIndexOutOfBoundsException: 1
   at 
 kafka.client.ClientUtils$$anonfun$parseBrokerList$1.apply(ClientUtils.scala:102)
   at 
 kafka.client.ClientUtils$$anonfun$parseBrokerList$1.apply(ClientUtils.scala:97)
   at 
 scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
   at 
 scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
   at 
 scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
   at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
   at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
   at scala.collection.AbstractTraversable.map(Traversable.scala:105)
   at kafka.client.ClientUtils$.parseBrokerList(ClientUtils.scala:97)
   at 
 kafka.producer.BrokerPartitionInfo.<init>(BrokerPartitionInfo.scala:32)
   at 
 kafka.producer.async.DefaultEventHandler.<init>(DefaultEventHandler.scala:41)
   at kafka.producer.Producer.<init>(Producer.scala:59)
   at kafka.producer.ConsoleProducer$.main(ConsoleProducer.scala:158)
   at kafka.producer.ConsoleProducer.main(ConsoleProducer.scala)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-1586) support sticky partitioning in the new producer

2014-09-25 Thread Jim Hoagland (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1586?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14148771#comment-14148771
 ] 

Jim Hoagland commented on KAFKA-1586:
-

+1, agreeing with Jay and Neha. I found the sticky behavior confusing; it made 
me wonder what the heck was happening (I thought I was doing something wrong).

 support sticky partitioning in the new producer
 ---

 Key: KAFKA-1586
 URL: https://issues.apache.org/jira/browse/KAFKA-1586
 Project: Kafka
  Issue Type: Bug
  Components: core
Affects Versions: 0.8.2
Reporter: Jun Rao
Assignee: Jun Rao
 Attachments: KAFKA-1586.patch


 If a message doesn't specify a key or a partition, the new producer selects a 
 partition for each message in a round-robin way. As a result, in a window of 
 linger.ms, messages are spread around in all partitions of a topic. Compared 
 with another strategy that assigns all messages to a single partition in the 
 same time window, this strategy may not compress the message set as well 
 since the batch is smaller. Another potential problem with this strategy is 
 that the compression ratio could be sensitive to the change of # partitions 
 in a topic. If # partitions are increased in a topic, the produced data may 
 not be compressed as well as before. 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Build failed in Jenkins: Kafka-trunk #278

2014-09-25 Thread Apache Jenkins Server
See https://builds.apache.org/job/Kafka-trunk/278/changes

Changes:

[neha.narkhede] KAFKA-589 Clean shutdown after startup connection failure; 
reviewed by Neha Narkhede

--
Started by an SCM change
Building remotely on ubuntu-1 (Ubuntu ubuntu) in workspace 
https://builds.apache.org/job/Kafka-trunk/ws/
  git rev-parse --is-inside-work-tree
Fetching changes from the remote Git repository
  git config remote.origin.url 
  https://git-wip-us.apache.org/repos/asf/kafka.git
Fetching upstream changes from https://git-wip-us.apache.org/repos/asf/kafka.git
  git --version
  git fetch --tags --progress 
  https://git-wip-us.apache.org/repos/asf/kafka.git 
  +refs/heads/*:refs/remotes/origin/*
  git rev-parse origin/trunk^{commit}
Checking out Revision 9c17747baab829adb268da28b4d943bbd6ef4e9f (origin/trunk)
  git config core.sparsecheckout
  git checkout -f 9c17747baab829adb268da28b4d943bbd6ef4e9f
  git rev-list f750dba65f9d9552a61a0754c46fa6e294785b31
[Kafka-trunk] $ /bin/bash -xe /tmp/hudson2924963140271174866.sh
+ gradle
/tmp/hudson2924963140271174866.sh: line 2: gradle: command not found
Build step 'Execute shell' marked build as failure


[jira] [Commented] (KAFKA-1555) provide strong consistency with reasonable availability

2014-09-25 Thread Joel Koshy (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1555?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14148795#comment-14148795
 ] 

Joel Koshy commented on KAFKA-1555:
---

This is an implementation detail, but with regard to the duplicates scenario 
(i.e., 4(a)): on a not-enough-replicas error, the producer implementation could 
just stop retrying the produce request (and pounding the brokers) until the ISR 
returns to the min.isr level. It can instead switch to refreshing the topic 
metadata with backoff and checking the current ISR cardinality, which is 
included in the metadata response (although it is currently inaccurate and 
needs to be fixed - KAFKA-1367). As soon as the ISR returns to min.isr, it can 
retry the request.
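
A hedged sketch of the client-side behavior described above (the metadata type and callbacks are placeholders, not the producer's API): on a not-enough-replicas error, stop resending and instead poll metadata with backoff until the ISR size reaches min.isr, then retry.

    // Illustrative retry loop: back off on NotEnoughReplicas and poll
    // (placeholder) metadata until the ISR is large enough, rather than
    // hammering the broker with immediate retries.
    object IsrAwareRetrySketch {
      final case class PartitionMetadata(isrSize: Int) // placeholder for real metadata

      def sendWithIsrBackoff(minIsr: Int,
                             fetchMetadata: () => PartitionMetadata,
                             trySend: () => Boolean,
                             backoffMs: Long = 200,
                             maxAttempts: Int = 10): Boolean = {
        var attempt = 0
        while (attempt < maxAttempts) {
          if (trySend()) return true // acknowledged
          // NotEnoughReplicas: wait for the ISR to recover instead of retrying blindly.
          while (fetchMetadata().isrSize < minIsr) Thread.sleep(backoffMs)
          attempt += 1
        }
        false
      }

      def main(args: Array[String]): Unit = {
        var isr = 1
        val ok = sendWithIsrBackoff(
          minIsr = 2,
          fetchMetadata = () => { isr = math.min(isr + 1, 3); PartitionMetadata(isr) },
          trySend = () => isr >= 2)
        println(s"send succeeded: $ok")
      }
    }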


 provide strong consistency with reasonable availability
 ---

 Key: KAFKA-1555
 URL: https://issues.apache.org/jira/browse/KAFKA-1555
 Project: Kafka
  Issue Type: Improvement
  Components: controller
Affects Versions: 0.8.1.1
Reporter: Jiang Wu
Assignee: Gwen Shapira
 Fix For: 0.8.2

 Attachments: KAFKA-1555.0.patch, KAFKA-1555.1.patch, 
 KAFKA-1555.2.patch, KAFKA-1555.3.patch, KAFKA-1555.4.patch, KAFKA-1555.5.patch


 In a mission critical application, we expect a kafka cluster with 3 brokers 
 can satisfy two requirements:
 1. When 1 broker is down, no message loss or service blocking happens.
 2. In worse cases such as two brokers are down, service can be blocked, but 
 no message loss happens.
 We found that the current kafka version (0.8.1.1) cannot achieve the requirements 
 due to its three behaviors:
 1. when choosing a new leader from 2 followers in ISR, the one with less 
 messages may be chosen as the leader.
 2. even when replica.lag.max.messages=0, a follower can stay in ISR when it 
 has less messages than the leader.
 3. ISR can contain only 1 broker, therefore acknowledged messages may be 
 stored in only 1 broker.
 The following is an analytical proof. 
 We consider a cluster with 3 brokers and a topic with 3 replicas, and assume 
 that at the beginning, all 3 replicas, leader A, followers B and C, are in 
 sync, i.e., they have the same messages and are all in ISR.
 According to the value of request.required.acks (acks for short), there are 
 the following cases.
 1. acks=0, 1, 3. Obviously these settings do not satisfy the requirement.
 2. acks=2. Producer sends a message m. It's acknowledged by A and B. At this 
 time, although C hasn't received m, C is still in ISR. If A is killed, C can 
 be elected as the new leader, and consumers will miss m.
 3. acks=-1. B and C restart and are removed from ISR. Producer sends a 
 message m to A, and receives an acknowledgement. Disk failure happens in A 
 before B and C replicate m. Message m is lost.
 In summary, any existing configuration cannot satisfy the requirements.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-1647) Replication offset checkpoints (high water marks) can be lost on hard kills and restarts

2014-09-25 Thread Joel Koshy (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1647?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14148802#comment-14148802
 ] 

Joel Koshy commented on KAFKA-1647:
---

Yes, that is one possibility, but not the only one. For example, suppose this is 
a topic with replication factor three; this typically happens when bringing up a 
cluster that was previously hard-killed. Suppose b1 and b2 are brought up 
simultaneously and lose their HW as described above, and the controller 
then elects b1 as the leader. b2 then becomes the follower (successfully), but 
as part of that transition it will truncate its log to zero. I think there are more 
scenarios, since I also saw this with a topic with replication factor two, but I 
have not checked the logs yet to see whether it was due to a subsequent bounce or 
something else.

 Replication offset checkpoints (high water marks) can be lost on hard kills 
 and restarts
 

 Key: KAFKA-1647
 URL: https://issues.apache.org/jira/browse/KAFKA-1647
 Project: Kafka
  Issue Type: Bug
Reporter: Joel Koshy
Priority: Critical
  Labels: newbie++

 We ran into this scenario recently in a production environment. This can 
 happen when enough brokers in a cluster are taken down. i.e., a rolling 
 bounce done properly should not cause this issue. It can occur if all 
 replicas for any partition are taken down.
 Here is a sample scenario:
 * Cluster of three brokers: b0, b1, b2
 * Two partitions (of some topic) with replication factor two: p0, p1
 * Initial state:
 ** p0: leader = b0, ISR = {b0, b1}
 ** p1: leader = b1, ISR = {b0, b1}
 * Do a parallel hard-kill of all brokers
 * Bring up b2, so it is the new controller
 * b2 initializes its controller context and populates its leader/ISR cache 
 (i.e., controllerContext.partitionLeadershipInfo) from zookeeper. The last 
 known leaders are b0 (for p0) and b1 (for p1)
 * Bring up b1
 * The controller's onBrokerStartup procedure initiates a replica state change 
 for all replicas on b1 to become online. As part of this replica state change 
 it gets the last known leader and ISR and sends a LeaderAndIsrRequest to b1 
 (for p0 and p1). This LeaderAndIsr request contains: {p0: leader=b0; p1: 
 leader=b1; leaders=[b1]}. b0 is indicated as the leader of p0 but it is not 
 included in the leaders field because b0 is down.
 * On receiving the LeaderAndIsrRequest, b1's replica manager will 
 successfully make b1 the leader for p1 (and create the local replica object 
 corresponding to p1). It will however abort the become-follower transition 
 for p0 because the designated leader b0 is offline. So it will not create the 
 local replica object for p0.
 * It will then start the high water mark checkpoint thread. Since only p1 has 
 a local replica object, only p1's high water mark will be checkpointed to 
 disk. p0's previously written checkpoint, if any, will be lost.
 So in summary it seems we should always create the local replica object even 
 if the online transition does not happen.
 Possible symptoms of the above bug could be one or more of the following (we 
 saw 2 and 3):
 # Data loss; yes on a hard-kill data loss is expected, but this can actually 
 cause loss of nearly all data if the broker becomes follower, truncates, and 
 soon after happens to become leader.
 # High IO on brokers that lose their high water mark then subsequently (on a 
 successful become follower transition) truncate their log to zero and start 
 catching up from the beginning.
 # If the offsets topic is affected, then offsets can get reset. This is 
 because during an offset load we don't read past the high water mark. So if a 
 water mark is missing then we don't load anything (even if the offsets are 
 there in the log).
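
As a hedged toy model of the failure mode (not the ReplicaManager implementation; names are illustrative): if the checkpoint file is rewritten only from partitions that have local replica objects, a partition whose become-follower transition was aborted silently loses its previously checkpointed high watermark.

    // Toy model of the high-watermark checkpoint rewrite described above;
    // p0's old checkpoint disappears because no local replica object was
    // created for it after the aborted become-follower transition.
    object HwCheckpointSketch {
      type TopicPartition = (String, Int)

      def rewriteCheckpoint(localReplicaHws: Map[TopicPartition, Long]): Map[TopicPartition, Long] =
        localReplicaHws // the file is replaced wholesale with the current local view

      def main(args: Array[String]): Unit = {
        val onDisk: Map[TopicPartition, Long] = Map(("t", 0) -> 42L, ("t", 1) -> 17L)
        // Only p1 got a local replica object (p0's transition was aborted).
        val local: Map[TopicPartition, Long] = Map(("t", 1) -> 17L)
        val after = rewriteCheckpoint(local)
        println(s"before: $onDisk")
        println(s"after:  $after") // ("t",0) -> 42 is gone; p0's HW was lost
      }
    }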



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)