[jira] [Commented] (KAFKA-1501) transient unit tests failures due to port already in use
[ https://issues.apache.org/jira/browse/KAFKA-1501?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14186564#comment-14186564 ] Chris Cope commented on KAFKA-1501:
---
14/200 test runs failed with java.net.BindException: Address already in use errors. This bug is rough.

transient unit tests failures due to port already in use
Key: KAFKA-1501
URL: https://issues.apache.org/jira/browse/KAFKA-1501
Project: Kafka
Issue Type: Improvement
Components: core
Reporter: Jun Rao
Assignee: Guozhang Wang
Labels: newbie
Attachments: KAFKA-1501.patch, KAFKA-1501.patch

Saw the following transient failures.
kafka.api.ProducerFailureHandlingTest > testTooLargeRecordWithAckOne FAILED
    kafka.common.KafkaException: Socket server failed to bind to localhost:59909: Address already in use.
        at kafka.network.Acceptor.openServerSocket(SocketServer.scala:195)
        at kafka.network.Acceptor.init(SocketServer.scala:141)
        at kafka.network.SocketServer.startup(SocketServer.scala:68)
        at kafka.server.KafkaServer.startup(KafkaServer.scala:95)
        at kafka.utils.TestUtils$.createServer(TestUtils.scala:123)
        at kafka.api.ProducerFailureHandlingTest.setUp(ProducerFailureHandlingTest.scala:68)
-- This message was sent by Atlassian JIRA (v6.3.4#6332)
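One common way to reduce this class of failure (a sketch only; the class and method names here are hypothetical, and this is not necessarily the approach Kafka's TestUtils takes) is to let the OS assign an ephemeral port by binding to port 0:

```java
import java.io.IOException;
import java.net.ServerSocket;

public class FreePortFinder {
    // Ask the OS for an ephemeral port by binding to port 0, then release it.
    // Note the inherent race: another process may grab the port between
    // close() here and the test's own bind(), which is exactly why tests
    // that reuse a "found" port can still hit BindException.
    public static int findFreePort() throws IOException {
        try (ServerSocket socket = new ServerSocket(0)) {
            return socket.getLocalPort();
        }
    }

    public static void main(String[] args) throws IOException {
        System.out.println("free port: " + findFreePort());
    }
}
```

Binding the server under test directly to port 0 (and then asking it which port it got) avoids the race entirely, but requires the test fixture to support a configurable-after-startup port.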
[jira] [Commented] (KAFKA-1501) transient unit tests failures due to port already in use
[ https://issues.apache.org/jira/browse/KAFKA-1501?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14186879#comment-14186879 ] Jay Kreps commented on KAFKA-1501:
--
It might be worth checking out that stack overflow article. It has several heuristics that it recommends.
-- This message was sent by Atlassian JIRA (v6.3.4#6332)
Re: [ANNOUNCEMENT] Apache Kafka 0.8.2-beta Released
I heard back from INFRA already. Here is what they said. This issue is already addressed in a different JIRA ticket. We expect to resolve this ongoing issue within a few weeks. In the meantime, please expect delays on deploying changes to your web site.

Joe Stein
Founder, Principal Consultant
Big Data Open Source Security LLC
http://www.stealth.ly
Twitter: @allthingshadoop http://www.twitter.com/allthingshadoop

On Tue, Oct 28, 2014 at 10:20 AM, Joe Stein joe.st...@stealth.ly wrote:
I am going to open a ticket with INFRA because I don't think it is a local cache issue. I get different results when I go to the page in different browsers on different computers, it is weird.

On Mon, Oct 27, 2014 at 9:07 PM, Jay Kreps jay.kr...@gmail.com wrote:
Yeah it must be a caching thing because others in the same office do see it (but not all). And ctrl-shift-r doesn't seem to help. Nevermind :-)
-Jay

On Mon, Oct 27, 2014 at 6:00 PM, Gwen Shapira gshap...@cloudera.com wrote:
Strange. I'm seeing it. Browser cache?

On Mon, Oct 27, 2014 at 5:59 PM, Jay Kreps jay.kr...@gmail.com wrote:
I actually don't see the beta release on that download page: http://kafka.apache.org/downloads.html
-Jay

On Mon, Oct 27, 2014 at 5:50 PM, Joe Stein joest...@apache.org wrote:
The Apache Kafka community is pleased to announce the beta release for Apache Kafka 0.8.2. The 0.8.2-beta release introduces many new features, improvements and fixes including:
- A new Java producer for ease of implementation and enhanced performance.
- Delete topic support.
- Per topic configuration of preference for consistency over availability.
- Scala 2.11 support and dropping support for Scala 2.8.
- LZ4 Compression.
All of the changes in this release can be found: https://archive.apache.org/dist/kafka/0.8.2-beta/RELEASE_NOTES.html

Apache Kafka is a high-throughput, publish-subscribe messaging system rethought of as a distributed commit log.
** Fast => A single Kafka broker can handle hundreds of megabytes of reads and writes per second from thousands of clients.
** Scalable => Kafka is designed to allow a single cluster to serve as the central data backbone for a large organization. It can be elastically and transparently expanded without downtime. Data streams are partitioned and spread over a cluster of machines to allow data streams larger than the capability of any single machine and to allow clusters of co-ordinated consumers.
** Durable => Messages are persisted on disk and replicated within the cluster to prevent data loss. Each broker can handle terabytes of messages without performance impact.
** Distributed by Design => Kafka has a modern cluster-centric design that offers strong durability and fault-tolerance guarantees.

You can download the release from: http://kafka.apache.org/downloads.html
We welcome your help and feedback. For more information on how to report problems, and to get involved, visit the project website at http://kafka.apache.org/
Re: Review Request 27124: Patch for KAFKA-1721
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/27124/ --- (Updated Oct. 28, 2014, 4:25 p.m.) Review request for kafka. Bugs: KAFKA-1721 https://issues.apache.org/jira/browse/KAFKA-1721 Repository: kafka Description (updated) --- KAFKA-1721 Update snappy-java to 1.1.1.6 because 1.1.1.5 introduced a memory leak. Diffs (updated) - build.gradle c3e6bb839ad65c512c9db4695d2bb49b82c80da5 Diff: https://reviews.apache.org/r/27124/diff/ Testing --- Thanks, Ewen Cheslack-Postava
[jira] [Updated] (KAFKA-1721) Snappy compressor is not thread safe
[ https://issues.apache.org/jira/browse/KAFKA-1721?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ewen Cheslack-Postava updated KAFKA-1721:
-
Attachment: KAFKA-1721_2014-10-28_09:25:50.patch

Snappy compressor is not thread safe
Key: KAFKA-1721
URL: https://issues.apache.org/jira/browse/KAFKA-1721
Project: Kafka
Issue Type: Bug
Components: compression
Reporter: Ewen Cheslack-Postava
Assignee: Ewen Cheslack-Postava
Attachments: KAFKA-1721.patch, KAFKA-1721_2014-10-28_09:25:50.patch

From the mailing list, it can generate this exception:
2014-10-20 18:55:21.841 [kafka-producer-network-thread] ERROR org.apache.kafka.clients.producer.internals.Sender - Uncaught error in kafka producer I/O thread: *java.lang.NullPointerException*
        at org.xerial.snappy.BufferRecycler.releaseInputBuffer(BufferRecycler.java:153)
        at org.xerial.snappy.SnappyOutputStream.close(SnappyOutputStream.java:317)
        at java.io.FilterOutputStream.close(FilterOutputStream.java:160)
        at org.apache.kafka.common.record.Compressor.close(Compressor.java:94)
        at org.apache.kafka.common.record.MemoryRecords.close(MemoryRecords.java:119)
        at org.apache.kafka.clients.producer.internals.RecordAccumulator.drain(RecordAccumulator.java:285)
        at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:162)
        at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:115)
        at java.lang.Thread.run(Thread.java:744)
This appears to be an issue with the snappy-java library using ThreadLocal for an internal buffer recycling object which results in that object being shared unsafely across threads if one thread sends to multiple producers:
{quote}
I think the issue is that you're using all your producers across a thread pool and the snappy library uses ThreadLocal BufferRecyclers. When new Snappy streams are allocated, they may be allocated from the same thread (e.g. 
one of your MyProducer classes calls Producer.send() on multiple producers from the same thread) and therefore use the same BufferRecycler. Eventually you hit the code in the stacktrace, and if two producer send threads hit it concurrently they improperly share the unsynchronized BufferRecycler. This seems like a pain to fix -- it's really a deficiency of the snappy library and as far as I can see there's no external control over BufferRecycler in their API. One possibility is to record the thread ID when we generate a new stream in Compressor and use that to synchronize access to ensure no concurrent BufferRecycler access. That could be made specific to snappy so it wouldn't impact other codecs. Not exactly ideal, but it would work. Unfortunately I can't think of any way for you to protect against this in your own code since the problem arises in the producer send thread, which your code should never know about. Another option would be to setup your producers differently to avoid the possibility of unsynchronized access from multiple threads (i.e. don't use the same thread pool approach), but whether you can do that will depend on your use case. {quote} -- This message was sent by Atlassian JIRA (v6.3.4#6332)
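The thread-ID-based synchronization proposed in the quote could look roughly like the following sketch. All names here are hypothetical; this is not Kafka's actual Compressor code, and a plain OutputStream stands in for SnappyOutputStream so the example is self-contained:

```java
import java.io.ByteArrayOutputStream;
import java.io.FilterOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.util.concurrent.ConcurrentHashMap;

// Sketch of the proposed workaround: streams created on the same thread
// share that thread's (unsynchronized) BufferRecycler, so we serialize
// close() for all streams created by the same thread.
public class ThreadScopedCloseStream extends FilterOutputStream {
    // One lock object per creating-thread id, shared by all streams
    // that thread creates.
    private static final ConcurrentHashMap<Long, Object> LOCKS = new ConcurrentHashMap<>();

    private final Object closeLock;

    public ThreadScopedCloseStream(OutputStream out) {
        super(out);
        long creator = Thread.currentThread().getId(); // record the creating thread
        closeLock = LOCKS.computeIfAbsent(creator, id -> new Object());
    }

    @Override
    public void close() throws IOException {
        // Two sender threads closing streams that share a recycler would
        // otherwise race inside it; the per-creator lock prevents that.
        synchronized (closeLock) {
            super.close();
        }
    }

    public static void main(String[] args) throws IOException {
        ThreadScopedCloseStream s = new ThreadScopedCloseStream(new ByteArrayOutputStream());
        s.write(42);
        s.close();
        System.out.println("closed safely");
    }
}
```

As the quote notes, this could be made specific to the snappy codec so other codecs pay no synchronization cost.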
[jira] [Commented] (KAFKA-1721) Snappy compressor is not thread safe
[ https://issues.apache.org/jira/browse/KAFKA-1721?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14187033#comment-14187033 ] Ewen Cheslack-Postava commented on KAFKA-1721:
--
Updated reviewboard https://reviews.apache.org/r/27124/diff/ against branch origin/trunk
-- This message was sent by Atlassian JIRA (v6.3.4#6332)
Setting up Kafka and Intellij
Hi guys, I have been struggling all day to set up Kafka trunk in IntelliJ. I mean, I can get the code by running ./gradlew idea and then opening the project. But when I try to actually run the tests from inside IntelliJ I hit many dependency problems, and I guess adding the whole .m2 repo into the classpath is not the solution. I found quite a few topics on setting this up, but not a definite answer. So could anybody share their experience setting up Kafka in IntelliJ for actually executing the tests? Many thanks in advance! Renato M.
Re: Review Request 27256: Fix KAFKA-1735
On Oct. 28, 2014, 1:17 a.m., Neha Narkhede wrote:
clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java, line 207
https://reviews.apache.org/r/27256/diff/1/?file=734746#file734746line207
Would it be possible to add a unit test for this?

Sure. This scenario is covered for the old consumer in ZookeeperConsumerCompressionTest, and I originally planned to migrate it to the new consumer unit tests in the future; on second thought, I now feel it is better to add it to MemoryRecordsTest.
- Guozhang

---
This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/27256/#review58744
---

On Oct. 27, 2014, 11:59 p.m., Guozhang Wang wrote:
---
This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/27256/
---
(Updated Oct. 27, 2014, 11:59 p.m.)
Review request for kafka.
Bugs: KAFKA-1735 https://issues.apache.org/jira/browse/KAFKA-1735
Repository: kafka
Description
---
Handle partial reads from compressed stream
Diffs
- clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java 040e5b91005edb8f015afdfa76fd94e0bf3cb4ca
Diff: https://reviews.apache.org/r/27256/diff/
Testing
---
Thanks,
Guozhang Wang
[jira] [Commented] (KAFKA-1501) transient unit tests failures due to port already in use
[ https://issues.apache.org/jira/browse/KAFKA-1501?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14187104#comment-14187104 ] Guozhang Wang commented on KAFKA-1501:
--
Ouch... Could you share the stack trace of some of the failed tests? Are they all from SocketServer? From the JavaDocs and http://stackoverflow.com/questions/434718/sockets-discover-port-availability-using-java it seems setting SO_REUSEADDR has no negative effect; all it does is that if a port is bound to a socket with SO_REUSEADDR enabled and is in the TIME_WAIT state, another socket with SO_REUSEADDR enabled can bind to this port (i.e. reuse it).
-- This message was sent by Atlassian JIRA (v6.3.4#6332)
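The SO_REUSEADDR behavior described above can be exercised with a small sketch (hypothetical names; this is not Kafka's SocketServer code). One detail worth noting: in Java the option must be set before bind() for it to affect the bind:

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;

// Demonstrates enabling SO_REUSEADDR on a server socket, which permits
// binding to a port whose previous owner is still in TIME_WAIT.
public class ReuseAddrExample {
    public static ServerSocket bindWithReuse(int port) throws IOException {
        ServerSocket socket = new ServerSocket(); // create unbound
        // Must be set before bind(); setting it afterwards has no effect
        // on the bind that already happened.
        socket.setReuseAddress(true);
        socket.bind(new InetSocketAddress("localhost", port));
        return socket;
    }

    public static void main(String[] args) throws IOException {
        try (ServerSocket s = bindWithReuse(0)) { // port 0 = ephemeral, for demo
            System.out.println("bound on " + s.getLocalPort()
                    + ", SO_REUSEADDR=" + s.getReuseAddress());
        }
    }
}
```

This is why the single-argument `new ServerSocket(port)` constructor, which binds immediately, cannot be combined with SO_REUSEADDR; the unbound constructor plus explicit bind() is required.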
[jira] [Commented] (KAFKA-1583) Kafka API Refactoring
[ https://issues.apache.org/jira/browse/KAFKA-1583?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14187118#comment-14187118 ] Joel Koshy commented on KAFKA-1583: --- Thanks for the updated patch. I will do this today. [~junrao] do you also want to take another look? Kafka API Refactoring - Key: KAFKA-1583 URL: https://issues.apache.org/jira/browse/KAFKA-1583 Project: Kafka Issue Type: Bug Reporter: Guozhang Wang Assignee: Guozhang Wang Fix For: 0.9.0 Attachments: KAFKA-1583.patch, KAFKA-1583_2014-08-20_13:54:38.patch, KAFKA-1583_2014-08-21_11:30:34.patch, KAFKA-1583_2014-08-27_09:44:50.patch, KAFKA-1583_2014-09-01_18:07:42.patch, KAFKA-1583_2014-09-02_13:37:47.patch, KAFKA-1583_2014-09-05_14:08:36.patch, KAFKA-1583_2014-09-05_14:55:38.patch, KAFKA-1583_2014-10-13_19:41:58.patch, KAFKA-1583_2014-10-16_21:15:40.patch, KAFKA-1583_2014-10-17_09:56:33.patch, KAFKA-1583_2014-10-22_18:52:52.patch This is the next step of KAFKA-1430. Details can be found at this page: https://cwiki.apache.org/confluence/display/KAFKA/Kafka+API+Refactoring -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-1638) transient unit test failure UncleanLeaderElectionTest
[ https://issues.apache.org/jira/browse/KAFKA-1638?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14187138#comment-14187138 ] Renato Javier Marroquín Mogrovejo commented on KAFKA-1638: -- Hi, I have been trying to reproduce this test failure but I haven't been able to do so. I am running the test from inside Intellij, maybe that is why? Do you have a way to make the test failure reproducible? I would be happy to look at it. Thanks! transient unit test failure UncleanLeaderElectionTest - Key: KAFKA-1638 URL: https://issues.apache.org/jira/browse/KAFKA-1638 Project: Kafka Issue Type: Bug Components: core Affects Versions: 0.8.2 Reporter: Jun Rao Labels: newbie++ Saw the following transient unit test failure. kafka.integration.UncleanLeaderElectionTest testUncleanLeaderElectionEnabled FAILED java.lang.RuntimeException: A broker is already registered on the path /brokers/ids/1. This probably indicates that you either have configured a brokerid that is already in use, or else you have shutdown this broker and restarted it faster than the zookeeper timeout so it appears to be re-registering. 
        at kafka.utils.ZkUtils$.registerBrokerInZk(ZkUtils.scala:174)
        at kafka.server.KafkaHealthcheck.register(KafkaHealthcheck.scala:63)
        at kafka.server.KafkaHealthcheck.startup(KafkaHealthcheck.scala:45)
        at kafka.server.KafkaServer.startup(KafkaServer.scala:121)
        at kafka.integration.UncleanLeaderElectionTest$$anonfun$verifyUncleanLeaderElectionEnabled$8.apply(UncleanLeaderElectionTest.scala:187)
        at kafka.integration.UncleanLeaderElectionTest$$anonfun$verifyUncleanLeaderElectionEnabled$8.apply(UncleanLeaderElectionTest.scala:187)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
        at scala.collection.immutable.List.foreach(List.scala:318)
        at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
        at scala.collection.AbstractTraversable.map(Traversable.scala:105)
        at kafka.integration.UncleanLeaderElectionTest.verifyUncleanLeaderElectionEnabled(UncleanLeaderElectionTest.scala:187)
        at kafka.integration.UncleanLeaderElectionTest.testUncleanLeaderElectionEnabled(UncleanLeaderElectionTest.scala:106)
-- This message was sent by Atlassian JIRA (v6.3.4#6332)
Re: Review Request 26755: Patch for KAFKA-1706
---
This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/26755/#review58831
---
Can we add a unit test as well?
- Joel Koshy

On Oct. 28, 2014, 1:34 a.m., Jiangjie Qin wrote:
---
This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/26755/
---
(Updated Oct. 28, 2014, 1:34 a.m.)
Review request for kafka.
Bugs: KAFKA-1706 https://issues.apache.org/jira/browse/KAFKA-1706
Repository: kafka
Description
---
Changed argument names; corrected a typo. Incorporated Joel's comments. Also fixed the negative queue size problem.
Diffs
- core/src/main/scala/kafka/utils/ByteBoundedBlockingQueue.scala PRE-CREATION
Diff: https://reviews.apache.org/r/26755/diff/
Testing
---
Thanks,
Jiangjie Qin
Re: Review Request 26373: Patch for KAFKA-1647
---
This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/26373/#review58846
---
Ship it!

core/src/main/scala/kafka/server/ReplicaManager.scala https://reviews.apache.org/r/26373/#comment13
Sorry - this is fine. I missed the parentheses.
- Joel Koshy

On Oct. 28, 2014, 12:20 a.m., Jiangjie Qin wrote:
---
This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/26373/
---
(Updated Oct. 28, 2014, 12:20 a.m.)
Review request for kafka.
Bugs: KAFKA-1647 https://issues.apache.org/jira/browse/KAFKA-1647
Repository: kafka
Description
---
Addressed Joel's comments. The version 2 code seems to have been submitted by mistake... This should be the code for review that addresses Joel's comments.
Addressed Jun's comments. Will do tests to verify if it works.
Addressed Joel's comments; we do not need to check if the leader exists or not when adding the fetcher.
Diffs
- core/src/main/scala/kafka/server/ReplicaManager.scala 78b7514cc109547c562e635824684fad581af653
Diff: https://reviews.apache.org/r/26373/diff/
Testing
---
Followed Joel's testing steps. I was able to reproduce the problem without the patch, and the WARN message goes away after applying the patch.
Thanks,
Jiangjie Qin
High Level Consumer and Close with Auto Commit On
Hi Kafka Team,
What is the expected behavior when you close *ConsumerConnector* and auto commit is on? Basically, when the auto commit interval is set to 5 seconds and shutdown is called (before the 5 seconds elapse), does ConsumerConnector commit the offset of the messages consumed via the next() method, or will the consumer get duplicate messages when it comes online after a restart?
ConsumerConnector.shutdown();
Thanks,
Bhavesh
Re: High Level Consumer and Close with Auto Commit On
The high level consumer commits before shutting down. If you look at ZookeeperConsumerConnector.scala (currently the only implementation of ConsumerConnector) you'll see shutdown() includes the following:
if (config.autoCommitEnable) commitOffsets()
Gwen

On Tue, Oct 28, 2014 at 11:44 AM, Bhavesh Mistry mistry.p.bhav...@gmail.com wrote:
Hi Kafka Team,
What is the expected behavior when you close *ConsumerConnector* and auto commit is on? Basically, when the auto commit interval is set to 5 seconds and shutdown is called (before the 5 seconds elapse), does ConsumerConnector commit the offset of the messages consumed via the next() method, or will the consumer get duplicate messages when it comes online after a restart?
ConsumerConnector.shutdown();
Thanks,
Bhavesh
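The effect of that check can be illustrated with a toy sketch (all names hypothetical; this is not the ZookeeperConsumerConnector implementation): with auto commit enabled, a clean shutdown commits the last consumed offset first, so a restart resumes after it rather than replaying already-consumed messages.

```java
import java.util.concurrent.atomic.AtomicLong;

// Toy model of the shutdown path described above: commit the last consumed
// offset on shutdown when auto commit is enabled.
public class AutoCommitShutdown {
    private final boolean autoCommitEnable;
    private final AtomicLong consumedOffset = new AtomicLong(-1);
    private long committedOffset = -1;

    public AutoCommitShutdown(boolean autoCommitEnable) {
        this.autoCommitEnable = autoCommitEnable;
    }

    public void consume(long offset) { consumedOffset.set(offset); }

    private void commitOffsets() { committedOffset = consumedOffset.get(); }

    public void shutdown() {
        // Same check as the Scala line quoted above.
        if (autoCommitEnable) commitOffsets();
    }

    public long committed() { return committedOffset; }

    public static void main(String[] args) {
        AutoCommitShutdown consumer = new AutoCommitShutdown(true);
        consumer.consume(41);
        consumer.shutdown();
        System.out.println("committed offset after shutdown: " + consumer.committed());
    }
}
```

Note this only covers a clean shutdown; a crash between consuming and the next commit can still produce duplicates, which is inherent to at-least-once auto-commit semantics.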
[jira] [Created] (KAFKA-1736) Improve parition-broker assignment strategy for better availaility in majority durability modes
Kyle Banker created KAFKA-1736: -- Summary: Improve parition-broker assignment strategy for better availaility in majority durability modes Key: KAFKA-1736 URL: https://issues.apache.org/jira/browse/KAFKA-1736 Project: Kafka Issue Type: Improvement Affects Versions: 0.8.1.1 Reporter: Kyle Banker Priority: Minor The current random strategy of partition-to-broker distribution combined with a fairly typical use of min.isr and request.acks results in a suboptimal level of availability. Specifically, if all of your topics have a replication factor of 3, and you use min.isr=2 and required.acks=all, then regardless of the number of the brokers in the cluster, you can safely lose only 1 node. Losing more than 1 node will, 95% of the time, result in the inability to write to at least one partition, thus rendering the cluster unavailable. As the total number of partitions increases, so does this probability. On the other hand, if partitions are distributed so that brokers are effectively replicas of each other, then the probability of unavailability when two nodes are lost is significantly decreased. This probability continues to decrease as the size of the cluster increases and, more significantly, this probability is constant with respect to the total number of partitions. The only requirement for getting these numbers with this strategy is that the number of brokers be a multiple of the replication factor. 
Here are the results of some simulations I've run:

With Random Partition Assignment
Number of Brokers / Number of Partitions / Replication Factor / Probability that two randomly selected nodes will contain at least 1 of the same partitions
6 / 54 / 3 / .999
9 / 54 / 3 / .986
12 / 54 / 3 / .894

Broker-Replica-Style Partitioning
Number of Brokers / Number of Partitions / Replication Factor / Probability that two randomly selected nodes will contain at least 1 of the same partitions
6 / 54 / 3 / .424
9 / 54 / 3 / .228
12 / 54 / 3 / .168

Adopting this strategy will greatly increase availability for users wanting majority-style durability and should not change current behavior as long as leader partitions are assigned evenly. I don't know of any negative impact for other use cases, as in these cases, the distribution will still be effectively random. Let me know if you'd like to see simulation code and whether a patch would be welcome.
-- This message was sent by Atlassian JIRA (v6.3.4#6332)
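A rough Monte Carlo sketch of the random-assignment case (hypothetical code, not the reporter's simulation; estimates depend on exact placement details, so they will only approximately match the table above):

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Random;
import java.util.Set;

// Estimates the probability that two randomly selected brokers host at
// least one partition in common under purely random replica placement.
public class OverlapSimulation {
    public static double estimate(int brokers, int partitions, int replication,
                                  int trials, long seed) {
        Random rnd = new Random(seed);
        List<Integer> ids = new ArrayList<>();
        for (int b = 0; b < brokers; b++) ids.add(b);
        int overlapping = 0;
        for (int t = 0; t < trials; t++) {
            // partitionsOn[b] = set of partitions with a replica on broker b
            List<Set<Integer>> partitionsOn = new ArrayList<>();
            for (int b = 0; b < brokers; b++) partitionsOn.add(new HashSet<>());
            for (int p = 0; p < partitions; p++) {
                Collections.shuffle(ids, rnd); // pick `replication` distinct brokers
                for (int r = 0; r < replication; r++) partitionsOn.get(ids.get(r)).add(p);
            }
            // pick two distinct brokers at random and test for a shared partition
            int a = rnd.nextInt(brokers);
            int b = (a + 1 + rnd.nextInt(brokers - 1)) % brokers;
            Set<Integer> shared = new HashSet<>(partitionsOn.get(a));
            shared.retainAll(partitionsOn.get(b));
            if (!shared.isEmpty()) overlapping++;
        }
        return (double) overlapping / trials;
    }

    public static void main(String[] args) {
        for (int brokers : new int[] {6, 9, 12}) {
            System.out.printf("%d brokers: ~%.3f%n", brokers,
                    estimate(brokers, 54, 3, 5000, 42L));
        }
    }
}
```

A closed-form check on the same model: a given pair of brokers both land in a random size-R replica set with probability R(R-1)/(N(N-1)), so the overlap probability over P partitions is 1 - (1 - R(R-1)/(N(N-1)))^P, which for N=12, P=54, R=3 gives roughly .92, in the same ballpark as the .894 reported above.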
[jira] [Updated] (KAFKA-1736) Improve parition-broker assignment strategy for better availaility in majority durability modes
[ https://issues.apache.org/jira/browse/KAFKA-1736?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kyle Banker updated KAFKA-1736:
---
Description: (as in the creation notice above, with the following clarification appended)

EDIT: Just to clarify, here's how the current partition assigner would assign 9 partitions with 3 replicas each to a 9-node cluster (broker number - set of replicas).
0 = Some(List(2, 3, 4))
1 = Some(List(3, 4, 5))
2 = Some(List(4, 5, 6))
3 = Some(List(5, 6, 7))
4 = Some(List(6, 7, 8))
5 = Some(List(7, 8, 9))
6 = Some(List(8, 9, 1))
7 = Some(List(9, 1, 2))
8 = Some(List(1, 2, 3))
Here's how I'm proposing they be assigned:
0 = Some(ArrayBuffer(8, 5, 2))
1 = Some(ArrayBuffer(8, 5, 2))
2 = Some(ArrayBuffer(8, 5, 2))
3 = Some(ArrayBuffer(7, 4, 1))
4 = Some(ArrayBuffer(7, 4, 1))
5 = Some(ArrayBuffer(7, 4, 1))
6 = Some(ArrayBuffer(6, 3, 0))
7 = Some(ArrayBuffer(6, 3, 0))
8 = Some(ArrayBuffer(6, 3, 0))
Re: Review Request 24676: Fix KAFKA-1583
On Oct. 22, 2014, 1:46 a.m., Joel Koshy wrote:
core/src/main/scala/kafka/server/ReplicaManager.scala, line 120
https://reviews.apache.org/r/24676/diff/11/?file=724376#file724376line120
(for regular consumer fetch)

Guozhang Wang wrote:
Actually this is for both consumer / follower fetch

The follower fetches are not going to be blocked based on hw. i.e., available bytes for follower fetches is computed up to log end offset.

On Oct. 22, 2014, 1:46 a.m., Joel Koshy wrote:
core/src/main/scala/kafka/server/ReplicaManager.scala, line 265
https://reviews.apache.org/r/24676/diff/11/?file=724376#file724376line265
This is old code and we don't need to address it in this patch, but I was wondering if it makes sense to respond sooner if there is at least one error in the local append. What do you think? i.e., I don't remember a good reason for holding on to the request if there are < numPartitions errors in local append.

Guozhang Wang wrote:
I think today we are already responding immediately after a failure in local append, right?

Yeah that was my question: from the code above it does not seem to be the case. If so, could you file a jira to fix that?
- Joel

---
This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/24676/#review57235
---

On Oct. 23, 2014, 1:53 a.m., Guozhang Wang wrote:
---
This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/24676/
---
(Updated Oct. 23, 2014, 1:53 a.m.)
Review request for kafka. 
Bugs: KAFKA-1583
    https://issues.apache.org/jira/browse/KAFKA-1583

Repository: kafka

Description
---
Incorporate Joel's comments after rebase

Diffs
---
core/src/main/scala/kafka/api/FetchRequest.scala 59c09155dd25fad7bed07d3d00039e3dc66db95c
core/src/main/scala/kafka/api/FetchResponse.scala 8d085a1f18f803b3cebae4739ad8f58f95a6c600
core/src/main/scala/kafka/api/OffsetCommitRequest.scala 861a6cf11dc6b6431fcbbe9de00c74a122f204bd
core/src/main/scala/kafka/api/ProducerRequest.scala b2366e7eedcac17f657271d5293ff0bef6f3cbe6
core/src/main/scala/kafka/api/ProducerResponse.scala a286272c834b6f40164999ff8b7f8998875f2cfe
core/src/main/scala/kafka/cluster/Partition.scala e88ecf224a4dab8bbd26ba7b0c3ccfe844c6b7f4
core/src/main/scala/kafka/common/ErrorMapping.scala 880ab4a004f078e5d84446ea6e4454ecc06c95f2
core/src/main/scala/kafka/log/Log.scala 157d67369baabd2206a2356b2aa421e848adab17
core/src/main/scala/kafka/network/BoundedByteBufferSend.scala a624359fb2059340bb8dc1619c5b5f226e26eb9b
core/src/main/scala/kafka/server/DelayedFetch.scala e0f14e25af03e6d4344386dcabc1457ee784d345
core/src/main/scala/kafka/server/DelayedProduce.scala 9481508fc2d6140b36829840c337e557f3d090da
core/src/main/scala/kafka/server/FetchRequestPurgatory.scala ed1318891253556cdf4d908033b704495acd5724
core/src/main/scala/kafka/server/KafkaApis.scala 85498b4a1368d3506f19c4cfc64934e4d0ac4c90
core/src/main/scala/kafka/server/OffsetManager.scala 43eb2a35bb54d32c66cdb94772df657b3a104d1a
core/src/main/scala/kafka/server/ProducerRequestPurgatory.scala d4a7d4a79b44263a1f7e1a92874dd36aa06e5a3f
core/src/main/scala/kafka/server/ReplicaManager.scala 78b7514cc109547c562e635824684fad581af653
core/src/main/scala/kafka/server/RequestPurgatory.scala 9d76234bc2c810ec08621dc92bb4061b8e7cd993
core/src/main/scala/kafka/utils/DelayedItem.scala d7276494072f14f1cdf7d23f755ac32678c5675c
core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala 209a409cb47eb24f83cee79f4e064dbc5f5e9d62
core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala fb61d552f2320fedec547400fbbe402a0b2f5d87
core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala 03a424d45215e1e7780567d9559dae4d0ae6fc29
core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala cd302aa51eb8377d88b752d48274e403926439f2
core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala a9c4ddc78df0b3695a77a12cf8cf25521a203122
core/src/test/scala/unit/kafka/server/RequestPurgatoryTest.scala a577f4a8bf420a5bc1e62fad6d507a240a42bcaa
core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala 3804a114e97c849cae48308997037786614173fc
core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala 09ed8f5a7a414ae139803bf82d336c2d80bf4ac5

Diff: https://reviews.apache.org/r/24676/diff/

Testing
---
Unit tests

Thanks,
Guozhang Wang
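The HW-vs-LEO distinction discussed in the review above (follower fetches may read up to the log end offset, while regular consumer fetches are bounded by the high watermark) can be sketched roughly as follows. This is an illustrative model only, not the actual ReplicaManager/DelayedFetch code; the function names and the flat `bytes_per_offset` assumption are hypothetical:

```python
# Illustrative sketch of how many bytes are "available" to a fetch,
# depending on who is fetching. Not Kafka's actual implementation.

def available_bytes(fetch_offset, high_watermark, log_end_offset,
                    bytes_per_offset, from_follower):
    """Bytes readable by this fetch request.

    Followers replicate everything, so they may read up to the log end
    offset (LEO). Regular consumers must not see uncommitted messages,
    so they are bounded by the high watermark (HW).
    """
    upper_bound = log_end_offset if from_follower else high_watermark
    return max(0, upper_bound - fetch_offset) * bytes_per_offset

def fetch_can_complete(fetch_offset, high_watermark, log_end_offset,
                       bytes_per_offset, from_follower, min_bytes):
    # A delayed fetch is satisfied once enough bytes are available;
    # otherwise it would be parked in the fetch purgatory.
    return available_bytes(fetch_offset, high_watermark, log_end_offset,
                           bytes_per_offset, from_follower) >= min_bytes
```

For example, with HW=100, LEO=110, and a fetch at offset 100, a consumer fetch sees 0 available bytes (and so would block), while a follower fetch sees 10 offsets' worth of data, which is why follower fetches are never blocked on the HW.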
Re: Review Request 24676: Fix KAFKA-1583
---
This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/24676/#review58865
---

Ship it!

Looks good - just minor comments which we can fix on check-in.

core/src/main/scala/kafka/server/OffsetManager.scala
https://reviews.apache.org/r/24676/#comment100026
Is there a benefit in using mapValues (vs. map) here, since we aren't doing any transformation per se? I.e., we need to materialize these anyway.

core/src/main/scala/kafka/server/OffsetManager.scala
https://reviews.apache.org/r/24676/#comment100018
Minor comment: since we are returning OffsetMetadataTooLarge, validateOffsetMetadata is an odd name. Could just rename it to validateOffsetMetadataLength - we can take care of that on check-in.

- Joel Koshy

On Oct. 23, 2014, 1:53 a.m., Guozhang Wang wrote:
---
This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/24676/
---
(Updated Oct. 23, 2014, 1:53 a.m.)

Review request for kafka.

Bugs: KAFKA-1583
    https://issues.apache.org/jira/browse/KAFKA-1583

Repository: kafka

Description
---
Incorporate Joel's comments after rebase

Diffs
---
core/src/main/scala/kafka/api/FetchRequest.scala 59c09155dd25fad7bed07d3d00039e3dc66db95c
core/src/main/scala/kafka/api/FetchResponse.scala 8d085a1f18f803b3cebae4739ad8f58f95a6c600
core/src/main/scala/kafka/api/OffsetCommitRequest.scala 861a6cf11dc6b6431fcbbe9de00c74a122f204bd
core/src/main/scala/kafka/api/ProducerRequest.scala b2366e7eedcac17f657271d5293ff0bef6f3cbe6
core/src/main/scala/kafka/api/ProducerResponse.scala a286272c834b6f40164999ff8b7f8998875f2cfe
core/src/main/scala/kafka/cluster/Partition.scala e88ecf224a4dab8bbd26ba7b0c3ccfe844c6b7f4
core/src/main/scala/kafka/common/ErrorMapping.scala 880ab4a004f078e5d84446ea6e4454ecc06c95f2
core/src/main/scala/kafka/log/Log.scala 157d67369baabd2206a2356b2aa421e848adab17
core/src/main/scala/kafka/network/BoundedByteBufferSend.scala a624359fb2059340bb8dc1619c5b5f226e26eb9b
core/src/main/scala/kafka/server/DelayedFetch.scala e0f14e25af03e6d4344386dcabc1457ee784d345
core/src/main/scala/kafka/server/DelayedProduce.scala 9481508fc2d6140b36829840c337e557f3d090da
core/src/main/scala/kafka/server/FetchRequestPurgatory.scala ed1318891253556cdf4d908033b704495acd5724
core/src/main/scala/kafka/server/KafkaApis.scala 85498b4a1368d3506f19c4cfc64934e4d0ac4c90
core/src/main/scala/kafka/server/OffsetManager.scala 43eb2a35bb54d32c66cdb94772df657b3a104d1a
core/src/main/scala/kafka/server/ProducerRequestPurgatory.scala d4a7d4a79b44263a1f7e1a92874dd36aa06e5a3f
core/src/main/scala/kafka/server/ReplicaManager.scala 78b7514cc109547c562e635824684fad581af653
core/src/main/scala/kafka/server/RequestPurgatory.scala 9d76234bc2c810ec08621dc92bb4061b8e7cd993
core/src/main/scala/kafka/utils/DelayedItem.scala d7276494072f14f1cdf7d23f755ac32678c5675c
core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala 209a409cb47eb24f83cee79f4e064dbc5f5e9d62
core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala fb61d552f2320fedec547400fbbe402a0b2f5d87
core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala 03a424d45215e1e7780567d9559dae4d0ae6fc29
core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala cd302aa51eb8377d88b752d48274e403926439f2
core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala a9c4ddc78df0b3695a77a12cf8cf25521a203122
core/src/test/scala/unit/kafka/server/RequestPurgatoryTest.scala a577f4a8bf420a5bc1e62fad6d507a240a42bcaa
core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala 3804a114e97c849cae48308997037786614173fc
core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala 09ed8f5a7a414ae139803bf82d336c2d80bf4ac5

Diff: https://reviews.apache.org/r/24676/diff/

Testing
---
Unit tests

Thanks,
Guozhang Wang
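Joel's mapValues-vs-map comment refers to a well-known Scala trade-off: `Map.mapValues` returns a lazy view that re-applies the function on every lookup, whereas `map` materializes the transformed collection once. A rough Python analogue of the same pitfall (the `LazyValuesView` class is hypothetical, written only to mimic the Scala semantics):

```python
# Scala's mapValues returns a view that recomputes the function on every
# lookup; map materializes the transformed collection once. A rough
# Python analogue of that trade-off:

calls = {"count": 0}

def expensive(v):
    calls["count"] += 1
    return v * 2

data = {"a": 1, "b": 2}

class LazyValuesView:
    """mapValues-style lazy view: recomputes on each access."""
    def __init__(self, d, f):
        self.d, self.f = d, f
    def __getitem__(self, k):
        return self.f(self.d[k])

lazy = LazyValuesView(data, expensive)
_ = lazy["a"]
_ = lazy["a"]          # expensive() runs twice for the same key

# map-style materialization: computes each value exactly once.
eager = {k: expensive(v) for k, v in data.items()}
_ = eager["a"]
_ = eager["a"]         # no further calls
```

Since the offsets in OffsetManager need to be materialized anyway, the eager form is the natural choice, which is Joel's point.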
Re: Review Request 24676: Fix KAFKA-1583
---
This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/24676/
---
(Updated Oct. 28, 2014, 10:09 p.m.)

Review request for kafka.

Bugs: KAFKA-1583
    https://issues.apache.org/jira/browse/KAFKA-1583

Repository: kafka

Description (updated)
---
Incorporated Joel's comments round two

Diffs (updated)
---
core/src/main/scala/kafka/api/FetchRequest.scala 59c09155dd25fad7bed07d3d00039e3dc66db95c
core/src/main/scala/kafka/api/FetchResponse.scala 8d085a1f18f803b3cebae4739ad8f58f95a6c600
core/src/main/scala/kafka/api/OffsetCommitRequest.scala 861a6cf11dc6b6431fcbbe9de00c74a122f204bd
core/src/main/scala/kafka/api/ProducerRequest.scala b2366e7eedcac17f657271d5293ff0bef6f3cbe6
core/src/main/scala/kafka/api/ProducerResponse.scala a286272c834b6f40164999ff8b7f8998875f2cfe
core/src/main/scala/kafka/cluster/Partition.scala e88ecf224a4dab8bbd26ba7b0c3ccfe844c6b7f4
core/src/main/scala/kafka/common/ErrorMapping.scala 880ab4a004f078e5d84446ea6e4454ecc06c95f2
core/src/main/scala/kafka/log/Log.scala 157d67369baabd2206a2356b2aa421e848adab17
core/src/main/scala/kafka/network/BoundedByteBufferSend.scala a624359fb2059340bb8dc1619c5b5f226e26eb9b
core/src/main/scala/kafka/server/DelayedFetch.scala e0f14e25af03e6d4344386dcabc1457ee784d345
core/src/main/scala/kafka/server/DelayedProduce.scala 9481508fc2d6140b36829840c337e557f3d090da
core/src/main/scala/kafka/server/FetchRequestPurgatory.scala ed1318891253556cdf4d908033b704495acd5724
core/src/main/scala/kafka/server/KafkaApis.scala 85498b4a1368d3506f19c4cfc64934e4d0ac4c90
core/src/main/scala/kafka/server/OffsetManager.scala 43eb2a35bb54d32c66cdb94772df657b3a104d1a
core/src/main/scala/kafka/server/ProducerRequestPurgatory.scala d4a7d4a79b44263a1f7e1a92874dd36aa06e5a3f
core/src/main/scala/kafka/server/ReplicaManager.scala 78b7514cc109547c562e635824684fad581af653
core/src/main/scala/kafka/server/RequestPurgatory.scala 9d76234bc2c810ec08621dc92bb4061b8e7cd993
core/src/main/scala/kafka/utils/DelayedItem.scala d7276494072f14f1cdf7d23f755ac32678c5675c
core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala 209a409cb47eb24f83cee79f4e064dbc5f5e9d62
core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala fb61d552f2320fedec547400fbbe402a0b2f5d87
core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala 03a424d45215e1e7780567d9559dae4d0ae6fc29
core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala cd302aa51eb8377d88b752d48274e403926439f2
core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala a9c4ddc78df0b3695a77a12cf8cf25521a203122
core/src/test/scala/unit/kafka/server/RequestPurgatoryTest.scala a577f4a8bf420a5bc1e62fad6d507a240a42bcaa
core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala 3804a114e97c849cae48308997037786614173fc
core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala 09ed8f5a7a414ae139803bf82d336c2d80bf4ac5

Diff: https://reviews.apache.org/r/24676/diff/

Testing
---
Unit tests

Thanks,
Guozhang Wang
[jira] [Updated] (KAFKA-1583) Kafka API Refactoring
[ https://issues.apache.org/jira/browse/KAFKA-1583?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Guozhang Wang updated KAFKA-1583: - Attachment: KAFKA-1583_2014-10-28_15:09:30.patch Kafka API Refactoring - Key: KAFKA-1583 URL: https://issues.apache.org/jira/browse/KAFKA-1583 Project: Kafka Issue Type: Bug Reporter: Guozhang Wang Assignee: Guozhang Wang Fix For: 0.9.0 Attachments: KAFKA-1583.patch, KAFKA-1583_2014-08-20_13:54:38.patch, KAFKA-1583_2014-08-21_11:30:34.patch, KAFKA-1583_2014-08-27_09:44:50.patch, KAFKA-1583_2014-09-01_18:07:42.patch, KAFKA-1583_2014-09-02_13:37:47.patch, KAFKA-1583_2014-09-05_14:08:36.patch, KAFKA-1583_2014-09-05_14:55:38.patch, KAFKA-1583_2014-10-13_19:41:58.patch, KAFKA-1583_2014-10-16_21:15:40.patch, KAFKA-1583_2014-10-17_09:56:33.patch, KAFKA-1583_2014-10-22_18:52:52.patch, KAFKA-1583_2014-10-28_15:09:30.patch This is the next step of KAFKA-1430. Details can be found at this page: https://cwiki.apache.org/confluence/display/KAFKA/Kafka+API+Refactoring -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-1583) Kafka API Refactoring
[ https://issues.apache.org/jira/browse/KAFKA-1583?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14187571#comment-14187571 ] Guozhang Wang commented on KAFKA-1583: -- Updated reviewboard https://reviews.apache.org/r/24676/diff/ against branch origin/trunk
[jira] [Commented] (KAFKA-1736) Improve partition-broker assignment strategy for better availability in majority durability modes
[ https://issues.apache.org/jira/browse/KAFKA-1736?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14187701#comment-14187701 ] Jay Kreps commented on KAFKA-1736: --

Yes, this is correct. This is a common tradeoff in this type of partitioned system. The same math applies to availability (if you lose more nodes than min.isr allows) and to data loss (if you lose as many nodes as the replication factor). It actually also applies to multi-tenancy problems, e.g. if you have a crazy producer overloading one topic, how many other topics are impacted?

If you do random placement, then any time you lose 3 nodes you will likely have data loss in at least one partition. However, identical node-to-node replication is no panacea either. If you have identical replicas, then when you lose 3 nodes you will probably lose no partitions; but when you do lose a partition, you will lose all partitions on that machine. I believe, though I haven't done the math, that the expected total data loss is the same either way, but in one mode the probability of some data loss is high and the probability of large-scale loss low; in the other extreme the probability of some data loss is low, but the probability of total loss comparatively high.

Another problem with your proposal is that replication is set per-topic, so the placement you describe is only possible if all topics have the same replication factor. However, these two extremes are not the only options. A generalization of the extremes would be to divide the N machines in the cluster arbitrarily into C clumps and attempt to place the partitions for a given topic entirely in a single clump. If C=1 you get the current strategy; if C=N/3 you get your strategy for replication factor 3. However, any C in between should be possible, giving a balance between these extremes, and this doesn't depend on a single replication factor across the cluster.
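Jay's clump generalization can be sketched as follows. This is a hypothetical illustration, not Kafka's actual AdminUtils assignment code; the function and its topic-dict input are invented for the example, and it assumes the broker count is divisible by the clump count with each clump holding at least replication-factor brokers:

```python
import random

def assign_with_clumps(num_brokers, num_clumps, topics, replication_factor):
    """Divide brokers into C clumps; place all replicas of a topic's
    partitions within a single clump.

    C = 1 reproduces cluster-wide random placement (the current strategy);
    C = N / RF gives identical-replica broker groups; intermediate C
    trades off between the two extremes.

    topics: dict mapping topic name -> partition count (hypothetical input).
    """
    size = num_brokers // num_clumps
    clumps = [list(range(i * size, (i + 1) * size)) for i in range(num_clumps)]
    assignment = {}
    # Round-robin topics over clumps so load stays balanced.
    for i, (topic, parts) in enumerate(sorted(topics.items())):
        clump = clumps[i % num_clumps]
        for p in range(parts):
            # Pick RF distinct brokers for this partition, all in one clump.
            assignment[(topic, p)] = random.sample(clump, replication_factor)
    return assignment
```

With 9 brokers, C=3, and RF=3, each clump has exactly 3 brokers, so every replica set is a whole clump (the "broker-replica" extreme); with C=1, replicas are drawn from all 9 brokers, as the current assigner does.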
Improve partition-broker assignment strategy for better availability in majority durability modes
---
Key: KAFKA-1736
URL: https://issues.apache.org/jira/browse/KAFKA-1736
Project: Kafka
Issue Type: Improvement
Affects Versions: 0.8.1.1
Reporter: Kyle Banker
Priority: Minor

The current random strategy of partition-to-broker distribution, combined with a fairly typical use of min.isr and request.acks, results in a suboptimal level of availability. Specifically, if all of your topics have a replication factor of 3, and you use min.isr=2 and required.acks=all, then regardless of the number of brokers in the cluster, you can safely lose only 1 node. Losing more than 1 node will, 95% of the time, result in the inability to write to at least one partition, thus rendering the cluster unavailable. As the total number of partitions increases, so does this probability.

On the other hand, if partitions are distributed so that brokers are effectively replicas of each other, then the probability of unavailability when two nodes are lost is significantly decreased. This probability continues to decrease as the size of the cluster increases and, more significantly, this probability is constant with respect to the total number of partitions. The only requirement for getting these numbers with this strategy is that the number of brokers be a multiple of the replication factor.
Here are the results of some simulations I've run:

With Random Partition Assignment

Brokers / Partitions / Replication Factor / P(two randomly selected nodes contain at least 1 of the same partitions)
6 / 54 / 3 / .999
9 / 54 / 3 / .986
12 / 54 / 3 / .894

With Broker-Replica-Style Partitioning

Brokers / Partitions / Replication Factor / P(two randomly selected nodes contain at least 1 of the same partitions)
6 / 54 / 3 / .424
9 / 54 / 3 / .228
12 / 54 / 3 / .168

Adopting this strategy will greatly increase availability for users wanting majority-style durability and should not change current behavior as long as leader partitions are assigned evenly. I don't know of any negative impact for other use cases, as in these cases the distribution will still be effectively random. Let me know if you'd like to see simulation code and whether a patch would be welcome.

EDIT: Just to clarify, here's how the current partition assigner would assign 9 partitions with 3 replicas each to a 9-node cluster (partition number - set of replicas):

0 = Some(List(2, 3, 4))
1 = Some(List(3, 4, 5))
2 = Some(List(4, 5, 6))
3 = Some(List(5, 6, 7))
4 = Some(List(6, 7, 8))
5 = Some(List(7, 8, 9))
6 = Some(List(8, 9, 1))
7 = Some(List(9, 1, 2))
8 = Some(List(1, 2, 3))

Here's how I'm proposing they be assigned:

0 = Some(ArrayBuffer(8, 5, 2))
1 =
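A Monte Carlo simulation along the lines of the table above (my own rough reconstruction, not Kyle's original simulation code, so the exact estimates may differ slightly from the reported figures) could look like:

```python
import random

def p_share(num_brokers, num_partitions, rf, broker_replica, trials=2000):
    """Estimate P(two randomly chosen brokers host at least one common
    partition) under random vs. broker-replica-style placement."""
    hits = 0
    for _ in range(trials):
        hosted = {b: set() for b in range(num_brokers)}
        if broker_replica:
            # Brokers grouped into clumps of size rf; each partition's
            # replicas are exactly one clump.
            for p in range(num_partitions):
                clump = random.randrange(num_brokers // rf)
                for b in range(clump * rf, clump * rf + rf):
                    hosted[b].add(p)
        else:
            # Random placement: rf distinct brokers per partition.
            for p in range(num_partitions):
                for b in random.sample(range(num_brokers), rf):
                    hosted[b].add(p)
        x, y = random.sample(range(num_brokers), 2)
        if hosted[x] & hosted[y]:
            hits += 1
    return hits / trials

random.seed(0)
p_random = p_share(6, 54, 3, broker_replica=False)   # should come out near 1
p_grouped = p_share(6, 54, 3, broker_replica=True)   # markedly lower, near .4
print(p_random, p_grouped)
```

The intuition matches the table: with 54 random partitions on 6 brokers, almost every broker pair shares something, while with broker-replica placement two brokers share a partition only if they land in the same clump (about a 2-in-5 chance for 6 brokers in clumps of 3).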
[jira] [Commented] (KAFKA-1583) Kafka API Refactoring
[ https://issues.apache.org/jira/browse/KAFKA-1583?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14187960#comment-14187960 ] Jun Rao commented on KAFKA-1583: -- [~jjkoshy], you can commit the patch once you are done with the review.