Re: Review Request 25886: KAFKA-1555: provide strong consistency with reasonable availability

2014-09-22 Thread Gwen Shapira

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/25886/
---

(Updated Sept. 22, 2014, 5:59 a.m.)


Review request for kafka.


Repository: kafka


Description
---

KAFKA-1555: provide strong consistency with reasonable availability


Diffs (updated)
-

  core/src/main/scala/kafka/cluster/Partition.scala ff106b4 
  core/src/main/scala/kafka/common/ErrorMapping.scala 3fae791 
  core/src/main/scala/kafka/common/NotEnoughReplicasException.scala 
PRE-CREATION 
  core/src/main/scala/kafka/log/LogConfig.scala 5746ad4 

Diff: https://reviews.apache.org/r/25886/diff/


Testing
---

With a 3-broker cluster, created 3 topics, each with 1 partition and 3 replicas, 
with min.insync.replicas of 1, 3, and 4.
* min.insync.replicas=1 behaved normally (all writes succeeded as long as a 
broker was up)
* min.insync.replicas=3 returned NotEnoughReplicas when required.acks=-1 and 
one broker was down
* min.insync.replicas=4 returned NotEnoughReplicas when required.acks=-1

See notes about retry behavior in the JIRA.
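
For context, a minimal sketch of the broker-side check being tested here (illustrative 
only, not the actual Partition.scala change; names and signatures are simplified 
assumptions):

class NotEnoughReplicasException(msg: String) extends RuntimeException(msg)

object MinIsrCheckSketch {
  // Illustrative sketch: the caller passes the current ISR size and the
  // topic-level min.insync.replicas value read from LogConfig.
  def checkEnoughReplicas(topicPartition: String,
                          isrSize: Int,
                          minInsyncReplicas: Int,
                          requiredAcks: Int): Unit = {
    // The check only applies to acks=-1 produce requests, as in the tests above.
    if (requiredAcks == -1 && isrSize < minInsyncReplicas)
      throw new NotEnoughReplicasException(
        s"ISR size $isrSize for $topicPartition is below min.insync.replicas=$minInsyncReplicas")
  }
}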


Thanks,

Gwen Shapira



[jira] [Updated] (KAFKA-1555) provide strong consistency with reasonable availability

2014-09-22 Thread Gwen Shapira (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1555?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gwen Shapira updated KAFKA-1555:

Attachment: KAFKA-1555.1.patch

Sorry, first patch was missing a file.

 provide strong consistency with reasonable availability
 ---

 Key: KAFKA-1555
 URL: https://issues.apache.org/jira/browse/KAFKA-1555
 Project: Kafka
  Issue Type: Improvement
  Components: controller
Affects Versions: 0.8.1.1
Reporter: Jiang Wu
Assignee: Gwen Shapira
 Fix For: 0.8.2

 Attachments: KAFKA-1555.0.patch, KAFKA-1555.1.patch


 In a mission-critical application, we expect that a Kafka cluster with 3 brokers 
 can satisfy two requirements:
 1. When 1 broker is down, no message loss or service blocking happens.
 2. In worse cases, such as when two brokers are down, service can be blocked, but 
 no message loss happens.
 We found that the current Kafka version (0.8.1.1) cannot achieve these requirements 
 due to three of its behaviors:
 1. when choosing a new leader from 2 followers in ISR, the one with fewer 
 messages may be chosen as the leader.
 2. even when replica.lag.max.messages=0, a follower can stay in ISR when it 
 has fewer messages than the leader.
 3. the ISR can contain only 1 broker, therefore acknowledged messages may be 
 stored on only 1 broker.
 The following is an analytical proof. 
 We consider a cluster with 3 brokers and a topic with 3 replicas, and assume 
 that at the beginning, all 3 replicas, leader A, followers B and C, are in 
 sync, i.e., they have the same messages and are all in ISR.
 According to the value of request.required.acks (acks for short), there are 
 the following cases.
 1. acks=0, 1, 3. Obviously these settings do not satisfy the requirement.
 2. acks=2. Producer sends a message m. It's acknowledged by A and B. At this 
 time, although C hasn't received m, C is still in ISR. If A is killed, C can 
 be elected as the new leader, and consumers will miss m.
 3. acks=-1. B and C restart and are removed from ISR. Producer sends a 
 message m to A, and receives an acknowledgement. Disk failure happens in A 
 before B and C replicate m. Message m is lost.
 In summary, any existing configuration cannot satisfy the requirements.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-1555) provide strong consistency with reasonable availability

2014-09-22 Thread Gwen Shapira (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1555?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gwen Shapira updated KAFKA-1555:

Status: Patch Available  (was: Open)

 provide strong consistency with reasonable availability
 ---

 Key: KAFKA-1555
 URL: https://issues.apache.org/jira/browse/KAFKA-1555
 Project: Kafka
  Issue Type: Improvement
  Components: controller
Affects Versions: 0.8.1.1
Reporter: Jiang Wu
Assignee: Gwen Shapira
 Fix For: 0.8.2

 Attachments: KAFKA-1555.0.patch, KAFKA-1555.1.patch


 In a mission-critical application, we expect that a Kafka cluster with 3 brokers 
 can satisfy two requirements:
 1. When 1 broker is down, no message loss or service blocking happens.
 2. In worse cases, such as when two brokers are down, service can be blocked, but 
 no message loss happens.
 We found that the current Kafka version (0.8.1.1) cannot achieve these requirements 
 due to three of its behaviors:
 1. when choosing a new leader from 2 followers in ISR, the one with fewer 
 messages may be chosen as the leader.
 2. even when replica.lag.max.messages=0, a follower can stay in ISR when it 
 has fewer messages than the leader.
 3. the ISR can contain only 1 broker, therefore acknowledged messages may be 
 stored on only 1 broker.
 The following is an analytical proof. 
 We consider a cluster with 3 brokers and a topic with 3 replicas, and assume 
 that at the beginning, all 3 replicas, leader A, followers B and C, are in 
 sync, i.e., they have the same messages and are all in ISR.
 According to the value of request.required.acks (acks for short), there are 
 the following cases.
 1. acks=0, 1, 3. Obviously these settings do not satisfy the requirement.
 2. acks=2. Producer sends a message m. It's acknowledged by A and B. At this 
 time, although C hasn't received m, C is still in ISR. If A is killed, C can 
 be elected as the new leader, and consumers will miss m.
 3. acks=-1. B and C restart and are removed from ISR. Producer sends a 
 message m to A, and receives an acknowledgement. Disk failure happens in A 
 before B and C replicate m. Message m is lost.
 In summary, any existing configuration cannot satisfy the requirements.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-1555) provide strong consistency with reasonable availability

2014-09-22 Thread Sriram Subramanian (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1555?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14142967#comment-14142967
 ] 

Sriram Subramanian commented on KAFKA-1555:
---

Sorry to be late here, but I think this is an important change and we need to 
ensure this is exactly the right behavior for the long term.

To summarize the discussion and code change so far - 

1. We would set the min.isr per topic in log config
2. We would use this config only when ack is set to -1 and fail the call if the 
number of in sync replicas is less than min isr

The main drawbacks I see with this approach are - 

1. If we plan to set this value at a per topic level, this should be part of 
create/modify topic and should be set during topic creation or modified later. 
This ensures that if we do expose a createTopic api in the protocol, it would 
be available to be set/modified.
2. I could see scenarios where multiple writers could have different 
requirements on the same topic and may not have any knowledge of how the topic 
was created. 
3. I think what we are really solving for is to either make the write durable 
on all replicas or on just the in sync replicas. The min.isr value provides the 
option of a number and I think any value other than 0 or no_of_replicas is of 
no value. This would only confuse the clients when they create the topic.

This is how I interpret the acks w.r.t the clients - 

0 - No response required. I don't really care if the write happened
1 - I need a response after the write happened to the leader successfully
-1 - I need the write to happen on all replicas before a response. This has 
two options - 
 a. Response is sent after write happens to replicas in ISR
 b. Response is sent after write happens to all replicas 

Having an enum for ack as below is a lot clearer and sets the expectations 
right in my opinion. 

enum AckType {
  No_Response,
  Write_To_Leader,
  Write_To_ISR,           // chooses availability over consistency
  Write_To_All_Replicas   // chooses consistency over availability
}




 provide strong consistency with reasonable availability
 ---

 Key: KAFKA-1555
 URL: https://issues.apache.org/jira/browse/KAFKA-1555
 Project: Kafka
  Issue Type: Improvement
  Components: controller
Affects Versions: 0.8.1.1
Reporter: Jiang Wu
Assignee: Gwen Shapira
 Fix For: 0.8.2

 Attachments: KAFKA-1555.0.patch, KAFKA-1555.1.patch


 In a mission-critical application, we expect that a Kafka cluster with 3 brokers 
 can satisfy two requirements:
 1. When 1 broker is down, no message loss or service blocking happens.
 2. In worse cases, such as when two brokers are down, service can be blocked, but 
 no message loss happens.
 We found that the current Kafka version (0.8.1.1) cannot achieve these requirements 
 due to three of its behaviors:
 1. when choosing a new leader from 2 followers in ISR, the one with fewer 
 messages may be chosen as the leader.
 2. even when replica.lag.max.messages=0, a follower can stay in ISR when it 
 has fewer messages than the leader.
 3. the ISR can contain only 1 broker, therefore acknowledged messages may be 
 stored on only 1 broker.
 The following is an analytical proof. 
 We consider a cluster with 3 brokers and a topic with 3 replicas, and assume 
 that at the beginning, all 3 replicas, leader A, followers B and C, are in 
 sync, i.e., they have the same messages and are all in ISR.
 According to the value of request.required.acks (acks for short), there are 
 the following cases.
 1. acks=0, 1, 3. Obviously these settings do not satisfy the requirement.
 2. acks=2. Producer sends a message m. It's acknowledged by A and B. At this 
 time, although C hasn't received m, C is still in ISR. If A is killed, C can 
 be elected as the new leader, and consumers will miss m.
 3. acks=-1. B and C restart and are removed from ISR. Producer sends a 
 message m to A, and receives an acknowledgement. Disk failure happens in A 
 before B and C replicate m. Message m is lost.
 In summary, any existing configuration cannot satisfy the requirements.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-977) Implement generation/term per leader to reconcile messages correctly

2014-09-22 Thread Sriram Subramanian (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-977?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14142990#comment-14142990
 ] 

Sriram Subramanian commented on KAFKA-977:
--

I would like to bring this issue to discussion again. Kafka is used a lot more 
now for use cases other than just moving data from point A to point B. For 
example, consider the case where Kafka acts as the log and materialized views 
are created by consuming these logs. In such scenarios, it is important that 
the logs are consistent and do not diverge even under unclean leader elections 
(Replaying these replicas should create the same view). Having a 
generation/term is essential for log replication and it would be great for 
Kafka to have the same guarantees as other log replication protocols.  I would 
be happy to give more detailed examples for this but would want to know if we 
think this is an issue to address soon.
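
To make the reconciliation idea concrete, here is a minimal sketch (hypothetical 
types, not an actual Kafka API): if every record carries the leader term/generation 
under which it was appended, a follower recovering from an unclean election 
truncates to the last (term, offset) entry it shares with the new leader, so the 
logs converge instead of diverging:

// Hypothetical sketch only. RecordMeta stands in for whatever per-message
// metadata would carry the leader term/generation.
case class RecordMeta(offset: Long, term: Int)

object ReconcileSketch {
  // Returns the offset the follower should truncate to, i.e. one past the
  // last record it has in common with the leader's log.
  def truncationPoint(follower: Seq[RecordMeta], leader: Seq[RecordMeta]): Long = {
    val leaderEntries = leader.map(r => (r.term, r.offset)).toSet
    follower.reverse
      .find(r => leaderEntries.contains((r.term, r.offset)))
      .map(_.offset + 1)
      .getOrElse(0L)
  }
}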

 Implement generation/term per leader to reconcile messages correctly
 

 Key: KAFKA-977
 URL: https://issues.apache.org/jira/browse/KAFKA-977
 Project: Kafka
  Issue Type: Bug
Reporter: Sriram Subramanian
Assignee: Sriram Subramanian

 During unclean leader election, the log messages can diverge and when the 
 followers come back up Kafka does not reconcile correctly. To implement it 
 correctly, we need to add a term/generation to each message and use that to 
 reconcile.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-1034) Improve partition reassignment to optimize writes to zookeeper

2014-09-22 Thread Sriram Subramanian (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14142999#comment-14142999
 ] 

Sriram Subramanian commented on KAFKA-1034:
---

At a high level, we were making too many writes to ZK during partition 
reassignment. A lot of things have changed since then and we would need to 
revisit the code to see if this is still an issue. It would be useful if 
someone who has touched this code recently could comment on it.

 Improve partition reassignment to optimize writes to zookeeper
 --

 Key: KAFKA-1034
 URL: https://issues.apache.org/jira/browse/KAFKA-1034
 Project: Kafka
  Issue Type: Bug
Affects Versions: 0.8.0, 0.8.1
Reporter: Sriram Subramanian
Assignee: Sriram Subramanian
  Labels: newbie++
 Fix For: 0.8.2


 For ReassignPartition tool, check if optimizing the writes to ZK after every 
 replica reassignment is possible



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Assigned] (KAFKA-1644) Inherit FetchResponse from RequestOrResponse

2014-09-22 Thread Anton Karamanov (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1644?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Anton Karamanov reassigned KAFKA-1644:
--

Assignee: Anton Karamanov

 Inherit FetchResponse from RequestOrResponse
 

 Key: KAFKA-1644
 URL: https://issues.apache.org/jira/browse/KAFKA-1644
 Project: Kafka
  Issue Type: Bug
Reporter: Anton Karamanov
Assignee: Anton Karamanov

 Unlike all other Kafka API responses {{FetchResponse}} is not a subclass of 
 RequestOrResponse, which requires handling it as a special case while 
 processing responses.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-1644) Inherit FetchResponse from RequestOrResponse

2014-09-22 Thread Anton Karamanov (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1644?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Anton Karamanov updated KAFKA-1644:
---
Attachment: 0001-KAFKA-1644-Inherit-FetchResponse-from-RequestOrRespo.patch

 Inherit FetchResponse from RequestOrResponse
 

 Key: KAFKA-1644
 URL: https://issues.apache.org/jira/browse/KAFKA-1644
 Project: Kafka
  Issue Type: Bug
Reporter: Anton Karamanov
Assignee: Anton Karamanov
 Attachments: 
 0001-KAFKA-1644-Inherit-FetchResponse-from-RequestOrRespo.patch


 Unlike all other Kafka API responses {{FetchResponse}} is not a subclass of 
 RequestOrResponse, which requires handling it as a special case while 
 processing responses.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-1644) Inherit FetchResponse from RequestOrResponse

2014-09-22 Thread Anton Karamanov (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1644?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14143112#comment-14143112
 ] 

Anton Karamanov commented on KAFKA-1644:


Here's a 
[patch|^0001-KAFKA-1644-Inherit-FetchResponse-from-RequestOrRespo.patch].

 Inherit FetchResponse from RequestOrResponse
 

 Key: KAFKA-1644
 URL: https://issues.apache.org/jira/browse/KAFKA-1644
 Project: Kafka
  Issue Type: Bug
Reporter: Anton Karamanov
Assignee: Anton Karamanov
 Attachments: 
 0001-KAFKA-1644-Inherit-FetchResponse-from-RequestOrRespo.patch


 Unlike all other Kafka API responses {{FetchResponse}} is not a subclass of 
 RequestOrResponse, which requires handling it as a special case while 
 processing responses.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-1367) Broker topic metadata not kept in sync with ZooKeeper

2014-09-22 Thread Magnus Edenhill (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1367?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14143218#comment-14143218
 ] 

Magnus Edenhill commented on KAFKA-1367:


Apart from supplying the ISR count and list in its metadata API, librdkafka 
also provides an `enforce.isr.cnt` configuration property that fails
produce requests locally before transmission if the currently known ISR count 
is smaller than the configured value.
This is a workaround for the broker not fully honoring `request.required.acks`, 
i.e., if `request.required.acks=3` and only one broker is available the produce 
request will not fail.
More info in the original issue here: 
https://github.com/edenhill/librdkafka/issues/91

Generally I would assume that information provided by the broker is correct, 
otherwise it should not be included at all since it can't be used (reliably).

 Broker topic metadata not kept in sync with ZooKeeper
 -

 Key: KAFKA-1367
 URL: https://issues.apache.org/jira/browse/KAFKA-1367
 Project: Kafka
  Issue Type: Bug
Affects Versions: 0.8.0, 0.8.1
Reporter: Ryan Berdeen
  Labels: newbie++
 Attachments: KAFKA-1367.txt


 When a broker is restarted, the topic metadata responses from the brokers 
 will be incorrect (different from ZooKeeper) until a preferred replica leader 
 election.
 In the metadata, it looks like leaders are correctly removed from the ISR 
 when a broker disappears, but followers are not. Then, when a broker 
 reappears, the ISR is never updated.
 I used a variation of the Vagrant setup created by Joe Stein to reproduce 
 this with latest from the 0.8.1 branch: 
 https://github.com/also/kafka/commit/dba36a503a5e22ea039df0f9852560b4fb1e067c



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Resolved] (KAFKA-1034) Improve partition reassignment to optimize writes to zookeeper

2014-09-22 Thread Neha Narkhede (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1034?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Neha Narkhede resolved KAFKA-1034.
--
Resolution: Won't Fix

 Improve partition reassignment to optimize writes to zookeeper
 --

 Key: KAFKA-1034
 URL: https://issues.apache.org/jira/browse/KAFKA-1034
 Project: Kafka
  Issue Type: Bug
Affects Versions: 0.8.0, 0.8.1
Reporter: Sriram Subramanian
Assignee: Sriram Subramanian
  Labels: newbie++
 Fix For: 0.8.2


 For ReassignPartition tool, check if optimizing the writes to ZK after every 
 replica reassignment is possible



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-1034) Improve partition reassignment to optimize writes to zookeeper

2014-09-22 Thread Neha Narkhede (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14143326#comment-14143326
 ] 

Neha Narkhede commented on KAFKA-1034:
--

We need to move to batch ZK writes, using the new API available in ZK 3.4.x. 
That should reduce the # of writes to zookeeper for partition reassignment as 
well. So we can probably just close this JIRA.
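
For reference, a minimal sketch of the batch-write (multi) API in ZooKeeper 3.4 
that this refers to; the paths and payloads here are made up and this is not 
controller code:

import org.apache.zookeeper.{Op, ZooKeeper}
import scala.collection.JavaConverters._

// Illustrative only: submit several znode updates as one atomic multi() call
// instead of issuing one setData round trip per partition.
object BatchZkWriteSketch {
  def writeBatch(zk: ZooKeeper, updates: Map[String, Array[Byte]]): Unit = {
    val ops = updates.map { case (path, data) => Op.setData(path, data, -1) }.toList
    zk.multi(ops.asJava)   // version -1 = match any version
  }
}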

 Improve partition reassignment to optimize writes to zookeeper
 --

 Key: KAFKA-1034
 URL: https://issues.apache.org/jira/browse/KAFKA-1034
 Project: Kafka
  Issue Type: Bug
Affects Versions: 0.8.0, 0.8.1
Reporter: Sriram Subramanian
Assignee: Sriram Subramanian
  Labels: newbie++
 Fix For: 0.8.2


 For ReassignPartition tool, check if optimizing the writes to ZK after every 
 replica reassignment is possible



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-1555) provide strong consistency with reasonable availability

2014-09-22 Thread Gwen Shapira (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1555?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14143352#comment-14143352
 ] 

Gwen Shapira commented on KAFKA-1555:
-

Hi Sriram,

Thank you for raising these concerns.

Here are some points regarding the drawbacks:
1. This is exactly how it works right now. If you build Kafka with the patch 
I uploaded, you'll be able to use build/kafka-topics.sh to create/alter topics 
with the min.insync.replicas parameter specified in the --config flag.

2. Absolutely. [~junrao] explained how it can work (simply ignore the 
NotEnoughReplicas exception). The only issue we currently have is the retries, 
which can also be resolved by the client.

3. I disagree that this is what we are trying to solve. We are trying to give 
admins more control over what durable writes mean for specific topics. For my 
use-case, I'd like to have majority-write. This can be done for a 3-replica 
topic by setting min.insync.replicas to 2. If I want all replicas, I can 
set min.insync.replicas=3, and if I want just ISR, I can set 
min.insync.replicas=1.

As you can see, the current solution is very flexible and supports multiple 
durability requirements. It satisfies both your use-case and mine. I agree that 
this requires a bit more understanding of what you are trying to achieve, but I 
think I can document it in a way that's fairly easy to understand (with some 
common examples, as I explained above).
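
To make the mapping above concrete, a small illustrative sketch (assuming the 
per-topic key is min.insync.replicas, supplied through the topic config, e.g. 
--config min.insync.replicas=2):

import java.util.Properties

// Illustrative only: topic-level overrides for a 3-replica topic, matching the
// examples above.
object DurabilityConfigSketch {
  def minIsrConfig(minInsyncReplicas: Int): Properties = {
    val props = new Properties()
    props.put("min.insync.replicas", minInsyncReplicas.toString)
    props
  }

  val majorityWrite = minIsrConfig(2)  // majority of a 3-replica topic
  val allReplicas   = minIsrConfig(3)  // write must reach every replica
  val isrOnly       = minIsrConfig(1)  // current behavior: any ISR write is enough
}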



 provide strong consistency with reasonable availability
 ---

 Key: KAFKA-1555
 URL: https://issues.apache.org/jira/browse/KAFKA-1555
 Project: Kafka
  Issue Type: Improvement
  Components: controller
Affects Versions: 0.8.1.1
Reporter: Jiang Wu
Assignee: Gwen Shapira
 Fix For: 0.8.2

 Attachments: KAFKA-1555.0.patch, KAFKA-1555.1.patch


 In a mission-critical application, we expect that a Kafka cluster with 3 brokers 
 can satisfy two requirements:
 1. When 1 broker is down, no message loss or service blocking happens.
 2. In worse cases, such as when two brokers are down, service can be blocked, but 
 no message loss happens.
 We found that the current Kafka version (0.8.1.1) cannot achieve these requirements 
 due to three of its behaviors:
 1. when choosing a new leader from 2 followers in ISR, the one with fewer 
 messages may be chosen as the leader.
 2. even when replica.lag.max.messages=0, a follower can stay in ISR when it 
 has fewer messages than the leader.
 3. the ISR can contain only 1 broker, therefore acknowledged messages may be 
 stored on only 1 broker.
 The following is an analytical proof. 
 We consider a cluster with 3 brokers and a topic with 3 replicas, and assume 
 that at the beginning, all 3 replicas, leader A, followers B and C, are in 
 sync, i.e., they have the same messages and are all in ISR.
 According to the value of request.required.acks (acks for short), there are 
 the following cases.
 1. acks=0, 1, 3. Obviously these settings do not satisfy the requirement.
 2. acks=2. Producer sends a message m. It's acknowledged by A and B. At this 
 time, although C hasn't received m, C is still in ISR. If A is killed, C can 
 be elected as the new leader, and consumers will miss m.
 3. acks=-1. B and C restart and are removed from ISR. Producer sends a 
 message m to A, and receives an acknowledgement. Disk failure happens in A 
 before B and C replicate m. Message m is lost.
 In summary, any existing configuration cannot satisfy the requirements.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-1590) Binarize trace level request logging along with debug level text logging

2014-09-22 Thread Guozhang Wang (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1590?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14143360#comment-14143360
 ] 

Guozhang Wang commented on KAFKA-1590:
--

[~jkreps] agreed, binary log files should just be an option. What I proposed is 
that instead of introducing a new config for that, we can just set it indirectly 
at the logging level. I.e. with trace level on the request logs, text log files 
will be used; with debug level on the request logs (which will only be logging 
request summaries), the detailed log will also be written, but as binary files.

[~abhi21] Currently we use daily / hourly rolling, which I think is good 
enough. Or do you have any use cases that need size-based rolling?
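
A minimal sketch of the level-based switch described above (hypothetical names 
and binary framing; not the actual request logger): at TRACE the full request is 
logged as text, at DEBUG only the summary goes to the text log and the detail is 
appended as binary.

import java.io.{DataOutputStream, FileOutputStream}
import org.apache.log4j.Logger

// Illustrative only. The length-prefixed framing is a placeholder for whatever
// encoding the real change would use.
object RequestLogSketch {
  private val requestLogger = Logger.getLogger("kafka.request.logger")
  private val binaryOut =
    new DataOutputStream(new FileOutputStream("kafka-request-trace.bin", true))

  def logRequest(summary: String, detail: Array[Byte]): Unit = {
    if (requestLogger.isTraceEnabled) {
      requestLogger.trace(new String(detail, "UTF-8"))   // full text, as today
    } else if (requestLogger.isDebugEnabled) {
      requestLogger.debug(summary)                       // summary as text
      binaryOut.writeInt(detail.length)                  // detail as binary
      binaryOut.write(detail)
    }
  }
}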

 Binarize trace level request logging along with debug level text logging
 

 Key: KAFKA-1590
 URL: https://issues.apache.org/jira/browse/KAFKA-1590
 Project: Kafka
  Issue Type: Bug
Reporter: Guozhang Wang
Assignee: Abhishek Sharma
  Labels: newbie
 Fix For: 0.9.0


 With trace level logging, the request handling logs can grow very fast 
 depending on the client behavior (e.g. consumer with 0 maxWait and hence keep 
 sending fetch requests). Previously we have changed it to debug level which 
 only provides a summary of the requests, omitting request details. However 
 this does not work perfectly since summaries are not sufficient for 
 trouble-shooting, and turning on trace level upon issues will be too late.
 The proposed solution here, is to default to debug level logging with trace 
 level logging printed as binary format at the same time. The generated binary 
 files can then be further compressed / rolled out. When needed, we will then 
 decompress / parse the trace logs into texts.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-1305) Controller can hang on controlled shutdown with auto leader balance enabled

2014-09-22 Thread Guozhang Wang (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1305?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14143383#comment-14143383
 ] 

Guozhang Wang commented on KAFKA-1305:
--

Thanks [~becket_qin] for the great findings. It seems to me that as long as the 
controller's channel manager is async, no matter how large its queue is, the 
corner-case issue can still happen (i.e. a request blocked in the queue for a 
broker that has already shut down but whose ZK watcher has not fired yet), 
causing a chain of lock conflicts.

Currently the controller has multiple threads for admin commands, ZK listeners, 
scheduled operations (the leader elector), etc., which complicates the locking 
mechanism inside the controller. After going through the code I think it would be 
better to refactor the controller as follows:

1. Besides the async channel manager's sender thread, we use only a single 
controller thread and have a single working queue for the controller thread.
2. ZK event handling logic determines the event (topic/partition/broker change, 
admin operation, etc.), and puts the task into the queue.
3. Scheduled tasks are also created periodically and put into the queue.
4. The controller does one task at a time, so it does not need to compete for locks on 
controller metadata.
5. Make the channel manager's queue size infinite and add a metric for 
monitoring its size.

With this the controller logic would be easier to read / debug, and it may also help 
KAFKA-1558. The downside is that since a single thread is used, it loses 
parallelism for controller task handling, and the unbounded channel queue may 
also be an issue (when there is a bug). But since controller tasks are usually 
rare in practice, this should not be a problem.
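
A bare-bones sketch of the single-threaded structure proposed above (hypothetical 
event types, not controller code): watchers, admin commands and scheduled tasks 
only enqueue events, and one thread drains the queue, so controller metadata needs 
no lock contention.

import java.util.concurrent.LinkedBlockingQueue

// Illustrative only.
object ControllerQueueSketch {
  sealed trait ControllerEvent
  case class BrokerChange(liveBrokers: Set[Int]) extends ControllerEvent
  case object PreferredReplicaElection extends ControllerEvent

  private val eventQueue = new LinkedBlockingQueue[ControllerEvent]()

  def enqueue(event: ControllerEvent): Unit = eventQueue.put(event)

  val controllerThread = new Thread(new Runnable {
    def run(): Unit = {
      while (true) {
        val event = eventQueue.take()   // one task at a time, no locks on metadata
        event match {
          case BrokerChange(_)          => // ... update replica state ...
          case PreferredReplicaElection => // ... run the election ...
        }
      }
    }
  }, "controller-event-thread")
}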

 Controller can hang on controlled shutdown with auto leader balance enabled
 ---

 Key: KAFKA-1305
 URL: https://issues.apache.org/jira/browse/KAFKA-1305
 Project: Kafka
  Issue Type: Bug
Reporter: Joel Koshy
Assignee: Neha Narkhede
Priority: Blocker
 Fix For: 0.9.0


 This is relatively easy to reproduce especially when doing a rolling bounce.
 What happened here is as follows:
 1. The previous controller was bounced and broker 265 became the new 
 controller.
 2. I went on to do a controlled shutdown of broker 265 (the new controller).
 3. In the mean time the automatically scheduled preferred replica leader 
 election process started doing its thing and starts sending 
 LeaderAndIsrRequests/UpdateMetadataRequests to itself (and other brokers).  
 (t@113 below).
 4. While that's happening, the controlled shutdown process on 265 succeeds 
 and proceeds to deregister itself from ZooKeeper and shuts down the socket 
 server.
 5. (ReplicaStateMachine actually removes deregistered brokers from the 
 controller channel manager's list of brokers to send requests to.  However, 
 that removal cannot take place (t@18 below) because preferred replica leader 
 election task owns the controller lock.)
 6. So the request thread to broker 265 gets into infinite retries.
 7. The entire broker shutdown process is blocked on controller shutdown for 
 the same reason (it needs to acquire the controller lock).
 Relevant portions from the thread-dump:
 Controller-265-to-broker-265-send-thread - Thread t@113
java.lang.Thread.State: TIMED_WAITING
   at java.lang.Thread.sleep(Native Method)
   at 
 kafka.controller.RequestSendThread$$anonfun$liftedTree1$1$1.apply$mcV$sp(ControllerChannelManager.scala:143)
   at kafka.utils.Utils$.swallow(Utils.scala:167)
   at kafka.utils.Logging$class.swallowWarn(Logging.scala:92)
   at kafka.utils.Utils$.swallowWarn(Utils.scala:46)
   at kafka.utils.Logging$class.swallow(Logging.scala:94)
   at kafka.utils.Utils$.swallow(Utils.scala:46)
   at 
 kafka.controller.RequestSendThread.liftedTree1$1(ControllerChannelManager.scala:143)
   at 
 kafka.controller.RequestSendThread.doWork(ControllerChannelManager.scala:131)
   - locked java.lang.Object@6dbf14a7
   at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
Locked ownable synchronizers:
   - None
 ...
 Thread-4 - Thread t@17
java.lang.Thread.State: WAITING on 
 java.util.concurrent.locks.ReentrantLock$NonfairSync@4836840 owned by: 
 kafka-scheduler-0
   at sun.misc.Unsafe.park(Native Method)
   at java.util.concurrent.locks.LockSupport.park(LockSupport.java:156)
   at 
 java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:811)
   at 
 java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireQueued(AbstractQueuedSynchronizer.java:842)
   at 
 java.util.concurrent.locks.AbstractQueuedSynchronizer.acquire(AbstractQueuedSynchronizer.java:1178)
   at 
 

[jira] [Commented] (KAFKA-1367) Broker topic metadata not kept in sync with ZooKeeper

2014-09-22 Thread Guozhang Wang (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1367?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14143413#comment-14143413
 ] 

Guozhang Wang commented on KAFKA-1367:
--

Hello [~edenhill], I think your use case may be supported by a new feature that 
is currently being developed: KAFKA-1555. Would you like to take a look at it and see 
if your case is indeed covered?

 Broker topic metadata not kept in sync with ZooKeeper
 -

 Key: KAFKA-1367
 URL: https://issues.apache.org/jira/browse/KAFKA-1367
 Project: Kafka
  Issue Type: Bug
Affects Versions: 0.8.0, 0.8.1
Reporter: Ryan Berdeen
  Labels: newbie++
 Attachments: KAFKA-1367.txt


 When a broker is restarted, the topic metadata responses from the brokers 
 will be incorrect (different from ZooKeeper) until a preferred replica leader 
 election.
 In the metadata, it looks like leaders are correctly removed from the ISR 
 when a broker disappears, but followers are not. Then, when a broker 
 reappears, the ISR is never updated.
 I used a variation of the Vagrant setup created by Joe Stein to reproduce 
 this with latest from the 0.8.1 branch: 
 https://github.com/also/kafka/commit/dba36a503a5e22ea039df0f9852560b4fb1e067c



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-1555) provide strong consistency with reasonable availability

2014-09-22 Thread Joe Stein (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1555?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14143486#comment-14143486
 ] 

Joe Stein commented on KAFKA-1555:
--

[~gwenshap] thanks for the patch! few initial comments/can you please:

1) update review board for others to assist in reviewing/feedback a la 
https://cwiki.apache.org/confluence/display/KAFKA/Patch+submission+and+review 

2) we need to add an extra test in Partition.checkEnoughReplicasReachOffset() 
please (adding one in ProducerFailureHandlingTest would probably make sense too)

3) (nitpick) in one place you use InSync and in others Insync; could you pick one 
(preferably InSync, since that is more consistent with existing code) just for 
consistency.

4) do we want some validation if someone tries to set 0 or -1 or something for 
the min.isr value? (see the sketch after this comment)

5) * note * once this is committed we should also update 
https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-ErrorCodes

6) we should also update the public enums in the java producer too 
clients/src/main/java/org/apache/kafka/common/protocol/Errors.java

7) I think a warning message in KafkaApis appendToLocalLog on the server side 
would make sense, no?

I should be able to give this a spin in testing later tonight/tomorrow 
(exciting!!!) on a cluster with topics and such.

I like this patch/approach too because people could apply it to 0.8.1.1 easily 
if they wanted (not saying (yet) it should be in 0.8.1.2), and it handles all 
the use cases folks have had a stake in (so far) without major changes or 
confusion in how it works.
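
On point 4, a minimal sketch of what such validation could look like (illustrative 
only, not LogConfig code):

// Illustrative only: reject non-positive min.insync.replicas at topic
// create/alter time, and warn when it can never be met.
object MinIsrValidationSketch {
  def validate(minInsyncReplicas: Int, replicationFactor: Int): Unit = {
    require(minInsyncReplicas >= 1,
      s"min.insync.replicas must be at least 1, got $minInsyncReplicas")
    if (minInsyncReplicas > replicationFactor)
      println(s"Warning: min.insync.replicas ($minInsyncReplicas) exceeds " +
        s"replication factor ($replicationFactor); acks=-1 writes will always fail")
  }
}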

 provide strong consistency with reasonable availability
 ---

 Key: KAFKA-1555
 URL: https://issues.apache.org/jira/browse/KAFKA-1555
 Project: Kafka
  Issue Type: Improvement
  Components: controller
Affects Versions: 0.8.1.1
Reporter: Jiang Wu
Assignee: Gwen Shapira
 Fix For: 0.8.2

 Attachments: KAFKA-1555.0.patch, KAFKA-1555.1.patch


 In a mission-critical application, we expect that a Kafka cluster with 3 brokers 
 can satisfy two requirements:
 1. When 1 broker is down, no message loss or service blocking happens.
 2. In worse cases, such as when two brokers are down, service can be blocked, but 
 no message loss happens.
 We found that the current Kafka version (0.8.1.1) cannot achieve these requirements 
 due to three of its behaviors:
 1. when choosing a new leader from 2 followers in ISR, the one with fewer 
 messages may be chosen as the leader.
 2. even when replica.lag.max.messages=0, a follower can stay in ISR when it 
 has fewer messages than the leader.
 3. the ISR can contain only 1 broker, therefore acknowledged messages may be 
 stored on only 1 broker.
 The following is an analytical proof. 
 We consider a cluster with 3 brokers and a topic with 3 replicas, and assume 
 that at the beginning, all 3 replicas, leader A, followers B and C, are in 
 sync, i.e., they have the same messages and are all in ISR.
 According to the value of request.required.acks (acks for short), there are 
 the following cases.
 1. acks=0, 1, 3. Obviously these settings do not satisfy the requirement.
 2. acks=2. Producer sends a message m. It's acknowledged by A and B. At this 
 time, although C hasn't received m, C is still in ISR. If A is killed, C can 
 be elected as the new leader, and consumers will miss m.
 3. acks=-1. B and C restart and are removed from ISR. Producer sends a 
 message m to A, and receives an acknowledgement. Disk failure happens in A 
 before B and C replicate m. Message m is lost.
 In summary, any existing configuration cannot satisfy the requirements.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-1481) Stop using dashes AND underscores as separators in MBean names

2014-09-22 Thread Otis Gospodnetic (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1481?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14143500#comment-14143500
 ] 

Otis Gospodnetic commented on KAFKA-1481:
-

This is [~junrao]'s new suggestion for dealing with the problem this issue is 
meant to address: http://search-hadoop.com/m/4TaT4aHH1S

 Stop using dashes AND underscores as separators in MBean names
 --

 Key: KAFKA-1481
 URL: https://issues.apache.org/jira/browse/KAFKA-1481
 Project: Kafka
  Issue Type: Bug
  Components: core
Affects Versions: 0.8.1.1
Reporter: Otis Gospodnetic
Priority: Critical
  Labels: patch
 Fix For: 0.8.2

 Attachments: KAFKA-1481_2014-06-06_13-06-35.patch


 MBeans should not use dashes or underscores as separators because these 
 characters are allowed in hostnames, topics, group and consumer IDs, etc., 
 and these are embedded in MBeans names making it impossible to parse out 
 individual bits from MBeans.
 Perhaps a pipe character should be used to avoid the conflict. 
 This looks like a major blocker because it means nobody can write Kafka 0.8.x 
 monitoring tools unless they are doing it for themselves AND do not use 
 dashes AND do not use underscores.
 See: http://search-hadoop.com/m/4TaT4lonIW



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-1555) provide strong consistency with reasonable availability

2014-09-22 Thread Gwen Shapira (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1555?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14143501#comment-14143501
 ] 

Gwen Shapira commented on KAFKA-1555:
-

There is a review board (https://reviews.apache.org/r/25886/) - see link above.

I agree with everything else you mentioned. I'll upload an updated patch later 
today. 

Any suggestions regarding the problem with retries? 
With |ISR| < min.ISR the message is written to the topic, *and* we throw an error. 
With default retries, the message will be written 3 times. Other than 
documenting this behavior, I don't have a good solution. 

 provide strong consistency with reasonable availability
 ---

 Key: KAFKA-1555
 URL: https://issues.apache.org/jira/browse/KAFKA-1555
 Project: Kafka
  Issue Type: Improvement
  Components: controller
Affects Versions: 0.8.1.1
Reporter: Jiang Wu
Assignee: Gwen Shapira
 Fix For: 0.8.2

 Attachments: KAFKA-1555.0.patch, KAFKA-1555.1.patch


 In a mission-critical application, we expect that a Kafka cluster with 3 brokers 
 can satisfy two requirements:
 1. When 1 broker is down, no message loss or service blocking happens.
 2. In worse cases, such as when two brokers are down, service can be blocked, but 
 no message loss happens.
 We found that the current Kafka version (0.8.1.1) cannot achieve these requirements 
 due to three of its behaviors:
 1. when choosing a new leader from 2 followers in ISR, the one with fewer 
 messages may be chosen as the leader.
 2. even when replica.lag.max.messages=0, a follower can stay in ISR when it 
 has fewer messages than the leader.
 3. the ISR can contain only 1 broker, therefore acknowledged messages may be 
 stored on only 1 broker.
 The following is an analytical proof. 
 We consider a cluster with 3 brokers and a topic with 3 replicas, and assume 
 that at the beginning, all 3 replicas, leader A, followers B and C, are in 
 sync, i.e., they have the same messages and are all in ISR.
 According to the value of request.required.acks (acks for short), there are 
 the following cases.
 1. acks=0, 1, 3. Obviously these settings do not satisfy the requirement.
 2. acks=2. Producer sends a message m. It's acknowledged by A and B. At this 
 time, although C hasn't received m, C is still in ISR. If A is killed, C can 
 be elected as the new leader, and consumers will miss m.
 3. acks=-1. B and C restart and are removed from ISR. Producer sends a 
 message m to A, and receives an acknowledgement. Disk failure happens in A 
 before B and C replicate m. Message m is lost.
 In summary, any existing configuration cannot satisfy the requirements.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Comment Edited] (KAFKA-1555) provide strong consistency with reasonable availability

2014-09-22 Thread Joe Stein (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1555?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14143521#comment-14143521
 ] 

Joe Stein edited comment on KAFKA-1555 at 9/22/14 5:51 PM:
---

ah, my bad. 

 Any suggestions regarding the problem with retries? 

I think that is an issue beyond this ticket; it happens in other cases too (e.g. 
MaxMessageSize, where retrying 3 times won't change a thing, just like this 
problem) and we don't yet have a solution that classifies exceptions... 
So I think we should do some fix for it, but that is not related to this ticket 
IMHO... unlike MaxMessageSize, though, it is possible that after the first failure 
another replica comes online and the retry succeeds, so that functionality might be 
desirable (I could see how it would be).


was (Author: joestein):
ah, my bad. 

 Any suggestions regarding the problem with retries? 

I think that is an issue beyond this ticket that happens in other cases (e.g. 
MaxMessageSize and retry that 3 times won't change a thing like this same 
problem) that we don't have a solution for yet that classifies exceptions... 
So I think we should do some fix for it but that is not related to this ticket 
IMHO.

 provide strong consistency with reasonable availability
 ---

 Key: KAFKA-1555
 URL: https://issues.apache.org/jira/browse/KAFKA-1555
 Project: Kafka
  Issue Type: Improvement
  Components: controller
Affects Versions: 0.8.1.1
Reporter: Jiang Wu
Assignee: Gwen Shapira
 Fix For: 0.8.2

 Attachments: KAFKA-1555.0.patch, KAFKA-1555.1.patch


 In a mission-critical application, we expect that a Kafka cluster with 3 brokers 
 can satisfy two requirements:
 1. When 1 broker is down, no message loss or service blocking happens.
 2. In worse cases, such as when two brokers are down, service can be blocked, but 
 no message loss happens.
 We found that the current Kafka version (0.8.1.1) cannot achieve these requirements 
 due to three of its behaviors:
 1. when choosing a new leader from 2 followers in ISR, the one with fewer 
 messages may be chosen as the leader.
 2. even when replica.lag.max.messages=0, a follower can stay in ISR when it 
 has fewer messages than the leader.
 3. the ISR can contain only 1 broker, therefore acknowledged messages may be 
 stored on only 1 broker.
 The following is an analytical proof. 
 We consider a cluster with 3 brokers and a topic with 3 replicas, and assume 
 that at the beginning, all 3 replicas, leader A, followers B and C, are in 
 sync, i.e., they have the same messages and are all in ISR.
 According to the value of request.required.acks (acks for short), there are 
 the following cases.
 1. acks=0, 1, 3. Obviously these settings do not satisfy the requirement.
 2. acks=2. Producer sends a message m. It's acknowledged by A and B. At this 
 time, although C hasn't received m, C is still in ISR. If A is killed, C can 
 be elected as the new leader, and consumers will miss m.
 3. acks=-1. B and C restart and are removed from ISR. Producer sends a 
 message m to A, and receives an acknowledgement. Disk failure happens in A 
 before B and C replicate m. Message m is lost.
 In summary, any existing configuration cannot satisfy the requirements.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-1555) provide strong consistency with reasonable availability

2014-09-22 Thread Joe Stein (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1555?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14143521#comment-14143521
 ] 

Joe Stein commented on KAFKA-1555:
--

ah, my bad. 

 Any suggestions regarding the problem with retries? 

I think that is an issue beyond this ticket that happens in other cases (e.g. 
MaxMessageSize and retry that 3 times won't change a thing like this same 
problem) that we don't have a solution for yet that classifies exceptions... 
So I think we should do some fix for it but that is not related to this ticket 
IMHO.

 provide strong consistency with reasonable availability
 ---

 Key: KAFKA-1555
 URL: https://issues.apache.org/jira/browse/KAFKA-1555
 Project: Kafka
  Issue Type: Improvement
  Components: controller
Affects Versions: 0.8.1.1
Reporter: Jiang Wu
Assignee: Gwen Shapira
 Fix For: 0.8.2

 Attachments: KAFKA-1555.0.patch, KAFKA-1555.1.patch


 In a mission-critical application, we expect that a Kafka cluster with 3 brokers 
 can satisfy two requirements:
 1. When 1 broker is down, no message loss or service blocking happens.
 2. In worse cases, such as when two brokers are down, service can be blocked, but 
 no message loss happens.
 We found that the current Kafka version (0.8.1.1) cannot achieve these requirements 
 due to three of its behaviors:
 1. when choosing a new leader from 2 followers in ISR, the one with fewer 
 messages may be chosen as the leader.
 2. even when replica.lag.max.messages=0, a follower can stay in ISR when it 
 has fewer messages than the leader.
 3. the ISR can contain only 1 broker, therefore acknowledged messages may be 
 stored on only 1 broker.
 The following is an analytical proof. 
 We consider a cluster with 3 brokers and a topic with 3 replicas, and assume 
 that at the beginning, all 3 replicas, leader A, followers B and C, are in 
 sync, i.e., they have the same messages and are all in ISR.
 According to the value of request.required.acks (acks for short), there are 
 the following cases.
 1. acks=0, 1, 3. Obviously these settings do not satisfy the requirement.
 2. acks=2. Producer sends a message m. It's acknowledged by A and B. At this 
 time, although C hasn't received m, C is still in ISR. If A is killed, C can 
 be elected as the new leader, and consumers will miss m.
 3. acks=-1. B and C restart and are removed from ISR. Producer sends a 
 message m to A, and receives an acknowledgement. Disk failure happens in A 
 before B and C replicate m. Message m is lost.
 In summary, any existing configuration cannot satisfy the requirements.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-847) kafka appender layout does not work for kafka 0.7.1

2014-09-22 Thread Joe Stein (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-847?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Joe Stein updated KAFKA-847:

Fix Version/s: 0.8.2

 kafka appender layout does not work for kafka 0.7.1
 ---

 Key: KAFKA-847
 URL: https://issues.apache.org/jira/browse/KAFKA-847
 Project: Kafka
  Issue Type: Bug
  Components: producer 
Affects Versions: 0.7.1, 0.8.0
 Environment: Win7 64 bit
Reporter: Sining Ma
Assignee: Peter Pham
  Labels: easyfix, newbie
 Fix For: 0.8.2

 Attachments: KAFKA-847-v1.patch, KAFKA-847-v2.patch


 I am using kafka 0.7.1 right now.
 I am using the following log4j properties file and trying to send some log 
 information to kafka server.
 log4j.rootLogger=INFO,file,stdout
 log4j.appender.stdout=org.apache.log4j.ConsoleAppender
 log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
 log4j.appender.stdout.layout.ConversionPattern=[%d] %p %t %m (%c)%n
 log4j.appender.file=org.apache.log4j.RollingFileAppender
 #log4j.appender.file.FileNamePattern=c:\\development\\producer-agent_%d{-MM-dd}.log
 log4j.appender.file.File=${AC_DATA_HOME}\\lmservice\\tailer-aggregator.log
 log4j.appender.file.MaxFileSize=100MB
 log4j.appender.file.MaxBackupIndex=1
 log4j.appender.file.layout=org.apache.log4j.PatternLayout
 #log4j.appender.file.layout.ConversionPattern= %-4r [%t] %-5p %c %x - %m%n
 log4j.appender.file.layout.ConversionPattern=[%d] %p %t %m (%c)%n
 log4j.appender.KAFKA=kafka.producer.KafkaLog4jAppender
 log4j.appender.KAFKA.layout=org.apache.log4j.PatternLayout
 log4j.appender.KAFKA.layout.ConversionPattern=[%d] %p %t %m (%c)%n
 log4j.appender.KAFKA.BrokerList=0:localhost:9092
 log4j.appender.KAFKA.SerializerClass=kafka.serializer.StringEncoder
 log4j.appender.KAFKA.Topic=test.topic
 # Turn on all our debugging info
 log4j.logger.kafka=INFO, KAFKA
 log4j.logger.org=INFO, KAFKA
 log4j.logger.com=INFO, KAFKA
 However, I find that the messages sent to the kafka server are not formatted with 
 my defined layout. 
 KafkaLog4jAppender just sends the message (%m), and no other conversion 
 patterns are applied.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-1555) provide strong consistency with reasonable availability

2014-09-22 Thread Sriram Subramanian (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1555?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14143578#comment-14143578
 ] 

Sriram Subramanian commented on KAFKA-1555:
---

[~gwenshap] A key thing I would like to ensure when a feature is added is that it 
can be easily explained to the end users of the system. A system can provide a 
great deal of flexibility by exposing its functionality as configs, but what 
gets hard with these data systems is that over time there is config bloat and 
it gets complex to specify the guarantees that the system provides. 

Let us say we had N replicas. 
min isr = 0, 1 is trivial to explain
min isr = N - use it when you need strong durability
min isr = 2 ... N-1 - I need your help here. What would be good guidance to 
give the users on what values to use between 2 and N-1? 

 provide strong consistency with reasonable availability
 ---

 Key: KAFKA-1555
 URL: https://issues.apache.org/jira/browse/KAFKA-1555
 Project: Kafka
  Issue Type: Improvement
  Components: controller
Affects Versions: 0.8.1.1
Reporter: Jiang Wu
Assignee: Gwen Shapira
 Fix For: 0.8.2

 Attachments: KAFKA-1555.0.patch, KAFKA-1555.1.patch


 In a mission-critical application, we expect that a Kafka cluster with 3 brokers 
 can satisfy two requirements:
 1. When 1 broker is down, no message loss or service blocking happens.
 2. In worse cases, such as when two brokers are down, service can be blocked, but 
 no message loss happens.
 We found that the current Kafka version (0.8.1.1) cannot achieve these requirements 
 due to three of its behaviors:
 1. when choosing a new leader from 2 followers in ISR, the one with fewer 
 messages may be chosen as the leader.
 2. even when replica.lag.max.messages=0, a follower can stay in ISR when it 
 has fewer messages than the leader.
 3. the ISR can contain only 1 broker, therefore acknowledged messages may be 
 stored on only 1 broker.
 The following is an analytical proof. 
 We consider a cluster with 3 brokers and a topic with 3 replicas, and assume 
 that at the beginning, all 3 replicas, leader A, followers B and C, are in 
 sync, i.e., they have the same messages and are all in ISR.
 According to the value of request.required.acks (acks for short), there are 
 the following cases.
 1. acks=0, 1, 3. Obviously these settings do not satisfy the requirement.
 2. acks=2. Producer sends a message m. It's acknowledged by A and B. At this 
 time, although C hasn't received m, C is still in ISR. If A is killed, C can 
 be elected as the new leader, and consumers will miss m.
 3. acks=-1. B and C restart and are removed from ISR. Producer sends a 
 message m to A, and receives an acknowledgement. Disk failure happens in A 
 before B and C replicate m. Message m is lost.
 In summary, any existing configuration cannot satisfy the requirements.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-1555) provide strong consistency with reasonable availability

2014-09-22 Thread Joe Stein (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1555?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14143622#comment-14143622
 ] 

Joe Stein commented on KAFKA-1555:
--

I agree with [~gwenshap] 

A business case for this is being able to tolerate unrecoverable rack/server/zone 
related data failures in the case of the ISR shrinking to 1.

If you have a min isr of 2 ... N-1 then you guarantee that at times of broker 
failure the ISR meets your minimum hardware tolerance for acknowledging 
a successful write and being able to recover that write.

For MANY data types this is equal to 2 (or 3) just so you have the data on 
another rack, dc, server, zone, whatever.  

As it exists now, with ACK=-1 and ISR = 1 you could get into the condition 
where you are successfully writing/acking to 1 broker (assuming replication 3 
and 2 nodes having failed). If that last broker standing then dies before 
another comes up and replicates what it was writing during the window of time 
where the ISR was 1, you will have lost data that was deemed saved.  In 
that failure case everything written during the window of time where the ISR was 
1 will never be recoverable, even though the producer thought it was acked.

 provide strong consistency with reasonable availability
 ---

 Key: KAFKA-1555
 URL: https://issues.apache.org/jira/browse/KAFKA-1555
 Project: Kafka
  Issue Type: Improvement
  Components: controller
Affects Versions: 0.8.1.1
Reporter: Jiang Wu
Assignee: Gwen Shapira
 Fix For: 0.8.2

 Attachments: KAFKA-1555.0.patch, KAFKA-1555.1.patch


 In a mission-critical application, we expect that a Kafka cluster with 3 brokers 
 can satisfy two requirements:
 1. When 1 broker is down, no message loss or service blocking happens.
 2. In worse cases, such as when two brokers are down, service can be blocked, but 
 no message loss happens.
 We found that the current Kafka version (0.8.1.1) cannot achieve these requirements 
 due to three of its behaviors:
 1. when choosing a new leader from 2 followers in ISR, the one with fewer 
 messages may be chosen as the leader.
 2. even when replica.lag.max.messages=0, a follower can stay in ISR when it 
 has fewer messages than the leader.
 3. the ISR can contain only 1 broker, therefore acknowledged messages may be 
 stored on only 1 broker.
 The following is an analytical proof. 
 We consider a cluster with 3 brokers and a topic with 3 replicas, and assume 
 that at the beginning, all 3 replicas, leader A, followers B and C, are in 
 sync, i.e., they have the same messages and are all in ISR.
 According to the value of request.required.acks (acks for short), there are 
 the following cases.
 1. acks=0, 1, 3. Obviously these settings do not satisfy the requirement.
 2. acks=2. Producer sends a message m. It's acknowledged by A and B. At this 
 time, although C hasn't received m, C is still in ISR. If A is killed, C can 
 be elected as the new leader, and consumers will miss m.
 3. acks=-1. B and C restart and are removed from ISR. Producer sends a 
 message m to A, and receives an acknowledgement. Disk failure happens in A 
 before B and C replicate m. Message m is lost.
 In summary, any existing configuration cannot satisfy the requirements.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-1555) provide strong consistency with reasonable availability

2014-09-22 Thread Sriram Subramanian (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1555?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14143628#comment-14143628
 ] 

Sriram Subramanian commented on KAFKA-1555:
---

Surviving the leader crash makes sense. That is provided by any value from 2 to 
N. I am more interested in being able to specify why 2 and not N. Performance? 
Availability? Probability of data loss? If so, we should be able to quantify 
it. I don't want to drag out this discussion, but I think it is a common mistake to 
not quantify the benefits of choosing one value over another 
between 2...N-1 while pushing that choice to the users by providing a config that 
is fine grained. It would be great to document this use case with an example 
indicating how performance, availability, and data loss are affected by 
choosing one value over the other.

 provide strong consistency with reasonable availability
 ---

 Key: KAFKA-1555
 URL: https://issues.apache.org/jira/browse/KAFKA-1555
 Project: Kafka
  Issue Type: Improvement
  Components: controller
Affects Versions: 0.8.1.1
Reporter: Jiang Wu
Assignee: Gwen Shapira
 Fix For: 0.8.2

 Attachments: KAFKA-1555.0.patch, KAFKA-1555.1.patch


 In a mission-critical application, we expect that a Kafka cluster with 3 brokers 
 can satisfy two requirements:
 1. When 1 broker is down, no message loss or service blocking happens.
 2. In worse cases, such as when two brokers are down, service can be blocked, but 
 no message loss happens.
 We found that the current Kafka version (0.8.1.1) cannot achieve these requirements 
 due to three of its behaviors:
 1. when choosing a new leader from 2 followers in ISR, the one with fewer 
 messages may be chosen as the leader.
 2. even when replica.lag.max.messages=0, a follower can stay in ISR when it 
 has fewer messages than the leader.
 3. the ISR can contain only 1 broker, therefore acknowledged messages may be 
 stored on only 1 broker.
 The following is an analytical proof. 
 We consider a cluster with 3 brokers and a topic with 3 replicas, and assume 
 that at the beginning, all 3 replicas, leader A, followers B and C, are in 
 sync, i.e., they have the same messages and are all in ISR.
 According to the value of request.required.acks (acks for short), there are 
 the following cases.
 1. acks=0, 1, 3. Obviously these settings do not satisfy the requirement.
 2. acks=2. Producer sends a message m. It's acknowledged by A and B. At this 
 time, although C hasn't received m, C is still in ISR. If A is killed, C can 
 be elected as the new leader, and consumers will miss m.
 3. acks=-1. B and C restart and are removed from ISR. Producer sends a 
 message m to A, and receives an acknowledgement. Disk failure happens in A 
 before B and C replicate m. Message m is lost.
 In summary, any existing configuration cannot satisfy the requirements.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-1555) provide strong consistency with reasonable availability

2014-09-22 Thread Joe Stein (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1555?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14143701#comment-14143701
 ] 

Joe Stein commented on KAFKA-1555:
--

  I am more interested in being able to specify why 2 and not N

The value of the min.isr is a balance between your tolerance for data loss of 
stored data and your need for availability of brokers to write to. Your availability 
tolerance (X nodes being unavailable but still able to ACK writes) is the 
difference between your replication factor (Y) and the number of servers you 
must save data to (Z) when using ACK=-1 and min.isr = Z.  X=Y-Z. If your 
tolerance for node failures is 3 and you require to always have 2 servers 
running to write to (otherwise writes fail for clients) then you 
have to set your replication factor to 5 and use ACK=-1 with a min.isr = 2.  If 
you require more servers to write to for successful writes but the same number 
of node failures, then you must also up your replication factor.

... something like that...
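
To make the arithmetic concrete, here is a small illustrative sketch (not from the patch; the function name is made up) of the X = Y - Z relationship described above:

{code}
// Illustrative only: failure tolerance X = replication factor Y minus min.insync.replicas Z,
// for producers using ACK=-1.
def failureTolerance(replicationFactor: Int, minInsyncReplicas: Int): Int =
  replicationFactor - minInsyncReplicas

failureTolerance(5, 2)  // = 3: RF=5 with min.isr=2 still ACKs writes with up to 3 brokers down
failureTolerance(5, 3)  // = 2: raising min.isr to 3 at the same RF lowers the tolerated failures to 2
failureTolerance(3, 2)  // = 1: the common RF=3, min.isr=2 setup tolerates 1 broker down
{code}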

 provide strong consistency with reasonable availability
 ---

 Key: KAFKA-1555
 URL: https://issues.apache.org/jira/browse/KAFKA-1555
 Project: Kafka
  Issue Type: Improvement
  Components: controller
Affects Versions: 0.8.1.1
Reporter: Jiang Wu
Assignee: Gwen Shapira
 Fix For: 0.8.2

 Attachments: KAFKA-1555.0.patch, KAFKA-1555.1.patch


 In a mission-critical application, we expect a Kafka cluster with 3 brokers 
 to satisfy two requirements:
 1. When 1 broker is down, no message loss or service blocking happens.
 2. In worse cases, such as when two brokers are down, service can be blocked, but 
 no message loss happens.
 We found that the current Kafka version (0.8.1.1) cannot achieve these requirements 
 due to three of its behaviors:
 1. when choosing a new leader from 2 followers in ISR, the one with fewer 
 messages may be chosen as the leader.
 2. even when replica.lag.max.messages=0, a follower can stay in ISR when it 
 has fewer messages than the leader.
 3. ISR can contain only 1 broker, therefore acknowledged messages may be 
 stored in only 1 broker.
 The following is an analytical proof. 
 We consider a cluster with 3 brokers and a topic with 3 replicas, and assume 
 that at the beginning, all 3 replicas, leader A, followers B and C, are in 
 sync, i.e., they have the same messages and are all in ISR.
 According to the value of request.required.acks (acks for short), there are 
 the following cases.
 1. acks=0, 1, 3. Obviously these settings do not satisfy the requirement.
 2. acks=2. Producer sends a message m. It's acknowledged by A and B. At this 
 time, although C hasn't received m, C is still in ISR. If A is killed, C can 
 be elected as the new leader, and consumers will miss m.
 3. acks=-1. B and C restart and are removed from ISR. Producer sends a 
 message m to A, and receives an acknowledgement. Disk failure happens in A 
 before B and C replicate m. Message m is lost.
 In summary, any existing configuration cannot satisfy the requirements.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Comment Edited] (KAFKA-1555) provide strong consistency with reasonable availability

2014-09-22 Thread Joe Stein (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1555?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14143701#comment-14143701
 ] 

Joe Stein edited comment on KAFKA-1555 at 9/22/14 8:03 PM:
---

  I am more interested in being able to specify why 2 and not N

The value of the min.isr is a balance between your tolerance for the 
probability of data loss of stored data and your need for availability of brokers 
to write to. Your availability tolerance (X nodes being unavailable but still 
able to ACK writes) is the difference between your replication factor (Y) and 
the number of servers you must save data to (Z) when using ACK=-1 and min.isr = 
Z.  X=Y-Z. If your tolerance for node failures is 3 and you require to always 
have 2 servers running to write to (otherwise writes fail for 
clients) then you have to set your replication factor to 5 and use ACK=-1 with 
a min.isr = 2.  If you require more servers to write to for successful writes 
but the same number of node failures, then you must also up your replication 
factor.

... something like that...


was (Author: joestein):
  I am more interested in being able to specify why 2 and not N

The value of the min.isr is a balance between your tolerance for data loss of 
stored data and your need for availability of brokers to write to. Your availability 
tolerance (X nodes being unavailable but still able to ACK writes) is the 
difference between your replication factor (Y) and the number of servers you 
must save data to (Z) when using ACK=-1 and min.isr = Z.  X=Y-Z. If your 
tolerance for node failures is 3 and you require to always have 2 servers 
running to write to (otherwise writes fail for clients) then you 
have to set your replication factor to 5 and use ACK=-1 with a min.isr = 2.  If 
you require more servers to write to for successful writes but the same number 
of node failures, then you must also up your replication factor.

... something like that...

 provide strong consistency with reasonable availability
 ---

 Key: KAFKA-1555
 URL: https://issues.apache.org/jira/browse/KAFKA-1555
 Project: Kafka
  Issue Type: Improvement
  Components: controller
Affects Versions: 0.8.1.1
Reporter: Jiang Wu
Assignee: Gwen Shapira
 Fix For: 0.8.2

 Attachments: KAFKA-1555.0.patch, KAFKA-1555.1.patch


 In a mission-critical application, we expect a Kafka cluster with 3 brokers 
 to satisfy two requirements:
 1. When 1 broker is down, no message loss or service blocking happens.
 2. In worse cases, such as when two brokers are down, service can be blocked, but 
 no message loss happens.
 We found that the current Kafka version (0.8.1.1) cannot achieve these requirements 
 due to three of its behaviors:
 1. when choosing a new leader from 2 followers in ISR, the one with fewer 
 messages may be chosen as the leader.
 2. even when replica.lag.max.messages=0, a follower can stay in ISR when it 
 has fewer messages than the leader.
 3. ISR can contain only 1 broker, therefore acknowledged messages may be 
 stored in only 1 broker.
 The following is an analytical proof. 
 We consider a cluster with 3 brokers and a topic with 3 replicas, and assume 
 that at the beginning, all 3 replicas, leader A, followers B and C, are in 
 sync, i.e., they have the same messages and are all in ISR.
 According to the value of request.required.acks (acks for short), there are 
 the following cases.
 1. acks=0, 1, 3. Obviously these settings do not satisfy the requirement.
 2. acks=2. Producer sends a message m. It's acknowledged by A and B. At this 
 time, although C hasn't received m, C is still in ISR. If A is killed, C can 
 be elected as the new leader, and consumers will miss m.
 3. acks=-1. B and C restart and are removed from ISR. Producer sends a 
 message m to A, and receives an acknowledgement. Disk failure happens in A 
 before B and C replicate m. Message m is lost.
 In summary, any existing configuration cannot satisfy the requirements.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: Review Request 22131: Patch for KAFKA-1477

2014-09-22 Thread Rajasekar Elango

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/22131/#review54191
---



config/server.properties
https://reviews.apache.org/r/22131/#comment94187

Can we set the secure property to ***false*** so that Kafka runs in non-secure 
mode by default and won't impact existing users?


- Rajasekar Elango


On Sept. 18, 2014, 12:43 p.m., Ivan Lyutov wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/22131/
 ---
 
 (Updated Sept. 18, 2014, 12:43 p.m.)
 
 
 Review request for kafka.
 
 
 Bugs: KAFKA-1477
 https://issues.apache.org/jira/browse/KAFKA-1477
 
 
 Repository: kafka
 
 
 Description
 ---
 
 Minor fixes, cleanup
 
 Refactoring
 
 Fixed tests compilation error.
 
 Updated according to requested changes: refactoring, minor edits.
 
 
 Added basic functionality for new producer.
 
 
 bug fixes after rebase
 
 
 bug fix after rebase
 
 
 Diffs
 -
 
   clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java 
 f58b8508d3f813a51015abed772c704390887d7e 
   clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java 
 f9de4af426449cceca12a8de9a9f54a6241d28d8 
   
 clients/src/main/java/org/apache/kafka/clients/producer/internals/SSLSocketChannel.java
  PRE-CREATION 
   
 clients/src/main/java/org/apache/kafka/common/errors/UnknownKeyStoreException.java
  PRE-CREATION 
   clients/src/main/java/org/apache/kafka/common/network/Selector.java 
 4dd2cdf773f7eb01a93d7f994383088960303dfc 
   
 clients/src/main/java/org/apache/kafka/common/network/security/AuthConfig.java
  PRE-CREATION 
   
 clients/src/main/java/org/apache/kafka/common/network/security/KeyStores.java 
 PRE-CREATION 
   
 clients/src/main/java/org/apache/kafka/common/network/security/SecureAuth.java
  PRE-CREATION 
   
 clients/src/main/java/org/apache/kafka/common/network/security/StoreInitializer.java
  PRE-CREATION 
   
 clients/src/main/java/org/apache/kafka/common/network/security/store/JKSInitializer.java
  PRE-CREATION 
   clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java 
 5c5e3d40819e41cab7b52a0eeaee5f2e7317b7b3 
   config/client.keystore PRE-CREATION 
   config/client.public-key PRE-CREATION 
   config/client.security.properties PRE-CREATION 
   config/consumer.properties 83847de30d10b6e78bb8de28e0bb925d7c0e6ca2 
   config/producer.properties 39d65d7c6c21f4fccd7af89be6ca12a088d5dd98 
   config/server.keystore PRE-CREATION 
   config/server.properties 5c0905a572b1f0d8b07bfca967a09cb856a6b09f 
   config/server.public-key PRE-CREATION 
   config/server.security.properties PRE-CREATION 
   core/src/main/scala/kafka/api/FetchRequest.scala 
 51cdccf7f90eb530cc62b094ed822b8469d50b12 
   core/src/main/scala/kafka/client/ClientUtils.scala 
 ce7ede3f6d60e756e252257bd8c6fedc21f21e1c 
   core/src/main/scala/kafka/cluster/Broker.scala 
 9407ed21fbbd57edeecd888edc32bea6a05d95b3 
   core/src/main/scala/kafka/common/UnknownKeyStoreException.scala 
 PRE-CREATION 
   core/src/main/scala/kafka/consumer/ConsumerConfig.scala 
 9ebbee6c16dc83767297c729d2d74ebbd063a993 
   core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala 
 b9e2bea7b442a19bcebd1b350d39541a8c9dd068 
   core/src/main/scala/kafka/consumer/SimpleConsumer.scala 
 8db9203d164a4a54f94d8d289e070a0f61e03ff9 
   core/src/main/scala/kafka/controller/ControllerChannelManager.scala 
 ecbfa0f328ba6a652a758ab20cacef324a8b2fb8 
   core/src/main/scala/kafka/network/BlockingChannel.scala 
 eb7bb14d94cb3648c06d4de36a3b34aacbde4556 
   core/src/main/scala/kafka/network/SocketServer.scala 
 d67899080c21e0b6db84657d6845c7ef23b59b0e 
   core/src/main/scala/kafka/network/security/AuthConfig.scala PRE-CREATION 
   core/src/main/scala/kafka/network/security/KeyStores.scala PRE-CREATION 
   core/src/main/scala/kafka/network/security/SSLSocketChannel.scala 
 PRE-CREATION 
   core/src/main/scala/kafka/network/security/SecureAuth.scala PRE-CREATION 
   core/src/main/scala/kafka/network/security/store/JKSInitializer.scala 
 PRE-CREATION 
   core/src/main/scala/kafka/producer/ProducerConfig.scala 
 3cdf23dce3407f1770b9c6543e3a8ae8ab3ff255 
   core/src/main/scala/kafka/producer/ProducerPool.scala 
 43df70bb461dd3e385e6b20396adef3c4016a3fc 
   core/src/main/scala/kafka/producer/SyncProducer.scala 
 489f0077512d9a69be81649c490274964290fa40 
   core/src/main/scala/kafka/producer/SyncProducerConfig.scala 
 69b2d0c11bb1412ce76d566f285333c806be301a 
   core/src/main/scala/kafka/server/AbstractFetcherThread.scala 
 2e9532e820b5b5c63dfd55f5454b32866d084a37 
   core/src/main/scala/kafka/server/KafkaConfig.scala 
 dce48db175d6ea379f848a7768de0b1c8e4b929f 
   core/src/main/scala/kafka/server/KafkaHealthcheck.scala 
 

[jira] [Commented] (KAFKA-1477) add authentication layer and initial JKS x509 implementation for brokers, producers and consumer for network communication

2014-09-22 Thread Rajasekar Elango (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1477?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14143826#comment-14143826
 ] 

Rajasekar Elango commented on KAFKA-1477:
-

[~lagivan] Thanks for picking this up. The review looks good; I just left one 
comment about setting the default value of the secure flag to false. Also, do you 
have this code available in a git repo that I can try to build and test?

 add authentication layer and initial JKS x509 implementation for brokers, 
 producers and consumer for network communication
 --

 Key: KAFKA-1477
 URL: https://issues.apache.org/jira/browse/KAFKA-1477
 Project: Kafka
  Issue Type: New Feature
Reporter: Joe Stein
Assignee: Ivan Lyutov
 Fix For: 0.9.0

 Attachments: KAFKA-1477-binary.patch, KAFKA-1477.patch, 
 KAFKA-1477_2014-06-02_16:59:40.patch, KAFKA-1477_2014-06-02_17:24:26.patch, 
 KAFKA-1477_2014-06-03_13:46:17.patch, KAFKA-1477_trunk.patch






--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Handling errors in the new (0.8.2) Java Client's Producer

2014-09-22 Thread Andrew Stein
Using the new (0.8.2) KafkaProducer interface, if the Kafka server
goes down, how can one stop future attempts to write messages that
have been sent to the producer?

This is the TLDR of the post. The rest is detail...


Background
==
We have a process in a large application that writes a lot of data to
Kafka. However, this data is not mission critical. When a problem
arises writing to Kafka, we want the application to continue working,
but to stop sending data to Kafka, allowing other tasks to
continue. The issue I have is handling messages that have been sent
to the producer but are waiting to go to Kafka. These messages remain
long after my processing is over, timing out, writing to the logs, and
preventing me from moving forward. I am looking for some way to tell
the client to stop forwarding messages to Kafka.


The following code is the only thing I have been able to come up
with (based on the c892c08 git SHA). I have several issues with this
code:
(1) Too much knowledge of the KafkaProducer internals:
(1a) Getting and using the private ioThread member.
(1b) Knowing the kafka-producer-network-thread name.
(2) Using the deprecated Thread.stop(..) method.
(3) A general feeling of unease that this won't work when I most want
it to.

What I have so far
==

    //
    // Closing the producer
    //
    try {
        lastFuture.get(60, TimeUnit.SECONDS); // Wait a minute
    } catch (TimeoutException e) {
        // Do not close() a producer that cannot be flushed in time (by get(timeout) on the last future).
        // We are afraid that close() will block for a long time if the
        // kafka-producer-network-thread is alive but cannot write.
        // Instead try to kill the thread.
        killProducerIOThread();
        return;
    } catch (InterruptedException | ExecutionException e) {
        logger.error("Kafka exception thrown in future.get(timeout) (continuing)", e);
    }
    try {
        producer.close();
    } catch (KafkaException e) {
        logger.error("Kafka exception thrown in producer.close() (continuing)", e);
    }


    //
    // Callback
    //
    private class ErrorCallback implements Callback {
        public void onCompletion(RecordMetadata metadata, Exception exception) {
            if (exception == null) { // Not an error.
                return;
            }

            // Prevent further producer.send() and producer.close() calls.
            // Note: We may already be in producer.close().
            disableEvenMoreMessagesBeingSent();

            // Here it is useful to kill the kafka-producer-network-threads.
            // This prevents the threads continuing in the background and spouting
            // log messages for a long, long time.
            // The difficulty is that some of these callbacks are on our threads and
            // some are on kafka-producer-network-thread threads.
            // If this is a kafka-producer-network-thread, it will just commit thread suicide.
            // If we are in our own thread we get hold of the private
            // ((KafkaProducer) producer).ioThread and kill that. Ugly!!
            String threadName = Thread.currentThread().getName();
            if (threadName.equals("kafka-producer-network-thread")) {
                Thread.currentThread().interrupt();
                throw new ThreadDeath(); // Commit thread suicide

            } else { // Presumably a qtask-worker-nn thread
                killProducerIOThread();
            }
        }
    }

    private void killProducerIOThread() {
        try {
            Thread ioThread = (Thread) FieldUtils.readField(producer, "ioThread", true);
            ioThread.interrupt();
            ioThread.stop(new ThreadDeath());
        } catch (IllegalAccessException e) {
            // Nothing more we can do if reflection fails.
        }
    }



I posted a similar question to us...@kafka.apache.org. Only later have
I realized that this is more suitable to dev@kafka.apache.org. I
apologize for the double posting.

Andrew Stein


Offset manager movement (due to change in KAFKA-1469)

2014-09-22 Thread Joel Koshy
I just wanted to send this out as an FYI but it does not affect any
released versions.

This only affects those who release off trunk and use Kafka-based
consumer offset management.  KAFKA-1469 fixes an issue in our
Utils.abs code. Since we use this method in determining the offset
manager for a consumer group, the fix can yield a different offset
manager if you happen to run off trunk and upgrade across the fix.
This won't affect all groups, only those that happen to hash to a value
that is affected by the bug fixed in KAFKA-1469.

(Sort of related - we may want to consider not using hashCode on the
group and switching to a more standard hashing algorithm, but I highly
doubt that hashCode values on a string will change in the future.)
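
For readers who want to see why this matters, here is a rough sketch (simplified; the names are not the exact Kafka internals, and the corrected abs shown is just one plausible form of the fix) of how the offset-manager partition is derived from abs(hash(consumer group)) and how the abs fix can change it:

{code}
// Illustrative sketch only, not the actual Kafka source.
object OffsetManagerMove {
  def buggyAbs(n: Int): Int = n & 0x7fffffff                              // pre-KAFKA-1469 behaviour
  def fixedAbs(n: Int): Int = if (n == Int.MinValue) 0 else math.abs(n)   // one way a correct abs can look

  // The offset manager for a group is (roughly) the broker leading this
  // partition of the offsets topic.
  def partitionFor(group: String, numPartitions: Int, abs: Int => Int): Int =
    abs(group.hashCode) % numPartitions

  def main(args: Array[String]): Unit = {
    val group = "my-consumer-group"   // hypothetical group name
    val partitions = 50               // e.g. offsets.topic.num.partitions
    println(partitionFor(group, partitions, buggyAbs))
    println(partitionFor(group, partitions, fixedAbs))
    // The two results can differ only when group.hashCode is negative, which is
    // exactly the "groups that happen to hash to an affected value" mentioned above.
  }
}
{code}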

Thanks,

-- 
Joel


[jira] [Updated] (KAFKA-1469) Util.abs function does not return correct absolute values for negative values

2014-09-22 Thread Joel Koshy (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1469?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Joel Koshy updated KAFKA-1469:
--
Description: 
Reported by Russell Melick. [edit1: I don't think this affects correctness of
the places that use the abs utility since we just need it to return a
consistent positive value, but we should fix this nonetheless]
[edit 2: actually it affects correctness in places that depend on consistent
values across the fix. e.g., the offset manager is determined based on
abs(hash(consumer group)). So after an upgrade that can change]

{code}
 /**
  * Get the absolute value of the given number. If the number is
  * Int.MinValue return 0.
  * This is different from java.lang.Math.abs or scala.math.abs in that
  * they return Int.MinValue (!).
  */
 def abs(n: Int) = n & 0x7fffffff
{code}

For negative integers, it does not return the absolute value.  It does
appear to do what the comment says for Int.MinValue though.  For example,

{code}
   scala> -1 & 0x7fffffff
   res8: Int = 2147483647

   scala> -2 & 0x7fffffff
   res9: Int = 2147483646

   scala> -2147483647 & 0x7fffffff
   res11: Int = 1

   scala> -2147483648 & 0x7fffffff
   res12: Int = 0
{code}


  was:
Reported by Russell Melick. [edit: I don't think this affects correctness of
the places that use the abs utility since we just need it to return a
consistent positive value, but we should fix this nonetheless]

{code}
 /**
  * Get the absolute value of the given number. If the number is
  * Int.MinValue return 0.
  * This is different from java.lang.Math.abs or scala.math.abs in that
  * they return Int.MinValue (!).
  */
 def abs(n: Int) = n & 0x7fffffff
{code}

For negative integers, it does not return the absolute value.  It does
appear to do what the comment says for Int.MinValue though.  For example,

{code}
   scala> -1 & 0x7fffffff
   res8: Int = 2147483647

   scala> -2 & 0x7fffffff
   res9: Int = 2147483646

   scala> -2147483647 & 0x7fffffff
   res11: Int = 1

   scala> -2147483648 & 0x7fffffff
   res12: Int = 0
{code}



 Util.abs function does not return correct absolute values for negative values
 -

 Key: KAFKA-1469
 URL: https://issues.apache.org/jira/browse/KAFKA-1469
 Project: Kafka
  Issue Type: Bug
Reporter: Joel Koshy
  Labels: newbie, patch
 Fix For: 0.8.2

 Attachments: KAFKA-1469.patch


 Reported by Russell Melick. [edit1: I don't think this affects correctness of
 the places that use the abs utility since we just need it to return a
 consistent positive value, but we should fix this nonetheless]
 [edit 2: actually it affects correctness in places that depend on consistent
 values across the fix. e.g., the offset manager is determined based on
 abs(hash(consumer group)). So after an upgrade that can change]
 {code}
  /**
   * Get the absolute value of the given number. If the number is
   * Int.MinValue return 0.
   * This is different from java.lang.Math.abs or scala.math.abs in that
   * they return Int.MinValue (!).
   */
  def abs(n: Int) = n & 0x7fffffff
 {code}
 For negative integers, it does not return the absolute value.  It does
 appear to do what the comment says for Int.MinValue though.  For example,
 {code}
 scala> -1 & 0x7fffffff
 res8: Int = 2147483647
 scala> -2 & 0x7fffffff
 res9: Int = 2147483646
 scala> -2147483647 & 0x7fffffff
 res11: Int = 1
 scala> -2147483648 & 0x7fffffff
 res12: Int = 0
 {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Apache Kafka Meetup = Strata Hadoop World = NYC

2014-09-22 Thread Joe Stein
Hey folks, we are going to be kicking off our first ever Apache Kafka NYC
meetup during Hadoop world (10/15/2014)
http://www.meetup.com/Apache-Kafka-NYC/events/206001372/ starting @ 6pm.

We already have 3 (maybe 4) speakers ready to go (I will be updating the
site over the next day or so).  I was also thinking of maybe having
lightning talks (5 minutes apiece), so if you are interested in doing a
quick presentation to talk about how you use Kafka or
whatever (related to Kafka), please let me know (message me privately).

Hope if you are in the area you can attend and even want to talk for a few
minutes about how you are using Kafka too =8^)

/***
 Joe Stein
 Founder, Principal Consultant
 Big Data Open Source Security LLC
 http://www.stealth.ly
 Twitter: @allthingshadoop http://www.twitter.com/allthingshadoop
/


Re: Review Request 25886: KAFKA-1555: provide strong consistency with reasonable availability

2014-09-22 Thread Gwen Shapira

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/25886/
---

(Updated Sept. 23, 2014, 12:05 a.m.)


Review request for kafka.


Changes
---

Fixes following comments in JIRA


Repository: kafka


Description
---

KAFKA-1555: provide strong consistency with reasonable availability


Diffs (updated)
-

  
clients/src/main/java/org/apache/kafka/common/errors/NotEnoughReplicasException.java
 PRE-CREATION 
  clients/src/main/java/org/apache/kafka/common/protocol/Errors.java d434f42 
  core/src/main/scala/kafka/cluster/Partition.scala ff106b4 
  core/src/main/scala/kafka/common/ErrorMapping.scala 3fae791 
  core/src/main/scala/kafka/common/NotEnoughReplicasException.scala 
PRE-CREATION 
  core/src/main/scala/kafka/log/LogConfig.scala 5746ad4 
  core/src/main/scala/kafka/server/KafkaApis.scala c584b55 
  core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala 
39f777b 
  core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala 24deea0 
  core/src/test/scala/unit/kafka/utils/TestUtils.scala 2dbdd3c 

Diff: https://reviews.apache.org/r/25886/diff/


Testing
---

With 3 broker cluster, created 3 topics each with 1 partition and 3 replicas, 
with 1,3 and 4 min.insync.replicas.
* min.insync.replicas=1 behaved normally (all writes succeeded as long as a 
broker was up)
* min.insync.replicas=3 returned NotEnoughReplicas when required.acks=-1 and 
one broker was down
* min.insync.replicas=4 returned NotEnoughReplicas when required.acks=-1

See notes about retry behavior in the JIRA.


Thanks,

Gwen Shapira



[jira] [Updated] (KAFKA-1555) provide strong consistency with reasonable availability

2014-09-22 Thread Gwen Shapira (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1555?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gwen Shapira updated KAFKA-1555:

Attachment: KAFKA-1555.2.patch

 provide strong consistency with reasonable availability
 ---

 Key: KAFKA-1555
 URL: https://issues.apache.org/jira/browse/KAFKA-1555
 Project: Kafka
  Issue Type: Improvement
  Components: controller
Affects Versions: 0.8.1.1
Reporter: Jiang Wu
Assignee: Gwen Shapira
 Fix For: 0.8.2

 Attachments: KAFKA-1555.0.patch, KAFKA-1555.1.patch, 
 KAFKA-1555.2.patch


 In a mission-critical application, we expect a Kafka cluster with 3 brokers 
 to satisfy two requirements:
 1. When 1 broker is down, no message loss or service blocking happens.
 2. In worse cases, such as when two brokers are down, service can be blocked, but 
 no message loss happens.
 We found that the current Kafka version (0.8.1.1) cannot achieve these requirements 
 due to three of its behaviors:
 1. when choosing a new leader from 2 followers in ISR, the one with fewer 
 messages may be chosen as the leader.
 2. even when replica.lag.max.messages=0, a follower can stay in ISR when it 
 has fewer messages than the leader.
 3. ISR can contain only 1 broker, therefore acknowledged messages may be 
 stored in only 1 broker.
 The following is an analytical proof. 
 We consider a cluster with 3 brokers and a topic with 3 replicas, and assume 
 that at the beginning, all 3 replicas, leader A, followers B and C, are in 
 sync, i.e., they have the same messages and are all in ISR.
 According to the value of request.required.acks (acks for short), there are 
 the following cases.
 1. acks=0, 1, 3. Obviously these settings do not satisfy the requirement.
 2. acks=2. Producer sends a message m. It's acknowledged by A and B. At this 
 time, although C hasn't received m, C is still in ISR. If A is killed, C can 
 be elected as the new leader, and consumers will miss m.
 3. acks=-1. B and C restart and are removed from ISR. Producer sends a 
 message m to A, and receives an acknowledgement. Disk failure happens in A 
 before B and C replicate m. Message m is lost.
 In summary, any existing configuration cannot satisfy the requirements.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-1555) provide strong consistency with reasonable availability

2014-09-22 Thread Gwen Shapira (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1555?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14144101#comment-14144101
 ] 

Gwen Shapira commented on KAFKA-1555:
-

Uploaded an updated patch addressing [~joestein] comments.

A few clarifications:
2) Partition.checkEnoughReplicasReachOffset() does not have its own test 
cases (actually there are no tests for Partition at all), so I added tests in 
ProducerFailureHandlingTest and SyncProducerTest.

7) "We should have a warning message; I think it would make sense in KafkaApis 
appendToLocalLog on the server side, no?"
I added the warning message, but I think we should take it out. The exception 
doesn't get thrown in appendToLocalLog; it gets thrown in 
producerRequestPurgatory.checkAndMaybeWatch, which doesn't do its own logging. 
So basically, nothing gets logged.
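
For context, a simplified sketch of the kind of min.insync.replicas check being discussed (illustrative only; the error-code value, names, and signature are made up, not the actual Partition code from the patch):

{code}
// Toy version of the acks=-1 min.insync.replicas check.
object MinIsrCheck {
  val NoError: Short = 0
  val NotEnoughReplicas: Short = 19   // hypothetical code; the real one lives in ErrorMapping

  // Returns (requestSatisfied, errorCode) for a produce request with required.acks = -1.
  def checkEnoughReplicasReachOffset(requiredOffset: Long,
                                     highWatermark: Long,
                                     isrSize: Int,
                                     minIsr: Int): (Boolean, Short) =
    if (highWatermark >= requiredOffset) {
      if (isrSize < minIsr) (true, NotEnoughReplicas)  // replicated, but ISR has shrunk below min.isr
      else (true, NoError)
    } else {
      (false, NoError)                                 // not enough replicas caught up yet; keep waiting
    }
}
{code}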


 provide strong consistency with reasonable availability
 ---

 Key: KAFKA-1555
 URL: https://issues.apache.org/jira/browse/KAFKA-1555
 Project: Kafka
  Issue Type: Improvement
  Components: controller
Affects Versions: 0.8.1.1
Reporter: Jiang Wu
Assignee: Gwen Shapira
 Fix For: 0.8.2

 Attachments: KAFKA-1555.0.patch, KAFKA-1555.1.patch, 
 KAFKA-1555.2.patch


 In a mission-critical application, we expect a Kafka cluster with 3 brokers 
 to satisfy two requirements:
 1. When 1 broker is down, no message loss or service blocking happens.
 2. In worse cases, such as when two brokers are down, service can be blocked, but 
 no message loss happens.
 We found that the current Kafka version (0.8.1.1) cannot achieve these requirements 
 due to three of its behaviors:
 1. when choosing a new leader from 2 followers in ISR, the one with fewer 
 messages may be chosen as the leader.
 2. even when replica.lag.max.messages=0, a follower can stay in ISR when it 
 has fewer messages than the leader.
 3. ISR can contain only 1 broker, therefore acknowledged messages may be 
 stored in only 1 broker.
 The following is an analytical proof. 
 We consider a cluster with 3 brokers and a topic with 3 replicas, and assume 
 that at the beginning, all 3 replicas, leader A, followers B and C, are in 
 sync, i.e., they have the same messages and are all in ISR.
 According to the value of request.required.acks (acks for short), there are 
 the following cases.
 1. acks=0, 1, 3. Obviously these settings do not satisfy the requirement.
 2. acks=2. Producer sends a message m. It's acknowledged by A and B. At this 
 time, although C hasn't received m, C is still in ISR. If A is killed, C can 
 be elected as the new leader, and consumers will miss m.
 3. acks=-1. B and C restart and are removed from ISR. Producer sends a 
 message m to A, and receives an acknowledgement. Disk failure happens in A 
 before B and C replicate m. Message m is lost.
 In summary, any existing configuration cannot satisfy the requirements.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-1367) Broker topic metadata not kept in sync with ZooKeeper

2014-09-22 Thread Dan Hoffman (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1367?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14144108#comment-14144108
 ] 

Dan Hoffman commented on KAFKA-1367:


I'd also add that having the broker be able to serve up (accurate) metadata 
allows client applications to build custom dashboards, etc.  As I understand 
it, there is an idea to move away from ZooKeeper (at least for some things) 
within the Kafka infrastructure - so having the broker be able to provide this 
would be good.

 Broker topic metadata not kept in sync with ZooKeeper
 -

 Key: KAFKA-1367
 URL: https://issues.apache.org/jira/browse/KAFKA-1367
 Project: Kafka
  Issue Type: Bug
Affects Versions: 0.8.0, 0.8.1
Reporter: Ryan Berdeen
  Labels: newbie++
 Attachments: KAFKA-1367.txt


 When a broker is restarted, the topic metadata responses from the brokers 
 will be incorrect (different from ZooKeeper) until a preferred replica leader 
 election.
 In the metadata, it looks like leaders are correctly removed from the ISR 
 when a broker disappears, but followers are not. Then, when a broker 
 reappears, the ISR is never updated.
 I used a variation of the Vagrant setup created by Joe Stein to reproduce 
 this with latest from the 0.8.1 branch: 
 https://github.com/also/kafka/commit/dba36a503a5e22ea039df0f9852560b4fb1e067c



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: Review Request 25886: KAFKA-1555: provide strong consistency with reasonable availability

2014-09-22 Thread Gwen Shapira

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/25886/
---

(Updated Sept. 23, 2014, 12:28 a.m.)


Review request for kafka.


Repository: kafka


Description
---

KAFKA-1555: provide strong consistency with reasonable availability


Diffs (updated)
-

  
clients/src/main/java/org/apache/kafka/common/errors/NotEnoughReplicasException.java
 PRE-CREATION 
  clients/src/main/java/org/apache/kafka/common/protocol/Errors.java d434f42 
  core/src/main/scala/kafka/cluster/Partition.scala ff106b4 
  core/src/main/scala/kafka/common/ErrorMapping.scala 3fae791 
  core/src/main/scala/kafka/common/NotEnoughReplicasException.scala 
PRE-CREATION 
  core/src/main/scala/kafka/log/LogConfig.scala 5746ad4 
  core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala 
39f777b 
  core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala 24deea0 
  core/src/test/scala/unit/kafka/utils/TestUtils.scala 2dbdd3c 

Diff: https://reviews.apache.org/r/25886/diff/


Testing
---

With 3 broker cluster, created 3 topics each with 1 partition and 3 replicas, 
with 1,3 and 4 min.insync.replicas.
* min.insync.replicas=1 behaved normally (all writes succeeded as long as a 
broker was up)
* min.insync.replicas=3 returned NotEnoughReplicas when required.acks=-1 and 
one broker was down
* min.insync.replicas=4 returned NotEnoughReplicas when required.acks=-1

See notes about retry behavior in the JIRA.


Thanks,

Gwen Shapira



[jira] [Updated] (KAFKA-1555) provide strong consistency with reasonable availability

2014-09-22 Thread Gwen Shapira (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1555?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gwen Shapira updated KAFKA-1555:

Attachment: KAFKA-1555.3.patch

Removed the warning from appendToLocalLog since the exception won't get thrown 
there anyway.

 provide strong consistency with reasonable availability
 ---

 Key: KAFKA-1555
 URL: https://issues.apache.org/jira/browse/KAFKA-1555
 Project: Kafka
  Issue Type: Improvement
  Components: controller
Affects Versions: 0.8.1.1
Reporter: Jiang Wu
Assignee: Gwen Shapira
 Fix For: 0.8.2

 Attachments: KAFKA-1555.0.patch, KAFKA-1555.1.patch, 
 KAFKA-1555.2.patch, KAFKA-1555.3.patch


 In a mission-critical application, we expect a Kafka cluster with 3 brokers 
 to satisfy two requirements:
 1. When 1 broker is down, no message loss or service blocking happens.
 2. In worse cases, such as when two brokers are down, service can be blocked, but 
 no message loss happens.
 We found that the current Kafka version (0.8.1.1) cannot achieve these requirements 
 due to three of its behaviors:
 1. when choosing a new leader from 2 followers in ISR, the one with fewer 
 messages may be chosen as the leader.
 2. even when replica.lag.max.messages=0, a follower can stay in ISR when it 
 has fewer messages than the leader.
 3. ISR can contain only 1 broker, therefore acknowledged messages may be 
 stored in only 1 broker.
 The following is an analytical proof. 
 We consider a cluster with 3 brokers and a topic with 3 replicas, and assume 
 that at the beginning, all 3 replicas, leader A, followers B and C, are in 
 sync, i.e., they have the same messages and are all in ISR.
 According to the value of request.required.acks (acks for short), there are 
 the following cases.
 1. acks=0, 1, 3. Obviously these settings do not satisfy the requirement.
 2. acks=2. Producer sends a message m. It's acknowledged by A and B. At this 
 time, although C hasn't received m, C is still in ISR. If A is killed, C can 
 be elected as the new leader, and consumers will miss m.
 3. acks=-1. B and C restart and are removed from ISR. Producer sends a 
 message m to A, and receives an acknowledgement. Disk failure happens in A 
 before B and C replicate m. Message m is lost.
 In summary, any existing configuration cannot satisfy the requirements.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: Kafka Security

2014-09-22 Thread Joe Stein
At the request of the folks that were at the first meeting and can't attend
tomorrow I am moving tomorrow's meeting to next Tuesday (same bat time,
same bat channel).

/***
 Joe Stein
 Founder, Principal Consultant
 Big Data Open Source Security LLC
 http://www.stealth.ly
 Twitter: @allthingshadoop http://www.twitter.com/allthingshadoop
/

On Tue, Sep 16, 2014 at 4:59 PM, Joe Stein joe.st...@stealth.ly wrote:

 yup, yup, yup | done, done, done

 On Tue, Sep 16, 2014 at 1:54 PM, Jay Kreps jay.kr...@gmail.com wrote:

 Hey Joe,

 Can you add me, Jun, and Neha too.

 -Jay

 On Tue, Sep 16, 2014 at 10:37 AM, Joe Stein joe.st...@stealth.ly wrote:
  Hi Andrew, yes the meeting took place and we plan to do it every two
 weeks
  (same bat time, same bat channel) moving forward.
 
  In attendance was Michael Herstine (LinkedIn), Arvind Mani (LinkedIn),
 Gwen
  Shapira (Cloudera) and myself.
 
  Gwen updated the wiki after our discussion.  Basically we are thinking
 of
  using 3 ports one for plain text (so like it is now), one for SASL
  (implementing username/password and kerberos at least) and one for SSL
 and
  they will all be configurable on/off.  Some investigation is going on
 now
  to see about how we can do this without making any wire protocol changes
  (ideal) or minimal changes at least.
 
  Let me know and I can add you to the invite if you would like to
 contribute
  the more help and input the better for sure.
 
  Also in regards to KAFKA-1477 I just asked Ivan to update the patch to
  latest trunk and we could (demand required) make a patch that works with
  0.8.1.X too for folks to use... This doesn't work yet with the new
 producer
  (TBD) or any other clients so be aware it is not yet baked in and from
  release project perspective I don't know what in that patch will survive
  (hopefully all of it).
 
  /***
   Joe Stein
   Founder, Principal Consultant
   Big Data Open Source Security LLC
   http://www.stealth.ly
   Twitter: @allthingshadoop http://www.twitter.com/allthingshadoop
  /
 
  On Tue, Sep 16, 2014 at 10:17 AM, Andrew Psaltis 
 psaltis.and...@gmail.com
  wrote:
 
  Hi,
  I was just reading the recent changes to:
  https://cwiki.apache.org/confluence/display/KAFKA/Security after
 getting
  off a call about Kafka security and how we are jumping through hoops --
  like having PGP keys on the consumers and producers to get around the
 lack
  of SSL support. Did the meeting that Joe proposed happen for Sept 9th
  happen? If not is there a plan to have it? I was also looking at:
  https://issues.apache.org/jira/browse/KAFKA-1477 and it seems like
 there
  have been no comments since 11/08/2014. I would be interested in
 helping
  with the TLS/SSL support as we have a need for it now.
 
  Thanks,
  Andrew
 





[jira] [Updated] (KAFKA-1618) Exception thrown when running console producer with no port number for the broker

2014-09-22 Thread BalajiSeshadri (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1618?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

BalajiSeshadri updated KAFKA-1618:
--
Status: Patch Available  (was: In Progress)

 Exception thrown when running console producer with no port number for the 
 broker
 -

 Key: KAFKA-1618
 URL: https://issues.apache.org/jira/browse/KAFKA-1618
 Project: Kafka
  Issue Type: Improvement
Affects Versions: 0.8.1.1
Reporter: Gwen Shapira
Assignee: BalajiSeshadri
  Labels: newbie
 Fix For: 0.8.2

 Attachments: KAFKA-1618-ALL.patch, KAFKA-1618-REVIEW-COMMENTS.patch, 
 KAFKA-1618.patch


 When running the console producer with just localhost as the broker list, I get 
 an ArrayIndexOutOfBoundsException.
 I expect either a clearer error about the arguments or for the producer to 
 guess a default port.
 [root@shapira-1 bin]# ./kafka-console-producer.sh  --topic rufus1 
 --broker-list localhost
 java.lang.ArrayIndexOutOfBoundsException: 1
   at 
 kafka.client.ClientUtils$$anonfun$parseBrokerList$1.apply(ClientUtils.scala:102)
   at 
 kafka.client.ClientUtils$$anonfun$parseBrokerList$1.apply(ClientUtils.scala:97)
   at 
 scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
   at 
 scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
   at 
 scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
   at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
   at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
   at scala.collection.AbstractTraversable.map(Traversable.scala:105)
   at kafka.client.ClientUtils$.parseBrokerList(ClientUtils.scala:97)
   at 
 kafka.producer.BrokerPartitionInfo.<init>(BrokerPartitionInfo.scala:32)
   at 
 kafka.producer.async.DefaultEventHandler.<init>(DefaultEventHandler.scala:41)
   at kafka.producer.Producer.<init>(Producer.scala:59)
   at kafka.producer.ConsoleProducer$.main(ConsoleProducer.scala:158)
   at kafka.producer.ConsoleProducer.main(ConsoleProducer.scala)
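
As an illustration of the kind of fix being asked for (this is a sketch, not the attached patch; the default port and the helper's shape are assumptions), a broker-list parser could fall back to a default port instead of throwing:

{code}
// Illustrative sketch only: tolerate host entries without an explicit port.
def parseBrokerList(brokerList: String, defaultPort: Int = 9092): Seq[(String, Int)] =
  brokerList.split(",").map(_.trim).filter(_.nonEmpty).map { entry =>
    entry.split(":") match {
      case Array(host, port) => (host, port.toInt)
      case Array(host)       => (host, defaultPort)   // fall back instead of ArrayIndexOutOfBounds
      case _                 => throw new IllegalArgumentException("Invalid broker entry: " + entry)
    }
  }

// parseBrokerList("localhost")          -> Seq((localhost, 9092))
// parseBrokerList("localhost:9093,b2")  -> Seq((localhost, 9093), (b2, 9092))
{code}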



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-1618) Exception thrown when running console producer with no port number for the broker

2014-09-22 Thread BalajiSeshadri (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1618?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

BalajiSeshadri updated KAFKA-1618:
--
Attachment: KAFKA-1618-REVIEW-COMMENTS-SPACE-CORRECTION.patch

[~nehanarkhede] Corrected the space issue with IntelliJ, please validate and check 
in.

 Exception thrown when running console producer with no port number for the 
 broker
 -

 Key: KAFKA-1618
 URL: https://issues.apache.org/jira/browse/KAFKA-1618
 Project: Kafka
  Issue Type: Improvement
Affects Versions: 0.8.1.1
Reporter: Gwen Shapira
Assignee: BalajiSeshadri
  Labels: newbie
 Fix For: 0.8.2

 Attachments: KAFKA-1618-ALL.patch, 
 KAFKA-1618-REVIEW-COMMENTS-SPACE-CORRECTION.patch, 
 KAFKA-1618-REVIEW-COMMENTS.patch, KAFKA-1618.patch


 When running the console producer with just localhost as the broker list, I get 
 an ArrayIndexOutOfBoundsException.
 I expect either a clearer error about the arguments or for the producer to 
 guess a default port.
 [root@shapira-1 bin]# ./kafka-console-producer.sh  --topic rufus1 
 --broker-list localhost
 java.lang.ArrayIndexOutOfBoundsException: 1
   at 
 kafka.client.ClientUtils$$anonfun$parseBrokerList$1.apply(ClientUtils.scala:102)
   at 
 kafka.client.ClientUtils$$anonfun$parseBrokerList$1.apply(ClientUtils.scala:97)
   at 
 scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
   at 
 scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
   at 
 scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
   at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
   at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
   at scala.collection.AbstractTraversable.map(Traversable.scala:105)
   at kafka.client.ClientUtils$.parseBrokerList(ClientUtils.scala:97)
   at 
 kafka.producer.BrokerPartitionInfo.<init>(BrokerPartitionInfo.scala:32)
   at 
 kafka.producer.async.DefaultEventHandler.<init>(DefaultEventHandler.scala:41)
   at kafka.producer.Producer.<init>(Producer.scala:59)
   at kafka.producer.ConsoleProducer$.main(ConsoleProducer.scala:158)
   at kafka.producer.ConsoleProducer.main(ConsoleProducer.scala)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Assigned] (KAFKA-1476) Get a list of consumer groups

2014-09-22 Thread BalajiSeshadri (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1476?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

BalajiSeshadri reassigned KAFKA-1476:
-

Assignee: BalajiSeshadri  (was: Balaji Seshadri)

 Get a list of consumer groups
 -

 Key: KAFKA-1476
 URL: https://issues.apache.org/jira/browse/KAFKA-1476
 Project: Kafka
  Issue Type: Wish
  Components: tools
Affects Versions: 0.8.1.1
Reporter: Ryan Williams
Assignee: BalajiSeshadri
  Labels: newbie
 Fix For: 0.9.0

 Attachments: KAFKA-1476-RENAME.patch, KAFKA-1476.patch


 It would be useful to have a way to get a list of consumer groups currently 
 active via some tool/script that ships with kafka. This would be helpful so 
 that the system tools can be explored more easily.
 For example, when running the ConsumerOffsetChecker, it requires a group 
 option
 bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --topic test --group 
 ?
 But, when just getting started with kafka, using the console producer and 
 consumer, it is not clear what value to use for the group option.  If a list 
 of consumer groups could be listed, then it would be clear what value to use.
 Background:
 http://mail-archives.apache.org/mod_mbox/kafka-users/201405.mbox/%3cCAOq_b1w=slze5jrnakxvak0gu9ctdkpazak1g4dygvqzbsg...@mail.gmail.com%3e
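
A rough sketch of where such a tool could start (illustrative only, not the attached patch; it assumes the classic ZooKeeper-based consumers, whose groups register under /consumers):

{code}
import org.I0Itec.zkclient.ZkClient
import kafka.utils.{ZkUtils, ZKStringSerializer}

// Illustrative sketch: list the consumer groups registered in ZooKeeper.
object ListGroups {
  def main(args: Array[String]): Unit = {
    val zkClient = new ZkClient("localhost:2181", 30000, 30000, ZKStringSerializer)
    try
      ZkUtils.getChildren(zkClient, ZkUtils.ConsumersPath).foreach(println)
    finally
      zkClient.close()
  }
}
{code}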



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Assigned] (KAFKA-328) Write unit test for kafka server startup and shutdown API

2014-09-22 Thread BalajiSeshadri (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-328?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

BalajiSeshadri reassigned KAFKA-328:


Assignee: BalajiSeshadri  (was: Balaji Seshadri)

 Write unit test for kafka server startup and shutdown API 
 --

 Key: KAFKA-328
 URL: https://issues.apache.org/jira/browse/KAFKA-328
 Project: Kafka
  Issue Type: Bug
Reporter: Neha Narkhede
Assignee: BalajiSeshadri
  Labels: newbie

 Background discussion in KAFKA-320
 People often try to embed KafkaServer in an application that ends up calling 
 startup() and shutdown() repeatedly and sometimes in odd ways. To ensure this 
 works correctly we have to be very careful about cleaning up resources. This 
 is a good practice for making unit tests reliable anyway.
 A good first step would be to add some unit tests on startup and shutdown to 
 cover various cases:
 1. A Kafka server can start up if it is not already starting up, if it is not 
 currently being shut down, or if it hasn't already been started.
 2. A Kafka server can shut down if it is not already shutting down, if it is 
 not currently starting up, or if it hasn't already been shut down. 
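
As a sketch of the state rules those test cases would exercise (a toy model, not KafkaServer itself):

{code}
import java.util.concurrent.atomic.AtomicBoolean

// Toy model of the startup/shutdown guards described above; illustrative only.
class ToyServer {
  private val startingUp   = new AtomicBoolean(false)
  private val shuttingDown = new AtomicBoolean(false)
  private val started      = new AtomicBoolean(false)

  def startup(): Unit = {
    require(!shuttingDown.get, "cannot start up while shutting down")
    if (started.get || !startingUp.compareAndSet(false, true)) return  // already started / already starting
    try {
      // ... bring up sockets, log manager, request handlers ...
      started.set(true)
    } finally startingUp.set(false)
  }

  def shutdown(): Unit = {
    require(!startingUp.get, "cannot shut down while starting up")
    if (!started.get || !shuttingDown.compareAndSet(false, true)) return  // never started / already stopping
    try {
      // ... release all resources so startup() can be called again ...
      started.set(false)
    } finally shuttingDown.set(false)
  }
}

// A unit test would then assert, for example, that calling startup() twice is a no-op,
// that shutdown() on a never-started server is a no-op, and that startup() after a
// clean shutdown() succeeds.
{code}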



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Resolved] (KAFKA-328) Write unit test for kafka server startup and shutdown API

2014-09-22 Thread BalajiSeshadri (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-328?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

BalajiSeshadri resolved KAFKA-328.
--
Resolution: Fixed

 Write unit test for kafka server startup and shutdown API 
 --

 Key: KAFKA-328
 URL: https://issues.apache.org/jira/browse/KAFKA-328
 Project: Kafka
  Issue Type: Bug
Reporter: Neha Narkhede
Assignee: BalajiSeshadri
  Labels: newbie

 Background discussion in KAFKA-320
 People often try to embed KafkaServer in an application that ends up calling 
 startup() and shutdown() repeatedly and sometimes in odd ways. To ensure this 
 works correctly we have to be very careful about cleaning up resources. This 
 is a good practice for making unit tests reliable anyway.
 A good first step would be to add some unit tests on startup and shutdown to 
 cover various cases:
 1. A Kafka server can start up if it is not already starting up, if it is not 
 currently being shut down, or if it hasn't already been started.
 2. A Kafka server can shut down if it is not already shutting down, if it is 
 not currently starting up, or if it hasn't already been shut down. 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-328) Write unit test for kafka server startup and shutdown API

2014-09-22 Thread BalajiSeshadri (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-328?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

BalajiSeshadri updated KAFKA-328:
-
Attachment: KAFKA-328.patch

[~nehanarkhede] please find patch attached.

 Write unit test for kafka server startup and shutdown API 
 --

 Key: KAFKA-328
 URL: https://issues.apache.org/jira/browse/KAFKA-328
 Project: Kafka
  Issue Type: Bug
Reporter: Neha Narkhede
Assignee: BalajiSeshadri
  Labels: newbie
 Attachments: KAFKA-328.patch


 Background discussion in KAFKA-320
 People often try to embed KafkaServer in an application that ends up calling 
 startup() and shutdown() repeatedly and sometimes in odd ways. To ensure this 
 works correctly we have to be very careful about cleaning up resources. This 
 is a good practice for making unit tests reliable anyway.
 A good first step would be to add some unit tests on startup and shutdown to 
 cover various cases:
 1. A Kafka server can start up if it is not already starting up, if it is not 
 currently being shut down, or if it hasn't already been started.
 2. A Kafka server can shut down if it is not already shutting down, if it is 
 not currently starting up, or if it hasn't already been shut down. 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-1490) remove gradlew initial setup output from source distribution

2014-09-22 Thread Joe Stein (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1490?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14144311#comment-14144311
 ] 

Joe Stein commented on KAFKA-1490:
--

[~edgefox] can you rebase please? Unless something else is causing it to fail to 
apply, it otherwise looks good; I can test and commit tomorrow (sorry for the long 
delay, I was traveling last week).

 remove gradlew initial setup output from source distribution
 

 Key: KAFKA-1490
 URL: https://issues.apache.org/jira/browse/KAFKA-1490
 Project: Kafka
  Issue Type: Bug
Reporter: Joe Stein
Assignee: Ivan Lyutov
Priority: Blocker
 Fix For: 0.8.2

 Attachments: KAFKA-1490-2.patch, KAFKA-1490.patch


 Our current source release contains lots of stuff in the gradle folder that we 
 do not need.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (KAFKA-1645) some more jars in our src release

2014-09-22 Thread Joe Stein (JIRA)
Joe Stein created KAFKA-1645:


 Summary: some more jars in our src release
 Key: KAFKA-1645
 URL: https://issues.apache.org/jira/browse/KAFKA-1645
 Project: Kafka
  Issue Type: Bug
Reporter: Joe Stein
Priority: Blocker
 Fix For: 0.8.2


The first one is being taken care of in KAFKA-1490 

the rest... can we just delete them? Do we need/want them anymore? 

{code}

root@precise64:~/kafka-0.8.1.1-src# find ./ -name *jar
./gradle/wrapper/gradle-wrapper.jar
./lib/apache-rat-0.8.jar
./system_test/migration_tool_testsuite/0.7/lib/kafka-0.7.0.jar
./system_test/migration_tool_testsuite/0.7/lib/kafka-perf-0.7.0.jar
./system_test/migration_tool_testsuite/0.7/lib/zkclient-0.1.jar
./contrib/hadoop-consumer/lib/piggybank.jar
./contrib/hadoop-producer/lib/piggybank.jar

{code}

rat is not required in the project; I can speak for that file, +1 to remove it.

I don't see why we have to keep the other ones, nor what code changes we would 
have to make to get rid of them.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (KAFKA-1646) Improve consumer read performance for Windows

2014-09-22 Thread xueqiang wang (JIRA)
xueqiang wang created KAFKA-1646:


 Summary: Improve consumer read performance for Windows
 Key: KAFKA-1646
 URL: https://issues.apache.org/jira/browse/KAFKA-1646
 Project: Kafka
  Issue Type: Improvement
  Components: log
Affects Versions: 0.8.1.1
 Environment: Windows
Reporter: xueqiang wang
Assignee: Jay Kreps


This patch is for the Windows platform only. On Windows, if more than one 
replica is writing to disk, the segment log files will not be laid out 
contiguously on disk, and consumer read performance drops greatly. 
This fix allocates more disk space when rolling a new segment, which 
improves consumer read performance on the NTFS file system.
This patch doesn't affect file allocation on other filesystems, since it only 
adds statements like 'if (Os.isWindows)' or methods used only on Windows.
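
As a sketch of the idea (illustrative only, not the actual patch; the helper name and sizes are assumptions), preallocating the new segment file keeps it contiguous on disk:

{code}
import java.io.{File, RandomAccessFile}

// Illustrative sketch: reserve the full segment size up front when rolling a segment
// on Windows, so concurrent replicas don't interleave their extents on disk.
def openSegment(file: File, maxSegmentBytes: Long, isWindows: Boolean): RandomAccessFile = {
  val raf = new RandomAccessFile(file, "rw")
  if (isWindows)
    raf.setLength(maxSegmentBytes)   // preallocate; the log must then track its real end offset
  raf
}

// e.g. openSegment(new File("00000000000000000000.log"), 1024L * 1024 * 1024,
//                  System.getProperty("os.name").toLowerCase.contains("windows"))
{code}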



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-1555) provide strong consistency with reasonable availability

2014-09-22 Thread Joe Stein (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1555?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14144378#comment-14144378
 ] 

Joe Stein commented on KAFKA-1555:
--

[~gwenshap] patch applied and tests passed; however, I ran with 3 brokers (all new 
code) and didn't get the expected results.

{code}

Topic:testDefault   PartitionCount:4    ReplicationFactor:3 Configs:
    Topic: testDefault  Partition: 0    Leader: 1   Replicas: 1,2,3 Isr: 1
    Topic: testDefault  Partition: 1    Leader: 1   Replicas: 2,3,1 Isr: 1
    Topic: testDefault  Partition: 2    Leader: 1   Replicas: 3,1,2 Isr: 1
    Topic: testDefault  Partition: 3    Leader: 1   Replicas: 1,3,2 Isr: 1
Topic:testNew   PartitionCount:4    ReplicationFactor:3 Configs:min.insync.replicas=2
    Topic: testNew  Partition: 0    Leader: 1   Replicas: 2,1,3 Isr: 1
    Topic: testNew  Partition: 1    Leader: 1   Replicas: 3,2,1 Isr: 1
    Topic: testNew  Partition: 2    Leader: 1   Replicas: 1,3,2 Isr: 1
    Topic: testNew  Partition: 3    Leader: 1   Replicas: 2,3,1 Isr: 1

{code}

I am still able to produce to topic testNew (though I shouldn't be able to, 
since 2 brokers are down and only 1 is up with min.isr=2).

I got proper exceptions when trying to create topics with invalid values for the config:

{code}

root@precise64:/opt/apache/kafka# bin/kafka-topics.sh --zookeeper 
localhost:2181 --create --topic testNewA --partitions 4 --replication-factor 3 
--config min.insync.replicas=-1
Error while executing topic command Wrong value -1 of min.insync.replicas in 
Topic configuration;  Valid values are at least 1
kafka.common.InvalidConfigException: Wrong value -1 of min.insync.replicas in 
Topic configuration;  Valid values are at least 1
at kafka.log.LogConfig$.validateMinInSyncReplicas(LogConfig.scala:191)
at kafka.log.LogConfig$.validate(LogConfig.scala:179)
at 
kafka.admin.TopicCommand$.parseTopicConfigsToBeAdded(TopicCommand.scala:204)
at kafka.admin.TopicCommand$.createTopic(TopicCommand.scala:84)
at kafka.admin.TopicCommand$.main(TopicCommand.scala:54)
at kafka.admin.TopicCommand.main(TopicCommand.scala)

root@precise64:/opt/apache/kafka# bin/kafka-topics.sh --zookeeper 
localhost:2181 --create --topic testNewA --partitions 4 --replication-factor 3 
--config min.insync.replicas=4 
Error while executing topic command replication factor: 3 larger than available 
brokers: 1
kafka.admin.AdminOperationException: replication factor: 3 larger than 
available brokers: 1
at kafka.admin.AdminUtils$.assignReplicasToBrokers(AdminUtils.scala:70)
at kafka.admin.AdminUtils$.createTopic(AdminUtils.scala:171)
at kafka.admin.TopicCommand$.createTopic(TopicCommand.scala:92)
at kafka.admin.TopicCommand$.main(TopicCommand.scala:54)
at kafka.admin.TopicCommand.main(TopicCommand.scala)

{code}
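
One thing worth double-checking in a repro like this is the producer's acks setting: the min.insync.replicas check only applies to requests sent with request.required.acks=-1, so a producer using a default acks value would still be accepted. A hedged sketch of an old-style sync producer configured to hit the check (the broker address is an assumption; the topic testNew is from the output above):

{code}
import java.util.Properties
import kafka.producer.{KeyedMessage, Producer, ProducerConfig}

// Illustrative sketch: a sync producer that should see a NotEnoughReplicas error
// when the ISR of testNew has shrunk below min.insync.replicas=2.
val props = new Properties()
props.put("metadata.broker.list", "localhost:9092")   // assumed broker address
props.put("producer.type", "sync")
props.put("serializer.class", "kafka.serializer.StringEncoder")
props.put("request.required.acks", "-1")               // without -1 the min.isr check does not apply

val producer = new Producer[String, String](new ProducerConfig(props))
producer.send(new KeyedMessage[String, String]("testNew", "key", "value"))
// Expected with 2 of 3 brokers down: the send fails with a NotEnoughReplicas error
// instead of being acknowledged.
producer.close()
{code}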

 provide strong consistency with reasonable availability
 ---

 Key: KAFKA-1555
 URL: https://issues.apache.org/jira/browse/KAFKA-1555
 Project: Kafka
  Issue Type: Improvement
  Components: controller
Affects Versions: 0.8.1.1
Reporter: Jiang Wu
Assignee: Gwen Shapira
 Fix For: 0.8.2

 Attachments: KAFKA-1555.0.patch, KAFKA-1555.1.patch, 
 KAFKA-1555.2.patch, KAFKA-1555.3.patch


 In a mission-critical application, we expect a Kafka cluster with 3 brokers 
 to satisfy two requirements:
 1. When 1 broker is down, no message loss or service blocking happens.
 2. In worse cases, such as when two brokers are down, service can be blocked, but 
 no message loss happens.
 We found that the current Kafka version (0.8.1.1) cannot achieve these requirements 
 due to three of its behaviors:
 1. when choosing a new leader from 2 followers in ISR, the one with fewer 
 messages may be chosen as the leader.
 2. even when replica.lag.max.messages=0, a follower can stay in ISR when it 
 has fewer messages than the leader.
 3. ISR can contain only 1 broker, therefore acknowledged messages may be 
 stored in only 1 broker.
 The following is an analytical proof. 
 We consider a cluster with 3 brokers and a topic with 3 replicas, and assume 
 that at the beginning, all 3 replicas, leader A, followers B and C, are in 
 sync, i.e., they have the same messages and are all in ISR.
 According to the value of request.required.acks (acks for short), there are 
 the following cases.
 1. acks=0, 1, 3. Obviously these settings do not satisfy the requirement.
 2. acks=2. Producer sends a message m. It's acknowledged by A and B. At this 
 time, although C hasn't received m, C is still in ISR. If A is killed, C can 
 be elected as the new leader, and consumers will miss m.
 3. acks=-1. B and C restart and are removed from ISR.