[jira] [Commented] (KAFKA-1501) transient unit tests failures due to port already in use

2014-10-28 Thread Chris Cope (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1501?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14186564#comment-14186564
 ] 

Chris Cope commented on KAFKA-1501:
---

14 of 200 test runs failed with java.net.BindException: Address already in use 
errors. This bug is rough.

 transient unit tests failures due to port already in use
 

 Key: KAFKA-1501
 URL: https://issues.apache.org/jira/browse/KAFKA-1501
 Project: Kafka
  Issue Type: Improvement
  Components: core
Reporter: Jun Rao
Assignee: Guozhang Wang
  Labels: newbie
 Attachments: KAFKA-1501.patch, KAFKA-1501.patch


 Saw the following transient failures.
 kafka.api.ProducerFailureHandlingTest > testTooLargeRecordWithAckOne FAILED
 kafka.common.KafkaException: Socket server failed to bind to 
 localhost:59909: Address already in use.
 at kafka.network.Acceptor.openServerSocket(SocketServer.scala:195)
 at kafka.network.Acceptor.init(SocketServer.scala:141)
 at kafka.network.SocketServer.startup(SocketServer.scala:68)
 at kafka.server.KafkaServer.startup(KafkaServer.scala:95)
 at kafka.utils.TestUtils$.createServer(TestUtils.scala:123)
 at 
 kafka.api.ProducerFailureHandlingTest.setUp(ProducerFailureHandlingTest.scala:68)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-1501) transient unit tests failures due to port already in use

2014-10-28 Thread Jay Kreps (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1501?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14186879#comment-14186879
 ] 

Jay Kreps commented on KAFKA-1501:
--

It might be worth checking out that stack overflow article. It has several 
heuristics that it recommends.



Re: [ANNOUNCEMENT] Apache Kafka 0.8.2-beta Released

2014-10-28 Thread Joe Stein
I heard back from INFRA already. Here is what they said.

This issue is already addressed in a different JIRA ticket. We expect to
resolve this ongoing issue within a few weeks. In the meantime, please
expect delays on deploying changes to your web site.

/***
 Joe Stein
 Founder, Principal Consultant
 Big Data Open Source Security LLC
 http://www.stealth.ly
 Twitter: @allthingshadoop http://www.twitter.com/allthingshadoop
/

On Tue, Oct 28, 2014 at 10:20 AM, Joe Stein joe.st...@stealth.ly wrote:

 I am going to open a ticket with INFRA because I don't think it is a local
 cache issue. I get different results when I go to the page in different
 browsers on different computers; it is weird.

 /***
  Joe Stein
  Founder, Principal Consultant
  Big Data Open Source Security LLC
  http://www.stealth.ly
  Twitter: @allthingshadoop http://www.twitter.com/allthingshadoop
 /

 On Mon, Oct 27, 2014 at 9:07 PM, Jay Kreps jay.kr...@gmail.com wrote:

 Yeah it must be a caching thing because others in the same office do see
 it
 (but not all). And ctrl-shift-r doesn't seem to help. Nevermind :-)

 -Jay

 On Mon, Oct 27, 2014 at 6:00 PM, Gwen Shapira gshap...@cloudera.com
 wrote:

  Strange. I'm seeing it.
 
  Browser cache?
 
  On Mon, Oct 27, 2014 at 5:59 PM, Jay Kreps jay.kr...@gmail.com wrote:
   I actually don't see the beta release on that download page:
   http://kafka.apache.org/downloads.html
  
   -Jay
  
   On Mon, Oct 27, 2014 at 5:50 PM, Joe Stein joest...@apache.org
 wrote:
  
   The Apache Kafka community is pleased to announce the beta release
 for
   Apache Kafka 0.8.2.
  
   The 0.8.2-beta release introduces many new features, improvements and
   fixes including:
- A new Java producer for ease of implementation and enhanced
  performance.
- Delete topic support.
- Per topic configuration of preference for consistency over
  availability.
- Scala 2.11 support and dropping support for Scala 2.8.
- LZ4 Compression.
  
   All of the changes in this release can be found at:
   https://archive.apache.org/dist/kafka/0.8.2-beta/RELEASE_NOTES.html
  
   Apache Kafka is a high-throughput, publish-subscribe messaging system
   rethought as a distributed commit log.
  
   ** Fast => A single Kafka broker can handle hundreds of megabytes of reads
   and writes per second from thousands of clients.
  
   ** Scalable => Kafka is designed to allow a single cluster to serve as the
   central data backbone for a large organization. It can be elastically and
   transparently expanded without downtime. Data streams are partitioned and
   spread over a cluster of machines to allow data streams larger than the
   capability of any single machine and to allow clusters of co-ordinated
   consumers.
  
   ** Durable => Messages are persisted on disk and replicated within the
   cluster to prevent data loss. Each broker can handle terabytes of messages
   without performance impact.
  
   ** Distributed by Design => Kafka has a modern cluster-centric design that
   offers strong durability and fault-tolerance guarantees.
  
   You can download the release from:
  http://kafka.apache.org/downloads.html
  
   We welcome your help and feedback. For more information on how to
   report problems, and to get involved, visit the project website at
   http://kafka.apache.org/
  
  
 





Re: Review Request 27124: Patch for KAFKA-1721

2014-10-28 Thread Ewen Cheslack-Postava

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/27124/
---

(Updated Oct. 28, 2014, 4:25 p.m.)


Review request for kafka.


Bugs: KAFKA-1721
https://issues.apache.org/jira/browse/KAFKA-1721


Repository: kafka


Description (updated)
---

KAFKA-1721 Update snappy-java to 1.1.1.6 because 1.1.1.5 introduced a memory 
leak.


Diffs (updated)
-

  build.gradle c3e6bb839ad65c512c9db4695d2bb49b82c80da5 

Diff: https://reviews.apache.org/r/27124/diff/


Testing
---


Thanks,

Ewen Cheslack-Postava



[jira] [Updated] (KAFKA-1721) Snappy compressor is not thread safe

2014-10-28 Thread Ewen Cheslack-Postava (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1721?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ewen Cheslack-Postava updated KAFKA-1721:
-
Attachment: KAFKA-1721_2014-10-28_09:25:50.patch

 Snappy compressor is not thread safe
 

 Key: KAFKA-1721
 URL: https://issues.apache.org/jira/browse/KAFKA-1721
 Project: Kafka
  Issue Type: Bug
  Components: compression
Reporter: Ewen Cheslack-Postava
Assignee: Ewen Cheslack-Postava
 Attachments: KAFKA-1721.patch, KAFKA-1721_2014-10-28_09:25:50.patch


 From the mailing list, it can generate this exception:
 2014-10-20 18:55:21.841 [kafka-producer-network-thread] ERROR
 org.apache.kafka.clients.producer.internals.Sender - Uncaught error in
 kafka producer I/O thread:
 *java.lang.NullPointerException*
 at
 org.xerial.snappy.BufferRecycler.releaseInputBuffer(BufferRecycler.java:153)
 at org.xerial.snappy.SnappyOutputStream.close(SnappyOutputStream.java:317)
 at java.io.FilterOutputStream.close(FilterOutputStream.java:160)
 at org.apache.kafka.common.record.Compressor.close(Compressor.java:94)
 at
 org.apache.kafka.common.record.MemoryRecords.close(MemoryRecords.java:119)
 at
 org.apache.kafka.clients.producer.internals.RecordAccumulator.drain(RecordAccumulator.java:285)
 at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:162)
 at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:115)
 at java.lang.Thread.run(Thread.java:744)
 This appears to be an issue with the snappy-java library using ThreadLocal 
 for an internal buffer recycling object which results in that object being 
 shared unsafely across threads if one thread sends to multiple producers:
 {quote}
 I think the issue is that you're
 using all your producers across a thread pool and the snappy library
 uses ThreadLocal BufferRecyclers. When new Snappy streams are allocated,
 they may be allocated from the same thread (e.g. one of your MyProducer
 classes calls Producer.send() on multiple producers from the same
 thread) and therefore use the same BufferRecycler. Eventually you hit
 the code in the stacktrace, and if two producer send threads hit it
 concurrently they improperly share the unsynchronized BufferRecycler.
 This seems like a pain to fix -- it's really a deficiency of the snappy
 library and as far as I can see there's no external control over
 BufferRecycler in their API. One possibility is to record the thread ID
 when we generate a new stream in Compressor and use that to synchronize
 access to ensure no concurrent BufferRecycler access. That could be made
 specific to snappy so it wouldn't impact other codecs. Not exactly
 ideal, but it would work. Unfortunately I can't think of any way for you
 to protect against this in your own code since the problem arises in the
 producer send thread, which your code should never know about.
 Another option would be to setup your producers differently to avoid the
 possibility of unsynchronized access from multiple threads (i.e. don't
 use the same thread pool approach), but whether you can do that will
 depend on your use case.
 {quote}
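The workaround proposed in the quoted analysis can be sketched as follows. This is an illustrative sketch only, not Kafka's actual Compressor code; the class name SnappyStreamGuard and its structure are invented for the example. The idea is that streams allocated on the same thread share a ThreadLocal BufferRecycler, so they must share one lock:

```java
import java.io.IOException;
import java.io.OutputStream;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch of the per-allocating-thread synchronization idea.
// Streams created on the same thread share snappy-java's ThreadLocal
// BufferRecycler, so close() calls on them must not run concurrently.
public class SnappyStreamGuard {
    // One lock per allocating thread ID; streams allocated on the same
    // thread map to the same lock object.
    private static final ConcurrentHashMap<Long, Object> LOCKS =
            new ConcurrentHashMap<>();

    private final OutputStream stream;
    private final Object lock;

    public SnappyStreamGuard(OutputStream stream) {
        this.stream = stream;
        // Record the allocating thread's ID at stream-creation time.
        this.lock = LOCKS.computeIfAbsent(
                Thread.currentThread().getId(), id -> new Object());
    }

    public void close() throws IOException {
        // Serialize close() across all streams allocated on the same thread,
        // preventing concurrent access to the shared BufferRecycler.
        synchronized (lock) {
            stream.close();
        }
    }
}
```

As the comment notes, this could be made specific to the snappy codec so other codecs are unaffected.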





[jira] [Commented] (KAFKA-1721) Snappy compressor is not thread safe

2014-10-28 Thread Ewen Cheslack-Postava (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1721?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14187033#comment-14187033
 ] 

Ewen Cheslack-Postava commented on KAFKA-1721:
--

Updated reviewboard https://reviews.apache.org/r/27124/diff/
 against branch origin/trunk



Setting up Kafka and Intellij

2014-10-28 Thread Renato Marroquín Mogrovejo
Hi guys,

I have been struggling all day to set up Kafka trunk in IntelliJ. I can get
the code by running ./gradlew idea and then opening the project, but when I
try to actually run the tests from inside IntelliJ I run into many dependency
problems, and I guess adding the whole .m2 repo to the classpath is not the
solution.
I found quite a few threads on setting this up, but no definitive answer. So
could anybody share their experience setting up Kafka in IntelliJ for
actually executing the tests?
Many thanks in advance!


Renato M.


Re: Review Request 27256: Fix KAFKA-1735

2014-10-28 Thread Guozhang Wang


 On Oct. 28, 2014, 1:17 a.m., Neha Narkhede wrote:
  clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java, 
  line 207
  https://reviews.apache.org/r/27256/diff/1/?file=734746#file734746line207
 
  Would it be possible to add a unit test for this?

Sure. This scenario is covered for the old consumer in 
ZookeeperConsumerCompressionTest, and I originally planned to migrate it to the 
new consumer unit tests in the future; on second thought, I now feel it is 
better to add it to MemoryRecordsTest.
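For context, the KAFKA-1735 fix under review handles the fact that a read from a compressed stream may return fewer bytes than requested. A minimal, generic sketch of the read-fully pattern involved (illustrative only, not Kafka's actual MemoryRecords code):

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;

public class ReadFully {
    // InputStream.read(buf, off, len) may return fewer than len bytes even
    // when more data is coming (common with compressed or buffered streams).
    // Loop until the buffer is full or EOF; returns the number of bytes read.
    public static int readFully(InputStream in, byte[] buf) throws IOException {
        int total = 0;
        while (total < buf.length) {
            int n = in.read(buf, total, buf.length - total);
            if (n < 0)
                break;  // EOF before the buffer was filled
            total += n;
        }
        return total;
    }

    public static void main(String[] args) throws IOException {
        byte[] data = {1, 2, 3, 4, 5};
        byte[] buf = new byte[5];
        System.out.println(readFully(new ByteArrayInputStream(data), buf));
    }
}
```

Treating a single short read as EOF is exactly the kind of bug such a loop guards against.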


- Guozhang


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/27256/#review58744
---


On Oct. 27, 2014, 11:59 p.m., Guozhang Wang wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/27256/
 ---
 
 (Updated Oct. 27, 2014, 11:59 p.m.)
 
 
 Review request for kafka.
 
 
 Bugs: KAFKA-1735
 https://issues.apache.org/jira/browse/KAFKA-1735
 
 
 Repository: kafka
 
 
 Description
 ---
 
 Handle partial reads from compressed stream
 
 
 Diffs
 -
 
   clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java 
 040e5b91005edb8f015afdfa76fd94e0bf3cb4ca 
 
 Diff: https://reviews.apache.org/r/27256/diff/
 
 
 Testing
 ---
 
 
 Thanks,
 
 Guozhang Wang
 




[jira] [Commented] (KAFKA-1501) transient unit tests failures due to port already in use

2014-10-28 Thread Guozhang Wang (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1501?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14187104#comment-14187104
 ] 

Guozhang Wang commented on KAFKA-1501:
--

Ouch... Could you share the stack traces from some of the failed tests? Are they 
all from SocketServer?

From the JavaDocs and 
http://stackoverflow.com/questions/434718/sockets-discover-port-availability-using-java
 it seems setting SO_REUSEADDR has no negative effect; all it does is that if 
a port is bound to a socket with SO_REUSEADDR enabled and is in the 
TIME_WAIT state, another socket with SO_REUSEADDR enabled can bind to this 
port (i.e. reusing it).
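For reference, a common way to avoid hard-coded test ports altogether is to let the OS pick a free ephemeral port by binding to port 0. The sketch below is illustrative (not Kafka's actual TestUtils) and also shows enabling SO_REUSEADDR, which must be set before bind():

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;

public class FreePortExample {
    // Ask the OS for an unused ephemeral port. Binding to port 0 tells the
    // kernel to pick any free port; we read it back and release the socket.
    public static int choosePort() throws IOException {
        ServerSocket socket = new ServerSocket();
        try {
            socket.setReuseAddress(true);           // must precede bind()
            socket.bind(new InetSocketAddress(0));  // port 0 = any free port
            return socket.getLocalPort();
        } finally {
            socket.close();
        }
    }

    public static void main(String[] args) throws IOException {
        System.out.println(choosePort() > 0);
    }
}
```

Note there is still a small race between releasing the port here and the server rebinding it, which is why some test suites bind the server itself to port 0 instead.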



[jira] [Commented] (KAFKA-1583) Kafka API Refactoring

2014-10-28 Thread Joel Koshy (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1583?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14187118#comment-14187118
 ] 

Joel Koshy commented on KAFKA-1583:
---

Thanks for the updated patch. I will do this today. [~junrao] do you also want 
to take another look?

 Kafka API Refactoring
 -

 Key: KAFKA-1583
 URL: https://issues.apache.org/jira/browse/KAFKA-1583
 Project: Kafka
  Issue Type: Bug
Reporter: Guozhang Wang
Assignee: Guozhang Wang
 Fix For: 0.9.0

 Attachments: KAFKA-1583.patch, KAFKA-1583_2014-08-20_13:54:38.patch, 
 KAFKA-1583_2014-08-21_11:30:34.patch, KAFKA-1583_2014-08-27_09:44:50.patch, 
 KAFKA-1583_2014-09-01_18:07:42.patch, KAFKA-1583_2014-09-02_13:37:47.patch, 
 KAFKA-1583_2014-09-05_14:08:36.patch, KAFKA-1583_2014-09-05_14:55:38.patch, 
 KAFKA-1583_2014-10-13_19:41:58.patch, KAFKA-1583_2014-10-16_21:15:40.patch, 
 KAFKA-1583_2014-10-17_09:56:33.patch, KAFKA-1583_2014-10-22_18:52:52.patch


 This is the next step of KAFKA-1430. Details can be found at this page:
 https://cwiki.apache.org/confluence/display/KAFKA/Kafka+API+Refactoring





[jira] [Commented] (KAFKA-1638) transient unit test failure UncleanLeaderElectionTest

2014-10-28 Thread JIRA

[ 
https://issues.apache.org/jira/browse/KAFKA-1638?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14187138#comment-14187138
 ] 

Renato Javier Marroquín Mogrovejo commented on KAFKA-1638:
--

Hi, 

I have been trying to reproduce this test failure but I haven't been able to do 
so. I am running the test from inside IntelliJ; maybe that is why? Do you have 
a way to make the failure reproducible? I would be happy to look at it.
Thanks!


 transient unit test failure UncleanLeaderElectionTest
 -

 Key: KAFKA-1638
 URL: https://issues.apache.org/jira/browse/KAFKA-1638
 Project: Kafka
  Issue Type: Bug
  Components: core
Affects Versions: 0.8.2
Reporter: Jun Rao
  Labels: newbie++

 Saw the following transient unit test failure.
 kafka.integration.UncleanLeaderElectionTest > 
 testUncleanLeaderElectionEnabled FAILED
 java.lang.RuntimeException: A broker is already registered on the path 
 /brokers/ids/1. This probably indicates that you either have configured a 
 brokerid that is already in use, or else you have shutdown this broker and 
 restarted it faster than the zookeeper timeout so it appears to be 
 re-registering.
 at kafka.utils.ZkUtils$.registerBrokerInZk(ZkUtils.scala:174)
 at kafka.server.KafkaHealthcheck.register(KafkaHealthcheck.scala:63)
 at kafka.server.KafkaHealthcheck.startup(KafkaHealthcheck.scala:45)
 at kafka.server.KafkaServer.startup(KafkaServer.scala:121)
 at 
 kafka.integration.UncleanLeaderElectionTest$$anonfun$verifyUncleanLeaderElectionEnabled$8.apply(UncleanLeaderElectionTest.scala:187)
 at 
 kafka.integration.UncleanLeaderElectionTest$$anonfun$verifyUncleanLeaderElectionEnabled$8.apply(UncleanLeaderElectionTest.scala:187)
 at 
 scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
 at 
 scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
 at scala.collection.immutable.List.foreach(List.scala:318)
 at 
 scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
 at scala.collection.AbstractTraversable.map(Traversable.scala:105)
 at 
 kafka.integration.UncleanLeaderElectionTest.verifyUncleanLeaderElectionEnabled(UncleanLeaderElectionTest.scala:187)
 at 
 kafka.integration.UncleanLeaderElectionTest.testUncleanLeaderElectionEnabled(UncleanLeaderElectionTest.scala:106)





Re: Review Request 26755: Patch for KAFKA-1706

2014-10-28 Thread Joel Koshy

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/26755/#review58831
---


Can we add a unit test as well?

- Joel Koshy


On Oct. 28, 2014, 1:34 a.m., Jiangjie Qin wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/26755/
 ---
 
 (Updated Oct. 28, 2014, 1:34 a.m.)
 
 
 Review request for kafka.
 
 
 Bugs: KAFKA-1706
 https://issues.apache.org/jira/browse/KAFKA-1706
 
 
 Repository: kafka
 
 
 Description
 ---
 
 changed arguments name
 
 
 correct typo.
 
 
 Incorporated Joel's comments. Also fixed negative queue size problem.
 
 
 Incorporated Joel's comments.
 
 
 Diffs
 -
 
   core/src/main/scala/kafka/utils/ByteBoundedBlockingQueue.scala PRE-CREATION 
 
 Diff: https://reviews.apache.org/r/26755/diff/
 
 
 Testing
 ---
 
 
 Thanks,
 
 Jiangjie Qin
 




Re: Review Request 26373: Patch for KAFKA-1647

2014-10-28 Thread Joel Koshy

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/26373/#review58846
---

Ship it!



core/src/main/scala/kafka/server/ReplicaManager.scala
https://reviews.apache.org/r/26373/#comment13

Sorry - this is fine. I missed the parentheses


- Joel Koshy


On Oct. 28, 2014, 12:20 a.m., Jiangjie Qin wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/26373/
 ---
 
 (Updated Oct. 28, 2014, 12:20 a.m.)
 
 
 Review request for kafka.
 
 
 Bugs: KAFKA-1647
 https://issues.apache.org/jira/browse/KAFKA-1647
 
 
 Repository: kafka
 
 
 Description
 ---
 
 Addressed Joel's comments.
 
 
 the version 2 code seems to be submitted by mistake... This should be the 
 code for review that addressed Joel's comments.
 
 
 Addressed Jun's comments. Will do tests to verify if it works.
 
 
  Addressed Joel's comments; we do not need to check whether the leader exists 
  or not when adding the fetcher.
 
 
 Diffs
 -
 
   core/src/main/scala/kafka/server/ReplicaManager.scala 
 78b7514cc109547c562e635824684fad581af653 
 
 Diff: https://reviews.apache.org/r/26373/diff/
 
 
 Testing
 ---
 
  Followed Joel's testing steps. I was able to reproduce the problem without the 
  patch, and the WARN message goes away after applying the patch.
 
 
 Thanks,
 
 Jiangjie Qin
 




High Level Consumer and Close with Auto Commit On

2014-10-28 Thread Bhavesh Mistry
Hi Kafka Team,

What is expected behavior when you close *ConsumerConnector* and auto
commit is on ?  Basically, when auto commit interval is set to 5 seconds
and shutdown is called (before 5 seconds elapses) does ConsumerConnector
commit the offset of message consumed by (next()) method or consumer will
get duplicate messages when it comes online after restart ?

ConsumerConnector.shutdown();

Thanks,

Bhavesh


Re: High Level Consumer and Close with Auto Commit On

2014-10-28 Thread Gwen Shapira
High level consumer commits before shutting down.

If you'll look at ZookeeperConsumerConnector.scala (currently the only
implementation of ConsumerConnector) you'll see shutdown() includes
the following:

  if (config.autoCommitEnable)
commitOffsets()

Gwen

On Tue, Oct 28, 2014 at 11:44 AM, Bhavesh Mistry
mistry.p.bhav...@gmail.com wrote:
 Hi Kafka Team,

 What is expected behavior when you close *ConsumerConnector* and auto
 commit is on ?  Basically, when auto commit interval is set to 5 seconds
 and shutdown is called (before 5 seconds elapses) does ConsumerConnector
 commit the offset of message consumed by (next()) method or consumer will
 get duplicate messages when it comes online after restart ?

 ConsumerConnector.shutdown();

 Thanks,

 Bhavesh


[jira] [Created] (KAFKA-1736) Improve partition-broker assignment strategy for better availability in majority durability modes

2014-10-28 Thread Kyle Banker (JIRA)
Kyle Banker created KAFKA-1736:
--

 Summary: Improve partition-broker assignment strategy for better 
availability in majority durability modes
 Key: KAFKA-1736
 URL: https://issues.apache.org/jira/browse/KAFKA-1736
 Project: Kafka
  Issue Type: Improvement
Affects Versions: 0.8.1.1
Reporter: Kyle Banker
Priority: Minor


The current random strategy of partition-to-broker distribution combined with a 
fairly typical use of min.isr and request.acks results in a suboptimal level of 
availability.

Specifically, if all of your topics have a replication factor of 3, and you use 
min.isr=2 and required.acks=all, then regardless of the number of the brokers 
in the cluster, you can safely lose only 1 node. Losing more than 1 node will, 
95% of the time, result in the inability to write to at least one partition, 
thus rendering the cluster unavailable. As the total number of partitions 
increases, so does this probability.

On the other hand, if partitions are distributed so that brokers are 
effectively replicas of each other, then the probability of unavailability when 
two nodes are lost is significantly decreased. This probability continues to 
decrease as the size of the cluster increases and, more significantly, this 
probability is constant with respect to the total number of partitions. The 
only requirement for getting these numbers with this strategy is that the 
number of brokers be a multiple of the replication factor.

Here are the results of some simulations I've run:

With Random Partition Assignment

Number of Brokers / Number of Partitions / Replication Factor / Probability 
that two randomly selected nodes will contain at least 1 of the same partitions

6  / 54 / 3 / .999

9  / 54 / 3 / .986

12 / 54 / 3 / .894


Broker-Replica-Style Partitioning

Number of Brokers / Number of Partitions / Replication Factor / Probability 
that two randomly selected nodes will contain at least 1 of the same partitions

6  / 54 / 3 / .424

9  / 54 / 3 / .228

12 / 54 / 3 / .168

Adopting this strategy will greatly increase availability for users wanting 
majority-style durability and should not change current behavior as long as 
leader partitions are assigned evenly. I don't know of any negative impact for 
other use cases, as in these cases, the distribution will still be effectively 
random.

Let me know if you'd like to see simulation code and whether a patch would be 
welcome.





[jira] [Updated] (KAFKA-1736) Improve partition-broker assignment strategy for better availability in majority durability modes

2014-10-28 Thread Kyle Banker (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1736?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kyle Banker updated KAFKA-1736:
---
Description: 
The current random strategy of partition-to-broker distribution combined with a 
fairly typical use of min.isr and request.acks results in a suboptimal level of 
availability.

Specifically, if all of your topics have a replication factor of 3, and you use 
min.isr=2 and required.acks=all, then regardless of the number of the brokers 
in the cluster, you can safely lose only 1 node. Losing more than 1 node will, 
95% of the time, result in the inability to write to at least one partition, 
thus rendering the cluster unavailable. As the total number of partitions 
increases, so does this probability.

On the other hand, if partitions are distributed so that brokers are 
effectively replicas of each other, then the probability of unavailability when 
two nodes are lost is significantly decreased. This probability continues to 
decrease as the size of the cluster increases and, more significantly, this 
probability is constant with respect to the total number of partitions. The 
only requirement for getting these numbers with this strategy is that the 
number of brokers be a multiple of the replication factor.

Here are the results of some simulations I've run:

With Random Partition Assignment

Number of Brokers / Number of Partitions / Replication Factor / Probability 
that two randomly selected nodes will contain at least 1 of the same partitions

6  / 54 / 3 / .999

9  / 54 / 3 / .986

12 / 54 / 3 / .894


Broker-Replica-Style Partitioning

Number of Brokers / Number of Partitions / Replication Factor / Probability 
that two randomly selected nodes will contain at least 1 of the same partitions

6  / 54 / 3 / .424

9  / 54 / 3 / .228

12 / 54 / 3 / .168

Adopting this strategy will greatly increase availability for users wanting 
majority-style durability and should not change current behavior as long as 
leader partitions are assigned evenly. I don't know of any negative impact for 
other use cases, as in these cases, the distribution will still be effectively 
random.

Let me know if you'd like to see simulation code and whether a patch would be 
welcome.

EDIT: Just to clarify, here's how the current partition assigner would assign 9 
partitions with 3 replicas each to a 9-node cluster (partition number -> set of 
replica brokers).

0 = Some(List(2, 3, 4))
1 = Some(List(3, 4, 5))
2 = Some(List(4, 5, 6))
3 = Some(List(5, 6, 7))
4 = Some(List(6, 7, 8))
5 = Some(List(7, 8, 9))
6 = Some(List(8, 9, 1))
7 = Some(List(9, 1, 2))
8 = Some(List(1, 2, 3))

Here's how I'm proposing they be assigned:

0 = Some(ArrayBuffer(8, 5, 2))
1 = Some(ArrayBuffer(8, 5, 2))
2 = Some(ArrayBuffer(8, 5, 2))
3 = Some(ArrayBuffer(7, 4, 1))
4 = Some(ArrayBuffer(7, 4, 1))
5 = Some(ArrayBuffer(7, 4, 1))
6 = Some(ArrayBuffer(6, 3, 0))
7 = Some(ArrayBuffer(6, 3, 0))
8 = Some(ArrayBuffer(6, 3, 0))
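The proposed assignment above can be sketched as follows. This is an illustrative sketch, not Kafka's actual assigner: split the brokers into disjoint groups of size equal to the replication factor, then assign every partition to one whole group, so losing two brokers from different groups never removes two replicas of the same partition.

```java
import java.util.ArrayList;
import java.util.List;

public class GroupedAssignment {
    // Returns, for each partition, the list of broker IDs holding its replicas.
    public static List<List<Integer>> assign(int numBrokers, int numPartitions,
                                             int replicationFactor) {
        if (numBrokers % replicationFactor != 0)
            throw new IllegalArgumentException(
                "broker count must be a multiple of the replication factor");
        int numGroups = numBrokers / replicationFactor;

        // Build disjoint replica groups, e.g. 9 brokers / RF 3 -> 3 groups:
        // [0,1,2], [3,4,5], [6,7,8].
        List<List<Integer>> groups = new ArrayList<>();
        for (int g = 0; g < numGroups; g++) {
            List<Integer> group = new ArrayList<>();
            for (int r = 0; r < replicationFactor; r++)
                group.add(g * replicationFactor + r);
            groups.add(group);
        }

        // Spread partitions over the groups round-robin so leader load
        // stays even across groups.
        List<List<Integer>> assignment = new ArrayList<>();
        for (int p = 0; p < numPartitions; p++)
            assignment.add(groups.get(p % numGroups));
        return assignment;
    }

    public static void main(String[] args) {
        List<List<Integer>> a = assign(9, 9, 3);
        for (int p = 0; p < a.size(); p++)
            System.out.println(p + " = " + a.get(p));
    }
}
```

The exact grouping order differs from the ArrayBuffer listing above, but the availability property only depends on the groups being disjoint.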


  was:
The current random strategy of partition-to-broker distribution combined with a 
fairly typical use of min.isr and request.acks results in a suboptimal level of 
availability.

Specifically, if all of your topics have a replication factor of 3, and you use 
min.isr=2 and required.acks=all, then regardless of the number of the brokers 
in the cluster, you can safely lose only 1 node. Losing more than 1 node will, 
95% of the time, result in the inability to write to at least one partition, 
thus rendering the cluster unavailable. As the total number of partitions 
increases, so does this probability.

On the other hand, if partitions are distributed so that brokers are 
effectively replicas of each other, then the probability of unavailability when 
two nodes are lost is significantly decreased. This probability continues to 
decrease as the size of the cluster increases and, more significantly, this 
probability is constant with respect to the total number of partitions. The 
only requirement for getting these numbers with this strategy is that the 
number of brokers be a multiple of the replication factor.

Here are of the results of some simulations I've run:

With Random Partition Assignment

Number of Brokers / Number of Partitions / Replication Factor / Probability 
that two randomly selected nodes will contain at least 1 of the same partitions

6  / 54 / 3 / .999

9  / 54 / 3 / .986

12 / 54 / 3 / .894


Broker-Replica-Style Partitioning

Number of Brokers / Number of Partitions / Replication Factor / Probability 
that two randomly selected nodes will contain at least 1 of the same partitions

6  / 54 / 3 / .424

9  / 54 / 3 / .228

12 / 54 / 3 / .168
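
These figures can be estimated with a quick Monte Carlo sketch (this assumes 
uniformly random replica placement, a simplification of the actual assigner):

```python
import random

def overlap_probability(num_brokers, num_partitions, rf, trials=2000):
    # Estimate the probability that two randomly chosen brokers host at
    # least one common partition, under uniformly random replica placement.
    hits = 0
    for _ in range(trials):
        broker_parts = [set() for _ in range(num_brokers)]
        for p in range(num_partitions):
            # place rf replicas of partition p on distinct random brokers
            for b in random.sample(range(num_brokers), rf):
                broker_parts[b].add(p)
        a, b = random.sample(range(num_brokers), 2)
        if broker_parts[a] & broker_parts[b]:
            hits += 1
    return hits / trials
```

overlap_probability(6, 54, 3) comes out essentially at 1, and 
overlap_probability(12, 54, 3) lands close to the .894 figure in the table 
above.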

Adopting this strategy will greatly increase availability for users wanting 
majority-style durability and should not change current behavior as long as 
leader partitions are assigned evenly. I don't know of any negative impact for 
other use cases, as in these cases, the distribution will still be effectively 
random.

Re: Review Request 24676: Fix KAFKA-1583

2014-10-28 Thread Joel Koshy


 On Oct. 22, 2014, 1:46 a.m., Joel Koshy wrote:
  core/src/main/scala/kafka/server/ReplicaManager.scala, line 120
  https://reviews.apache.org/r/24676/diff/11/?file=724376#file724376line120
 
  (for regular consumer fetch)
 
 Guozhang Wang wrote:
 Actually this is for both consumer / follower fetch

The follower fetches are not going to be blocked based on the high watermark, 
i.e., available bytes for follower fetches are computed up to the log end 
offset.
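
A schematic of the distinction (hypothetical names; the real logic lives in 
DelayedFetch / ReplicaManager): regular consumers read only up to the high 
watermark, while followers read up to the log end offset:

```python
def available_fetch_bytes(fetch_offset, high_watermark, log_end_offset,
                          is_follower):
    # Followers may read up to the log end offset; regular consumers are
    # capped at the high watermark, i.e. fully replicated messages only.
    upper_bound = log_end_offset if is_follower else high_watermark
    return max(0, upper_bound - fetch_offset)
```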


 On Oct. 22, 2014, 1:46 a.m., Joel Koshy wrote:
  core/src/main/scala/kafka/server/ReplicaManager.scala, line 265
  https://reviews.apache.org/r/24676/diff/11/?file=724376#file724376line265
 
  This is old code and we don't need to address it in this patch, but I 
  was wondering if it makes sense to respond sooner if there is at least one 
  error in the local append. What do you think? i.e., I don't remember a good 
  reason for holding on to the request if there are i < numPartitions errors 
  in local append.
 
 Guozhang Wang wrote:
 I think today we are already responding immediately after a failure in 
 local append, right?

Yeah that was my question: from the code above it does not seem to be the case. 
If so, could you file a jira to fix that?


- Joel


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/24676/#review57235
---


On Oct. 23, 2014, 1:53 a.m., Guozhang Wang wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/24676/
 ---
 
 (Updated Oct. 23, 2014, 1:53 a.m.)
 
 
 Review request for kafka.
 
 
 Bugs: KAFKA-1583
 https://issues.apache.org/jira/browse/KAFKA-1583
 
 
 Repository: kafka
 
 
 Description
 ---
 
 Incorporate Joel's comments after rebase
 
 
 Diffs
 -
 
   core/src/main/scala/kafka/api/FetchRequest.scala 
 59c09155dd25fad7bed07d3d00039e3dc66db95c 
   core/src/main/scala/kafka/api/FetchResponse.scala 
 8d085a1f18f803b3cebae4739ad8f58f95a6c600 
   core/src/main/scala/kafka/api/OffsetCommitRequest.scala 
 861a6cf11dc6b6431fcbbe9de00c74a122f204bd 
   core/src/main/scala/kafka/api/ProducerRequest.scala 
 b2366e7eedcac17f657271d5293ff0bef6f3cbe6 
   core/src/main/scala/kafka/api/ProducerResponse.scala 
 a286272c834b6f40164999ff8b7f8998875f2cfe 
   core/src/main/scala/kafka/cluster/Partition.scala 
 e88ecf224a4dab8bbd26ba7b0c3ccfe844c6b7f4 
   core/src/main/scala/kafka/common/ErrorMapping.scala 
 880ab4a004f078e5d84446ea6e4454ecc06c95f2 
   core/src/main/scala/kafka/log/Log.scala 
 157d67369baabd2206a2356b2aa421e848adab17 
   core/src/main/scala/kafka/network/BoundedByteBufferSend.scala 
 a624359fb2059340bb8dc1619c5b5f226e26eb9b 
   core/src/main/scala/kafka/server/DelayedFetch.scala 
 e0f14e25af03e6d4344386dcabc1457ee784d345 
   core/src/main/scala/kafka/server/DelayedProduce.scala 
 9481508fc2d6140b36829840c337e557f3d090da 
   core/src/main/scala/kafka/server/FetchRequestPurgatory.scala 
 ed1318891253556cdf4d908033b704495acd5724 
   core/src/main/scala/kafka/server/KafkaApis.scala 
 85498b4a1368d3506f19c4cfc64934e4d0ac4c90 
   core/src/main/scala/kafka/server/OffsetManager.scala 
 43eb2a35bb54d32c66cdb94772df657b3a104d1a 
   core/src/main/scala/kafka/server/ProducerRequestPurgatory.scala 
 d4a7d4a79b44263a1f7e1a92874dd36aa06e5a3f 
   core/src/main/scala/kafka/server/ReplicaManager.scala 
 78b7514cc109547c562e635824684fad581af653 
   core/src/main/scala/kafka/server/RequestPurgatory.scala 
 9d76234bc2c810ec08621dc92bb4061b8e7cd993 
   core/src/main/scala/kafka/utils/DelayedItem.scala 
 d7276494072f14f1cdf7d23f755ac32678c5675c 
   core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala 
 209a409cb47eb24f83cee79f4e064dbc5f5e9d62 
   core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala 
 fb61d552f2320fedec547400fbbe402a0b2f5d87 
   core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala 
 03a424d45215e1e7780567d9559dae4d0ae6fc29 
   core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala 
 cd302aa51eb8377d88b752d48274e403926439f2 
   core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala 
 a9c4ddc78df0b3695a77a12cf8cf25521a203122 
   core/src/test/scala/unit/kafka/server/RequestPurgatoryTest.scala 
 a577f4a8bf420a5bc1e62fad6d507a240a42bcaa 
   core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala 
 3804a114e97c849cae48308997037786614173fc 
   core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala 
 09ed8f5a7a414ae139803bf82d336c2d80bf4ac5 
 
 Diff: https://reviews.apache.org/r/24676/diff/
 
 
 Testing
 ---
 
 Unit tests
 
 
 Thanks,
 
 Guozhang Wang
 




Re: Review Request 24676: Fix KAFKA-1583

2014-10-28 Thread Joel Koshy

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/24676/#review58865
---

Ship it!


Looks good - just minor comments which we can fix on check-in.


core/src/main/scala/kafka/server/OffsetManager.scala
https://reviews.apache.org/r/24676/#comment100026

Is there a benefit in using mapValues (vs. map) here, since we aren't doing 
any transformation per se? i.e., we need to materialize these anyway.



core/src/main/scala/kafka/server/OffsetManager.scala
https://reviews.apache.org/r/24676/#comment100018

minor comment: since we are returning OffsetMetadataTooLarge, 
validateOffsetMetadata is an odd name. Could just rename it to 
validateOffsetMetadataLength - we can take care of that on check-in


- Joel Koshy


On Oct. 23, 2014, 1:53 a.m., Guozhang Wang wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/24676/
 ---
 
 (Updated Oct. 23, 2014, 1:53 a.m.)
 
 
 Review request for kafka.
 
 
 Bugs: KAFKA-1583
 https://issues.apache.org/jira/browse/KAFKA-1583
 
 
 Repository: kafka
 
 
 Description
 ---
 
 Incorporate Joel's comments after rebase
 
 
 Diffs
 -
 
 
 Diff: https://reviews.apache.org/r/24676/diff/
 
 
 Testing
 ---
 
 Unit tests
 
 
 Thanks,
 
 Guozhang Wang
 




Re: Review Request 24676: Fix KAFKA-1583

2014-10-28 Thread Guozhang Wang

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/24676/
---

(Updated Oct. 28, 2014, 10:09 p.m.)


Review request for kafka.


Bugs: KAFKA-1583
https://issues.apache.org/jira/browse/KAFKA-1583


Repository: kafka


Description (updated)
---

Incorporated Joel's comments round two


Diffs (updated)
-

  core/src/main/scala/kafka/api/FetchRequest.scala 
59c09155dd25fad7bed07d3d00039e3dc66db95c 
  core/src/main/scala/kafka/api/FetchResponse.scala 
8d085a1f18f803b3cebae4739ad8f58f95a6c600 
  core/src/main/scala/kafka/api/OffsetCommitRequest.scala 
861a6cf11dc6b6431fcbbe9de00c74a122f204bd 
  core/src/main/scala/kafka/api/ProducerRequest.scala 
b2366e7eedcac17f657271d5293ff0bef6f3cbe6 
  core/src/main/scala/kafka/api/ProducerResponse.scala 
a286272c834b6f40164999ff8b7f8998875f2cfe 
  core/src/main/scala/kafka/cluster/Partition.scala 
e88ecf224a4dab8bbd26ba7b0c3ccfe844c6b7f4 
  core/src/main/scala/kafka/common/ErrorMapping.scala 
880ab4a004f078e5d84446ea6e4454ecc06c95f2 
  core/src/main/scala/kafka/log/Log.scala 
157d67369baabd2206a2356b2aa421e848adab17 
  core/src/main/scala/kafka/network/BoundedByteBufferSend.scala 
a624359fb2059340bb8dc1619c5b5f226e26eb9b 
  core/src/main/scala/kafka/server/DelayedFetch.scala 
e0f14e25af03e6d4344386dcabc1457ee784d345 
  core/src/main/scala/kafka/server/DelayedProduce.scala 
9481508fc2d6140b36829840c337e557f3d090da 
  core/src/main/scala/kafka/server/FetchRequestPurgatory.scala 
ed1318891253556cdf4d908033b704495acd5724 
  core/src/main/scala/kafka/server/KafkaApis.scala 
85498b4a1368d3506f19c4cfc64934e4d0ac4c90 
  core/src/main/scala/kafka/server/OffsetManager.scala 
43eb2a35bb54d32c66cdb94772df657b3a104d1a 
  core/src/main/scala/kafka/server/ProducerRequestPurgatory.scala 
d4a7d4a79b44263a1f7e1a92874dd36aa06e5a3f 
  core/src/main/scala/kafka/server/ReplicaManager.scala 
78b7514cc109547c562e635824684fad581af653 
  core/src/main/scala/kafka/server/RequestPurgatory.scala 
9d76234bc2c810ec08621dc92bb4061b8e7cd993 
  core/src/main/scala/kafka/utils/DelayedItem.scala 
d7276494072f14f1cdf7d23f755ac32678c5675c 
  core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala 
209a409cb47eb24f83cee79f4e064dbc5f5e9d62 
  core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala 
fb61d552f2320fedec547400fbbe402a0b2f5d87 
  core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala 
03a424d45215e1e7780567d9559dae4d0ae6fc29 
  core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala 
cd302aa51eb8377d88b752d48274e403926439f2 
  core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala 
a9c4ddc78df0b3695a77a12cf8cf25521a203122 
  core/src/test/scala/unit/kafka/server/RequestPurgatoryTest.scala 
a577f4a8bf420a5bc1e62fad6d507a240a42bcaa 
  core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala 
3804a114e97c849cae48308997037786614173fc 
  core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala 
09ed8f5a7a414ae139803bf82d336c2d80bf4ac5 

Diff: https://reviews.apache.org/r/24676/diff/


Testing
---

Unit tests


Thanks,

Guozhang Wang



[jira] [Updated] (KAFKA-1583) Kafka API Refactoring

2014-10-28 Thread Guozhang Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1583?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang updated KAFKA-1583:
-
Attachment: KAFKA-1583_2014-10-28_15:09:30.patch

 Kafka API Refactoring
 -

 Key: KAFKA-1583
 URL: https://issues.apache.org/jira/browse/KAFKA-1583
 Project: Kafka
  Issue Type: Bug
Reporter: Guozhang Wang
Assignee: Guozhang Wang
 Fix For: 0.9.0

 Attachments: KAFKA-1583.patch, KAFKA-1583_2014-08-20_13:54:38.patch, 
 KAFKA-1583_2014-08-21_11:30:34.patch, KAFKA-1583_2014-08-27_09:44:50.patch, 
 KAFKA-1583_2014-09-01_18:07:42.patch, KAFKA-1583_2014-09-02_13:37:47.patch, 
 KAFKA-1583_2014-09-05_14:08:36.patch, KAFKA-1583_2014-09-05_14:55:38.patch, 
 KAFKA-1583_2014-10-13_19:41:58.patch, KAFKA-1583_2014-10-16_21:15:40.patch, 
 KAFKA-1583_2014-10-17_09:56:33.patch, KAFKA-1583_2014-10-22_18:52:52.patch, 
 KAFKA-1583_2014-10-28_15:09:30.patch


 This is the next step of KAFKA-1430. Details can be found at this page:
 https://cwiki.apache.org/confluence/display/KAFKA/Kafka+API+Refactoring



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-1583) Kafka API Refactoring

2014-10-28 Thread Guozhang Wang (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1583?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14187571#comment-14187571
 ] 

Guozhang Wang commented on KAFKA-1583:
--

Updated reviewboard https://reviews.apache.org/r/24676/diff/
 against branch origin/trunk

 Kafka API Refactoring
 -

 Key: KAFKA-1583
 URL: https://issues.apache.org/jira/browse/KAFKA-1583
 Project: Kafka
  Issue Type: Bug
Reporter: Guozhang Wang
Assignee: Guozhang Wang
 Fix For: 0.9.0

 Attachments: KAFKA-1583.patch, KAFKA-1583_2014-08-20_13:54:38.patch, 
 KAFKA-1583_2014-08-21_11:30:34.patch, KAFKA-1583_2014-08-27_09:44:50.patch, 
 KAFKA-1583_2014-09-01_18:07:42.patch, KAFKA-1583_2014-09-02_13:37:47.patch, 
 KAFKA-1583_2014-09-05_14:08:36.patch, KAFKA-1583_2014-09-05_14:55:38.patch, 
 KAFKA-1583_2014-10-13_19:41:58.patch, KAFKA-1583_2014-10-16_21:15:40.patch, 
 KAFKA-1583_2014-10-17_09:56:33.patch, KAFKA-1583_2014-10-22_18:52:52.patch, 
 KAFKA-1583_2014-10-28_15:09:30.patch


 This is the next step of KAFKA-1430. Details can be found at this page:
 https://cwiki.apache.org/confluence/display/KAFKA/Kafka+API+Refactoring





[jira] [Commented] (KAFKA-1736) Improve partition-broker assignment strategy for better availability in majority durability modes

2014-10-28 Thread Jay Kreps (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1736?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14187701#comment-14187701
 ] 

Jay Kreps commented on KAFKA-1736:
--

Yes, this is correct. This is a common tradeoff in this type of partitioned 
system. The same math governs availability (if you lose more nodes than 
min.isr tolerates) and data loss (if you lose as many nodes as the replication 
factor). It also applies to multi-tenancy problems, e.g. if you have a crazy 
producer overloading one topic, how many other topics are impacted?

If you do random placement then any time you lose 3 nodes you will likely have 
data loss in at least one partition. However, identical node-to-node 
replication is no panacea either. If you have identical replicas then losing 3 
nodes will probably lose no partitions, but when you do lose a partition you 
will probably lose all partitions on those machines. I believe, though I 
haven't done the math, that the expected total data loss is the same either 
way; in one mode the probability of some data loss is high and the probability 
of large-scale loss low, while in the other extreme the probability of some 
data loss is low but the probability of total loss comparatively high.

Another problem with your proposal is that replication is set per-topic, so the 
placement you describe is only possible if all topics have the same replication 
factor.

However these two extremes are not the only options. A generalization of the 
extremes would be to divide the N machines in the cluster arbitrarily into C 
clumps and attempt to place the partitions for a given topic entirely in a 
single clump. With C=1 you get the current strategy; with C=N/3 you get your 
strategy for replication factor 3. Any C in between should be possible, 
balancing between these extremes, and the scheme doesn't depend on a single 
replication factor across the cluster.
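
The clump generalization can be sketched as follows (hypothetical helper; the 
real assigner lives in kafka.admin.AdminUtils):

```python
def clumped_assignment(brokers, num_partitions, rf, num_clumps):
    # Split the brokers into num_clumps groups and keep each partition's
    # replica set inside a single group.  num_clumps=1 reproduces the fully
    # spread current strategy; num_clumps=len(brokers)//rf gives
    # broker-identical replicas for replication factor rf.
    clumps = [brokers[i::num_clumps] for i in range(num_clumps)]
    assert all(len(c) >= rf for c in clumps), "each clump needs >= rf brokers"
    assignment = {}
    for p in range(num_partitions):
        clump = clumps[p % num_clumps]
        # rotate within the clump so leadership spreads evenly
        assignment[p] = [clump[(p + r) % len(clump)] for r in range(rf)]
    return assignment
```

For 9 brokers, rf=3 and num_clumps=3, every partition's replicas land entirely 
inside one of three disjoint 3-broker groups.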

 Improve partition-broker assignment strategy for better availability in 
 majority durability modes
 ---

 Key: KAFKA-1736
 URL: https://issues.apache.org/jira/browse/KAFKA-1736
 Project: Kafka
  Issue Type: Improvement
Affects Versions: 0.8.1.1
Reporter: Kyle Banker
Priority: Minor

 The current random strategy of partition-to-broker distribution combined with 
 a fairly typical use of min.isr and request.acks results in a suboptimal 
 level of availability.
 Specifically, if all of your topics have a replication factor of 3, and you 
 use min.isr=2 and required.acks=all, then regardless of the number of the 
 brokers in the cluster, you can safely lose only 1 node. Losing more than 1 
 node will, 95% of the time, result in the inability to write to at least one 
 partition, thus rendering the cluster unavailable. As the total number of 
 partitions increases, so does this probability.
 On the other hand, if partitions are distributed so that brokers are 
 effectively replicas of each other, then the probability of unavailability 
 when two nodes are lost is significantly decreased. This probability 
 continues to decrease as the size of the cluster increases and, more 
 significantly, this probability is constant with respect to the total number 
 of partitions. The only requirement for getting these numbers with this 
 strategy is that the number of brokers be a multiple of the replication 
 factor.
 Here are the results of some simulations I've run:
 With Random Partition Assignment
 Number of Brokers / Number of Partitions / Replication Factor / Probability 
 that two randomly selected nodes will contain at least 1 of the same 
 partitions
 6  / 54 / 3 / .999
 9  / 54 / 3 / .986
 12 / 54 / 3 / .894
 Broker-Replica-Style Partitioning
 Number of Brokers / Number of Partitions / Replication Factor / Probability 
 that two randomly selected nodes will contain at least 1 of the same 
 partitions
 6  / 54 / 3 / .424
 9  / 54 / 3 / .228
 12 / 54 / 3 / .168
 Adopting this strategy will greatly increase availability for users wanting 
 majority-style durability and should not change current behavior as long as 
 leader partitions are assigned evenly. I don't know of any negative impact 
 for other use cases, as in these cases, the distribution will still be 
 effectively random.
 Let me know if you'd like to see simulation code and whether a patch would be 
 welcome.
 EDIT: Just to clarify, here's how the current partition assigner would assign 
 9 partitions with 3 replicas each to a 9-node cluster (broker number -> set 
 of replicas).
 0 = Some(List(2, 3, 4))
 1 = Some(List(3, 4, 5))
 2 = Some(List(4, 5, 6))
 3 = Some(List(5, 6, 7))
 4 = Some(List(6, 7, 8))
 5 = Some(List(7, 8, 9))
 6 = Some(List(8, 9, 1))
 7 = Some(List(9, 1, 2))
 8 = Some(List(1, 2, 3))
 Here's how I'm proposing they be assigned:
 0 = Some(ArrayBuffer(8, 5, 2))
 1 = 

[jira] [Commented] (KAFKA-1583) Kafka API Refactoring

2014-10-28 Thread Jun Rao (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1583?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14187960#comment-14187960
 ] 

Jun Rao commented on KAFKA-1583:


[~jjkoshy], you can commit the patch once you are done with the review.

 Kafka API Refactoring
 -

 Key: KAFKA-1583
 URL: https://issues.apache.org/jira/browse/KAFKA-1583
 Project: Kafka
  Issue Type: Bug
Reporter: Guozhang Wang
Assignee: Guozhang Wang
 Fix For: 0.9.0

 Attachments: KAFKA-1583.patch, KAFKA-1583_2014-08-20_13:54:38.patch, 
 KAFKA-1583_2014-08-21_11:30:34.patch, KAFKA-1583_2014-08-27_09:44:50.patch, 
 KAFKA-1583_2014-09-01_18:07:42.patch, KAFKA-1583_2014-09-02_13:37:47.patch, 
 KAFKA-1583_2014-09-05_14:08:36.patch, KAFKA-1583_2014-09-05_14:55:38.patch, 
 KAFKA-1583_2014-10-13_19:41:58.patch, KAFKA-1583_2014-10-16_21:15:40.patch, 
 KAFKA-1583_2014-10-17_09:56:33.patch, KAFKA-1583_2014-10-22_18:52:52.patch, 
 KAFKA-1583_2014-10-28_15:09:30.patch


 This is the next step of KAFKA-1430. Details can be found at this page:
 https://cwiki.apache.org/confluence/display/KAFKA/Kafka+API+Refactoring


