[jira] [Created] (KAFKA-1737) Document required ZkSerializer for ZkClient used with AdminUtils
Stevo Slavic created KAFKA-1737:
Summary: Document required ZkSerializer for ZkClient used with AdminUtils
Key: KAFKA-1737
URL: https://issues.apache.org/jira/browse/KAFKA-1737
Project: Kafka
Issue Type: Improvement
Components: tools
Affects Versions: 0.8.1.1
Reporter: Stevo Slavic
Priority: Minor

{{ZkClient}} instances passed to {{AdminUtils}} calls must have {{kafka.utils.ZKStringSerializer}} set as their {{ZkSerializer}}. Otherwise commands executed via {{AdminUtils}} may not be recognized by brokers, producers, or consumers. For example, a producer (with auto topic creation turned off) will not be able to send messages to a topic created via {{AdminUtils}}; it will throw {{UnknownTopicOrPartitionException}}. Please consider at least documenting this requirement in the {{AdminUtils}} scaladoc. For more info see [related discussion on Kafka user mailing list|http://mail-archives.apache.org/mod_mbox/kafka-users/201410.mbox/%3CCAAUywg-oihNiXuQRYeS%3D8Z3ymsmEHo6ghLs%3Dru4nbm%2BdHVz6TA%40mail.gmail.com%3E].

-- This message was sent by Atlassian JIRA (v6.3.4#6332)
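As context for the request: {{kafka.utils.ZKStringSerializer}} simply reads and writes znode data as plain UTF-8 strings, which is the format brokers, producers, and consumers expect to find in ZooKeeper. A minimal sketch of an equivalent serializer follows (hypothetical class name; real code would implement {{org.I0Itec.zkclient.serialize.ZkSerializer}}, or simply pass {{kafka.utils.ZKStringSerializer}} when constructing the {{ZkClient}}):

```java
import java.nio.charset.StandardCharsets;

// Sketch of what ZKStringSerializer does: znode payloads are raw UTF-8 bytes.
// ZkClient's default serializer instead writes Java-serialized objects, which
// is why topics created through AdminUtils with a default-configured ZkClient
// end up unreadable to the rest of the cluster.
class Utf8StringSerializer {
    byte[] serialize(Object data) {
        return data == null ? null : ((String) data).getBytes(StandardCharsets.UTF_8);
    }

    Object deserialize(byte[] bytes) {
        return bytes == null ? null : new String(bytes, StandardCharsets.UTF_8);
    }
}
```

In Scala the fix is a one-liner at construction time, e.g. new ZkClient(zkConnect, 30000, 30000, ZKStringSerializer), rather than the two-argument constructor that falls back to Java serialization.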
[jira] [Commented] (KAFKA-1481) Stop using dashes AND underscores as separators in MBean names
[ https://issues.apache.org/jira/browse/KAFKA-1481?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14188553#comment-14188553 ] Vladimir Tretyakov commented on KAFKA-1481:

Hi Jun, thx for the juicy feedback again, one question:
{quote}
31. AbstractFetcherThread:
31.1 You changed the meaning of clientId. clientId is used in the fetch request and we want to leave it as just the clientId string. Since the clientId should be uniquely representing a particular consumer client, we just need to include the clientId in the metric name. We don't need to include the consumer id in either the fetch request or the metric name since it's too long and has redundant info.
{quote}
I didn't change the meaning of clientId here; look (all code without my changes). consumerIdString is:
{code}
val consumerIdString = {
  var consumerUuid: String = null
  config.consumerId match {
    case Some(consumerId) // for testing only
    => consumerUuid = consumerId
    case None // generate unique consumerId automatically
    => val uuid = UUID.randomUUID()
       consumerUuid = "%s-%d-%s".format(
         InetAddress.getLocalHost.getHostName, System.currentTimeMillis,
         uuid.getMostSignificantBits().toHexString.substring(0, 8))
  }
  config.groupId + "_" + consumerUuid
}
{code}
The thread name is consumerIdString + fetcherId + sourceBroker.id:
{code}
override def createFetcherThread(fetcherId: Int, sourceBroker: Broker): AbstractFetcherThread = {
  new ConsumerFetcherThread(
    "ConsumerFetcherThread-%s-%d-%d".format(consumerIdString, fetcherId, sourceBroker.id),
    config, sourceBroker, partitionMap, this)
}
{code}
So the clientId inside AbstractFetcherThread is config.clientId + consumerIdString + fetcherId + sourceBroker.id:
{code}
class ConsumerFetcherThread(name: String,
                            val config: ConsumerConfig,
                            sourceBroker: Broker,
                            partitionMap: Map[TopicAndPartition, PartitionTopicInfo],
                            val consumerFetcherManager: ConsumerFetcherManager)
        extends AbstractFetcherThread(name = name,
                                      clientId = config.clientId + "-" + name,
                                      sourceBroker = sourceBroker,
                                      socketTimeout = config.socketTimeoutMs,
                                      socketBufferSize = config.socketReceiveBufferBytes,
                                      fetchSize = config.fetchMessageMaxBytes,
                                      fetcherBrokerId = Request.OrdinaryConsumerId,
                                      maxWait = config.fetchWaitMaxMs,
                                      minBytes = config.fetchMinBytes,
                                      isInterruptible = true) {
{code}
As you can see, there is no clean clientId inside the AbstractFetcherThread class, and this is not a unique situation; this is the main reason I added Taggable. Now I am trying to remove Taggable, but I have no idea what to do with cases like the ones I've described. Can I add a new 'clientId' parameter to all these classes and use only this new/clean clientId as part of the metric name? Any other suggestions? Thx.

Stop using dashes AND underscores as separators in MBean names
Key: KAFKA-1481
URL: https://issues.apache.org/jira/browse/KAFKA-1481
Project: Kafka
Issue Type: Bug
Components: core
Affects Versions: 0.8.1.1
Reporter: Otis Gospodnetic
Priority: Critical
Labels: patch
Fix For: 0.8.3
Attachments: KAFKA-1481_2014-06-06_13-06-35.patch, KAFKA-1481_2014-10-13_18-23-35.patch, KAFKA-1481_2014-10-14_21-53-35.patch, KAFKA-1481_2014-10-15_10-23-35.patch, KAFKA-1481_2014-10-20_23-14-35.patch, KAFKA-1481_2014-10-21_09-14-35.patch, KAFKA-1481_2014-10-24_14-14-35.patch.patch, KAFKA-1481_IDEA_IDE_2014-10-14_21-53-35.patch, KAFKA-1481_IDEA_IDE_2014-10-15_10-23-35.patch, KAFKA-1481_IDEA_IDE_2014-10-20_20-14-35.patch, KAFKA-1481_IDEA_IDE_2014-10-20_23-14-35.patch

MBeans should not use dashes or underscores as separators because these characters are allowed in hostnames, topics, group and consumer IDs, etc., and these are embedded in MBean names, making it impossible to parse out individual bits from the MBeans. Perhaps a pipe character should be used to avoid the conflict. This looks like a major blocker because it means nobody can write Kafka 0.8.x monitoring tools unless they are doing it for themselves AND do not use dashes AND do not use underscores.
See: http://search-hadoop.com/m/4TaT4lonIW
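For what it's worth, the separator ambiguity disappears entirely if metric names are built from JMX key properties rather than a single delimited string; javax.management.ObjectName supports this natively. The following is only an illustration of that alternative, not the approach taken by the attached patches:

```java
import java.util.Hashtable;
import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;

class MBeanNameExample {
    // Build an ObjectName from explicit key properties instead of a
    // "clientId-host-port"-style concatenation, so values that themselves
    // contain dashes or underscores remain unambiguously parseable.
    // Values are quoted defensively with ObjectName.quote.
    static ObjectName metricName(String clientId, String topic)
            throws MalformedObjectNameException {
        Hashtable<String, String> props = new Hashtable<>();
        props.put("type", "ConsumerTopicMetrics");
        props.put("clientId", ObjectName.quote(clientId));
        props.put("topic", ObjectName.quote(topic));
        return new ObjectName("kafka.consumer", props);
    }
}
```

A monitoring tool can then recover each field with getKeyProperty plus ObjectName.unquote instead of guessing where a dash-separated hostname ends and a clientId begins.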
[jira] [Commented] (KAFKA-1736) Improve partition-broker assignment strategy for better availability in majority durability modes
[ https://issues.apache.org/jira/browse/KAFKA-1736?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14188566#comment-14188566 ] Kyle Banker commented on KAFKA-1736:

Clearly, it's important to think about how a partitioning scheme affects both data loss and availability. In theory, there's an even tradeoff between the two.

Let's start with data loss. If I set up a topic with a replication factor of 3, then I'm implicitly accepting that 3 catastrophic node failures probably imply a loss of data. I agree with your intuition that the mathematically expected total data loss is probably the same with both partitioning strategies. Still, it should also be noted that for many use cases where durability matters, a partial loss of data is just as bad as a total loss, since a partial loss implies an inconsistent data set. Say we have a 9-node cluster with 3-factor replication. With broker-level replication, losing the wrong 3 nodes means losing 1/3 of the data. With random-partition-level replication, losing any 3 nodes may mean losing 1/9 of the data. But again, both cases will be equally catastrophic for certain applications. Overall, I believe one should be less concerned with the data loss case because an unrecoverable crash (i.e., total disk failure) on three separate machines at the same time is exceedingly rare. Additionally, data loss can always be mitigated by increasing the replication factor and using an appropriate backup strategy.

This is why I'm more concerned with availability. Clearly, network partitions, machine reboots, etc., are much more common than total disk failures. As I mentioned originally, the unfortunate and counter-intuitive problem with random-partition-level replication is that increasing the number of brokers does not increase overall availability. Once the number of partitions reaches a certain threshold, losing more than one node in the cluster renders it unavailable.

I concede that all of my topics need to have the same replication factor in order to get the ideal placement. But imagine a scenario where I have two types of topics: those that require the greatest consistency and those that don't. I don't see how the latter topics, having, say, a replication factor of 1, would be any worse off in this scheme. The first topic would get the benefits of the availability provided by broker-level replication, and the second topic would be effectively randomly distributed.

I like your proposal for a clumping strategy, and I believe we should pursue that further. While adding yet another configuration parameter (i.e., C in your example) isn't desirable, having the greater level of availability it will provide is. Eager and willing to help with this if other folks like the idea and want to refine it.
[jira] [Comment Edited] (KAFKA-1481) Stop using dashes AND underscores as separators in MBean names
[ https://issues.apache.org/jira/browse/KAFKA-1481?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14188553#comment-14188553 ] Vladimir Tretyakov edited comment on KAFKA-1481 at 10/29/14 4:49 PM, appending the following to the comment above:

PS: there is no way to get 'clean' parameters in such classes and build the map for metrics right there; I have to thread the parameters through all the classes, as with Taggable.

PSS: Can we use a more interactive way for communication? Jira is not the best way for discussion/explanation, not fast at least :)
[jira] [Commented] (KAFKA-1583) Kafka API Refactoring
[ https://issues.apache.org/jira/browse/KAFKA-1583?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14188577#comment-14188577 ] Joel Koshy commented on KAFKA-1583: --- Ok cool - I will re-review the latest patch and should be able to get that checked in today. Kafka API Refactoring - Key: KAFKA-1583 URL: https://issues.apache.org/jira/browse/KAFKA-1583 Project: Kafka Issue Type: Bug Reporter: Guozhang Wang Assignee: Guozhang Wang Fix For: 0.9.0 Attachments: KAFKA-1583.patch, KAFKA-1583_2014-08-20_13:54:38.patch, KAFKA-1583_2014-08-21_11:30:34.patch, KAFKA-1583_2014-08-27_09:44:50.patch, KAFKA-1583_2014-09-01_18:07:42.patch, KAFKA-1583_2014-09-02_13:37:47.patch, KAFKA-1583_2014-09-05_14:08:36.patch, KAFKA-1583_2014-09-05_14:55:38.patch, KAFKA-1583_2014-10-13_19:41:58.patch, KAFKA-1583_2014-10-16_21:15:40.patch, KAFKA-1583_2014-10-17_09:56:33.patch, KAFKA-1583_2014-10-22_18:52:52.patch, KAFKA-1583_2014-10-28_15:09:30.patch This is the next step of KAFKA-1430. Details can be found at this page: https://cwiki.apache.org/confluence/display/KAFKA/Kafka+API+Refactoring -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-1736) Improve partition-broker assignment strategy for better availability in majority durability modes
[ https://issues.apache.org/jira/browse/KAFKA-1736?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14188581#comment-14188581 ] Gwen Shapira commented on KAFKA-1736:

My main concern with moving to a more complex approach to replica placement is that we also need to consider rack-awareness requirements, i.e., users may know about dependencies between nodes and want their replica placement to reflect that (so we can survive a reboot of an entire rack with no loss of data or availability). Supporting both clumps and racks seems too complicated for users to be able to figure out where their data is.

Improve partition-broker assignment strategy for better availability in majority durability modes
Key: KAFKA-1736
URL: https://issues.apache.org/jira/browse/KAFKA-1736
Project: Kafka
Issue Type: Improvement
Affects Versions: 0.8.1.1
Reporter: Kyle Banker
Priority: Minor

The current random strategy of partition-to-broker distribution combined with a fairly typical use of min.isr and request.acks results in a suboptimal level of availability. Specifically, if all of your topics have a replication factor of 3, and you use min.isr=2 and required.acks=all, then regardless of the number of brokers in the cluster, you can safely lose only 1 node. Losing more than 1 node will, 95% of the time, result in the inability to write to at least one partition, thus rendering the cluster unavailable. As the total number of partitions increases, so does this probability. On the other hand, if partitions are distributed so that brokers are effectively replicas of each other, then the probability of unavailability when two nodes are lost is significantly decreased. This probability continues to decrease as the size of the cluster increases and, more significantly, this probability is constant with respect to the total number of partitions. The only requirement for getting these numbers with this strategy is that the number of brokers be a multiple of the replication factor. Here are the results of some simulations I've run, showing the probability that two randomly selected nodes will contain at least 1 of the same partitions:

With Random Partition Assignment (Brokers / Partitions / Replication Factor / Probability):
6 / 54 / 3 / .999
9 / 54 / 3 / .986
12 / 54 / 3 / .894

Broker-Replica-Style Partitioning (Brokers / Partitions / Replication Factor / Probability):
6 / 54 / 3 / .424
9 / 54 / 3 / .228
12 / 54 / 3 / .168

Adopting this strategy will greatly increase availability for users wanting majority-style durability and should not change current behavior as long as leader partitions are assigned evenly. I don't know of any negative impact for other use cases, as in these cases the distribution will still be effectively random. Let me know if you'd like to see simulation code and whether a patch would be welcome.

EDIT: Just to clarify, here's how the current partition assigner would assign 9 partitions with 3 replicas each to a 9-node cluster (broker number - set of replicas):
0 = Some(List(2, 3, 4))
1 = Some(List(3, 4, 5))
2 = Some(List(4, 5, 6))
3 = Some(List(5, 6, 7))
4 = Some(List(6, 7, 8))
5 = Some(List(7, 8, 9))
6 = Some(List(8, 9, 1))
7 = Some(List(9, 1, 2))
8 = Some(List(1, 2, 3))

Here's how I'm proposing they be assigned:
0 = Some(ArrayBuffer(8, 5, 2))
1 = Some(ArrayBuffer(8, 5, 2))
2 = Some(ArrayBuffer(8, 5, 2))
3 = Some(ArrayBuffer(7, 4, 1))
4 = Some(ArrayBuffer(7, 4, 1))
5 = Some(ArrayBuffer(7, 4, 1))
6 = Some(ArrayBuffer(6, 3, 0))
7 = Some(ArrayBuffer(6, 3, 0))
8 = Some(ArrayBuffer(6, 3, 0))
Apache Kafka Consumers in Java 8
Hi All, can you please share your experiences of running Kafka consumers/producers with Java 8? Thanks, Balaji
[jira] [Commented] (KAFKA-1736) Improve partition-broker assignment strategy for better availability in majority durability modes
[ https://issues.apache.org/jira/browse/KAFKA-1736?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14188593#comment-14188593 ] Kyle Banker commented on KAFKA-1736:

[~gwenshap] For users trying to mitigate rack failures, I believe it's actually easier to reason about those failures with broker-level replica placement. For example, a 9-node cluster spread across three racks, with 3-factor replication and broker-level replica placement, means that I can lose an entire rack and still be available. With random placement, no such guarantee can be made. Again, we're assuming the min.isr=2, replication-factor=3 case here.
[jira] [Commented] (KAFKA-1736) Improve partition-broker assignment strategy for better availability in majority durability modes
[ https://issues.apache.org/jira/browse/KAFKA-1736?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14188628#comment-14188628 ] Kyle Banker commented on KAFKA-1736:

I'd like to propose one more idea: if we don't want to add another knob to Kafka, why not just modify the partition assignment algorithm so that, when the number of brokers is a multiple of the replication factor, you get broker-level replication? When it's not, the arrangement will still be basically random, as it is now. I don't see how that wouldn't be a better default.
[jira] [Updated] (KAFKA-1736) Improve partition-broker assignment strategy for better availability in majority durability modes
[ https://issues.apache.org/jira/browse/KAFKA-1736?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kyle Banker updated KAFKA-1736:
Attachment: Partitioner.scala

I've attached a basic implementation, along with the original, to make it easy to compare the output.
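The attached Partitioner.scala is not reproduced in this archive. Purely to illustrate the idea (a hypothetical re-sketch, not the actual attachment), a clump-based assigner could look like the following; the ordering of clumps differs from the sample output quoted earlier in the ticket, but the structure, disjoint replica sets each shared by a block of partitions, is the same:

```java
import java.util.ArrayList;
import java.util.List;

class ClumpAssigner {
    // Divide `brokers` into clumps of size `rf` (requires brokers % rf == 0)
    // and give every partition in the same block of partitions the same
    // replica set, so whole clumps act as replicas of each other.
    static List<List<Integer>> assign(int brokers, int partitions, int rf) {
        if (brokers % rf != 0)
            throw new IllegalArgumentException("brokers must be a multiple of rf");
        int clumps = brokers / rf;
        List<List<Integer>> result = new ArrayList<>();
        for (int p = 0; p < partitions; p++) {
            int clump = (p * clumps) / partitions;   // spread partitions evenly over clumps
            List<Integer> replicas = new ArrayList<>();
            for (int r = 0; r < rf; r++)
                replicas.add(clump + r * clumps);    // clump c holds brokers {c, c+clumps, ...}
            result.add(replicas);
        }
        return result;
    }
}
```

With 9 brokers, 9 partitions, and rf=3 this yields three identical replica sets per clump ([0, 3, 6], [1, 4, 7], [2, 5, 8]), the broker-level grouping the ticket proposes.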
[jira] [Commented] (KAFKA-1736) Improve partition-broker assignment strategy for better availability in majority durability modes
[ https://issues.apache.org/jira/browse/KAFKA-1736?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14188637#comment-14188637 ] Gwen Shapira commented on KAFKA-1736: - Agree. What I meant is - we want to add rack-aware replica assignment in the very near future. We need to think about how those features will play together. How about leaving the Kafka server alone for now, and implementing these policies as usability features in the partition assignment tool? This will allow users to experiment with different policies without having to change core functionality, and will allow us to figure out what makes sense for our users.

Improve partition-broker assignment strategy for better availability in majority durability modes
---
Key: KAFKA-1736 URL: https://issues.apache.org/jira/browse/KAFKA-1736 Project: Kafka Issue Type: Improvement Affects Versions: 0.8.1.1 Reporter: Kyle Banker Priority: Minor Attachments: Partitioner.scala

The current random strategy of partition-to-broker distribution combined with a fairly typical use of min.isr and request.acks results in a suboptimal level of availability. Specifically, if all of your topics have a replication factor of 3, and you use min.isr=2 and required.acks=all, then regardless of the number of brokers in the cluster, you can safely lose only 1 node. Losing more than 1 node will, 95% of the time, result in the inability to write to at least one partition, thus rendering the cluster unavailable. As the total number of partitions increases, so does this probability.

On the other hand, if partitions are distributed so that brokers are effectively replicas of each other, then the probability of unavailability when two nodes are lost is significantly decreased. This probability continues to decrease as the size of the cluster increases and, more significantly, this probability is constant with respect to the total number of partitions. The only requirement for getting these numbers with this strategy is that the number of brokers be a multiple of the replication factor.

Here are the results of some simulations I've run (Number of Brokers / Number of Partitions / Replication Factor / Probability that two randomly selected nodes will contain at least 1 of the same partitions):

With Random Partition Assignment
6 / 54 / 3 / .999
9 / 54 / 3 / .986
12 / 54 / 3 / .894

Broker-Replica-Style Partitioning
6 / 54 / 3 / .424
9 / 54 / 3 / .228
12 / 54 / 3 / .168

Adopting this strategy will greatly increase availability for users wanting majority-style durability and should not change current behavior as long as leader partitions are assigned evenly. I don't know of any negative impact for other use cases, as in these cases the distribution will still be effectively random. Let me know if you'd like to see simulation code and whether a patch would be welcome.

EDIT: Just to clarify, here's how the current partition assigner would assign 9 partitions with 3 replicas each to a 9-node cluster (partition number -> set of replicas):

0 -> Some(List(2, 3, 4))
1 -> Some(List(3, 4, 5))
2 -> Some(List(4, 5, 6))
3 -> Some(List(5, 6, 7))
4 -> Some(List(6, 7, 8))
5 -> Some(List(7, 8, 9))
6 -> Some(List(8, 9, 1))
7 -> Some(List(9, 1, 2))
8 -> Some(List(1, 2, 3))

Here's how I'm proposing they be assigned:

0 -> Some(ArrayBuffer(8, 5, 2))
1 -> Some(ArrayBuffer(8, 5, 2))
2 -> Some(ArrayBuffer(8, 5, 2))
3 -> Some(ArrayBuffer(7, 4, 1))
4 -> Some(ArrayBuffer(7, 4, 1))
5 -> Some(ArrayBuffer(7, 4, 1))
6 -> Some(ArrayBuffer(6, 3, 0))
7 -> Some(ArrayBuffer(6, 3, 0))
8 -> Some(ArrayBuffer(6, 3, 0))

-- This message was sent by Atlassian JIRA (v6.3.4#6332)
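For readers who want to check numbers like these, here is a rough Python sketch of such a simulation. The function names and the simplified consecutive placement are my own illustration, not the attached Partitioner.scala:

```python
import random

def consecutive_assignment(n_partitions, n_brokers, rf, rng):
    """Simplified stand-in for the current placement: each partition's
    replicas land on rf consecutive brokers from a random starting offset."""
    start = rng.randrange(n_brokers)
    return [[(start + p + r) % n_brokers for r in range(rf)]
            for p in range(n_partitions)]

def clumped_assignment(n_partitions, n_brokers, rf):
    """Broker-replica-style placement: brokers form clumps of size rf and
    every partition assigned to a clump uses the same replica set.
    Requires n_brokers to be a multiple of rf."""
    n_clumps = n_brokers // rf
    return [[(p % n_clumps) * rf + r for r in range(rf)]
            for p in range(n_partitions)]

def prob_two_brokers_share(assignment, n_brokers, trials, rng):
    """Estimate P(two randomly selected brokers host a common partition)."""
    hosted = [set() for _ in range(n_brokers)]
    for partition, replicas in enumerate(assignment):
        for broker in replicas:
            hosted[broker].add(partition)
    hits = 0
    for _ in range(trials):
        a, b = rng.sample(range(n_brokers), 2)
        if hosted[a] & hosted[b]:
            hits += 1
    return hits / trials
```

Under this sketch the clumped probability for 6 brokers / rf=3 can be computed exactly: two brokers share a partition only when they fall in the same clump, i.e. 6 of the 15 possible pairs, or 0.4, in the same ballpark as the .424 reported above.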
[jira] [Commented] (KAFKA-1736) Improve partition-broker assignment strategy for better availability in majority durability modes
[ https://issues.apache.org/jira/browse/KAFKA-1736?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14188648#comment-14188648 ] Kyle Banker commented on KAFKA-1736: [~gwenshap] I think that sounds pretty reasonable. But I still think it'd be useful to ponder whether there's not a better default.
[jira] [Commented] (KAFKA-1736) Improve partition-broker assignment strategy for better availability in majority durability modes
[ https://issues.apache.org/jira/browse/KAFKA-1736?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14188660#comment-14188660 ] Gwen Shapira commented on KAFKA-1736: - I'm all pro-pondering :) It sounded like you also wanted to implement a solution (which is almost as fun as pondering) - which is why I suggested the partition reassignment tool as a useful place to implement some placement schemes.
Re: Review Request 26755: Patch for KAFKA-1706
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/26755/ --- (Updated Oct. 29, 2014, 5:57 p.m.) Review request for kafka. Bugs: KAFKA-1706 https://issues.apache.org/jira/browse/KAFKA-1706 Repository: kafka Description (updated) --- Changed argument names; corrected typo. Incorporated Joel's comments. Also fixed negative queue size problem. Incorporated Joel's comments. Added unit test for ByteBoundedBlockingQueue. Fixed a bug regarding waiting time. Diffs (updated) - core/src/main/scala/kafka/utils/ByteBoundedBlockingQueue.scala PRE-CREATION core/src/test/scala/unit/kafka/utils/ByteBoundedBlockingQueueTest.scala PRE-CREATION Diff: https://reviews.apache.org/r/26755/diff/ Testing --- Thanks, Jiangjie Qin
[jira] [Updated] (KAFKA-1706) Adding a byte bounded blocking queue to util.
[ https://issues.apache.org/jira/browse/KAFKA-1706?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jiangjie Qin updated KAFKA-1706: Attachment: KAFKA-1706_2014-10-29_10:57:51.patch Adding a byte bounded blocking queue to util. - Key: KAFKA-1706 URL: https://issues.apache.org/jira/browse/KAFKA-1706 Project: Kafka Issue Type: Improvement Reporter: Jiangjie Qin Assignee: Jiangjie Qin Attachments: KAFKA-1706.patch, KAFKA-1706_2014-10-15_09:26:26.patch, KAFKA-1706_2014-10-15_09:28:01.patch, KAFKA-1706_2014-10-26_23:47:31.patch, KAFKA-1706_2014-10-26_23:50:07.patch, KAFKA-1706_2014-10-27_18:34:37.patch, KAFKA-1706_2014-10-29_10:57:51.patch We saw many out-of-memory issues in Mirror Maker. To enhance memory management we want to introduce a ByteBoundedBlockingQueue that has limits on both the number of messages and the number of bytes in it. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
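As a rough illustration of what a queue bounded by both element count and total bytes looks like, here is a minimal Python sketch. The class name mirrors the JIRA title, but the API is invented for illustration and is not the actual Kafka patch:

```python
import threading
from collections import deque

class ByteBoundedBlockingQueue:
    """Blocking queue bounded by both item count and total payload bytes."""

    def __init__(self, max_items, max_bytes, size_fn=len):
        self._max_items = max_items
        self._max_bytes = max_bytes
        self._size_fn = size_fn
        self._items = deque()
        self._bytes = 0
        self._cond = threading.Condition()

    def put(self, item):
        size = self._size_fn(item)
        with self._cond:
            # Block while either bound would be exceeded. Always admit an
            # item into an empty queue so an oversized item cannot deadlock.
            while self._items and (len(self._items) >= self._max_items
                                   or self._bytes + size > self._max_bytes):
                self._cond.wait()
            self._items.append(item)
            self._bytes += size
            self._cond.notify_all()

    def take(self):
        with self._cond:
            while not self._items:
                self._cond.wait()
            item = self._items.popleft()
            self._bytes -= self._size_fn(item)
            self._cond.notify_all()
            return item
```

The byte bound is the interesting part for the Mirror Maker case: a producer thread stalls once the buffered payload reaches `max_bytes`, even if the item count is still well under `max_items`.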
[jira] [Commented] (KAFKA-1706) Adding a byte bounded blocking queue to util.
[ https://issues.apache.org/jira/browse/KAFKA-1706?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14188685#comment-14188685 ] Jiangjie Qin commented on KAFKA-1706: - Updated reviewboard https://reviews.apache.org/r/26755/diff/ against branch origin/trunk
[jira] [Commented] (KAFKA-1735) MemoryRecords.Iterator needs to handle partial reads from compressed stream
[ https://issues.apache.org/jira/browse/KAFKA-1735?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14188799#comment-14188799 ] Guozhang Wang commented on KAFKA-1735: -- Updated reviewboard https://reviews.apache.org/r/27256/diff/ against branch origin/trunk MemoryRecords.Iterator needs to handle partial reads from compressed stream --- Key: KAFKA-1735 URL: https://issues.apache.org/jira/browse/KAFKA-1735 Project: Kafka Issue Type: Bug Reporter: Guozhang Wang Assignee: Guozhang Wang Fix For: 0.9.0 Attachments: KAFKA-1735.patch Found a bug in the MemoryRecords.Iterator implementation, where {code} stream.read(recordBuffer, 0, size) {code} can read fewer than {{size}} bytes, and the rest of the recordBuffer would be set to \0. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
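The general fix for this class of bug is to loop until the requested number of bytes has actually been read, since stream `read` calls are allowed to return short. A minimal sketch in Python (`read_fully` and `DribbleStream` are illustrative names, not Kafka code):

```python
import io

def read_fully(stream, size):
    """Read exactly `size` bytes from a stream whose read() may return short.

    The single-call pattern (like Java's stream.read(recordBuffer, 0, size))
    can return fewer bytes, leaving the rest of the buffer zero-filled.
    """
    buf = bytearray()
    while len(buf) < size:
        chunk = stream.read(size - len(buf))
        if not chunk:  # EOF before we got everything
            raise EOFError(f"expected {size} bytes, got {len(buf)}")
        buf.extend(chunk)
    return bytes(buf)

class DribbleStream:
    """Test stand-in that returns at most one byte per read(), the way a
    decompression stream may hand back partial chunks."""
    def __init__(self, data):
        self._data = io.BytesIO(data)
    def read(self, n):
        return self._data.read(min(1, n))
```

With `DribbleStream`, a single `read(5)` would return one byte, while `read_fully(stream, 5)` loops until all five arrive.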
RE: Apache Kafka Consumers in java 8
Has anybody used Kafka with Java 8? -Original Message- From: Seshadri, Balaji [mailto:balaji.sesha...@dish.com] Sent: Wednesday, October 29, 2014 11:11 AM To: 'dev@kafka.apache.org'; 'us...@kafka.apache.org' Subject: Apache Kafka Consumers in java 8 Hi All, Can you please share your experiences of running Kafka consumers/producers with Java 8? Thanks, Balaji
Compile failure going from kafka 0.8.1.1 to 0.8.2
My Scala project built against kafka 0.8.1.1 commits consumer offsets as follows:

connector.commitOffsets

This compiles without warnings. When I bumped the library dependency to 0.8.2-beta, the compiler started emitting this error:

[error] src/main/scala/com/whitepages/kafka/consumer/Connector.scala:21: missing arguments for method commitOffsets in trait ConsumerConnector;
[error] follow this method with `_' if you want to treat it as a partially applied function
[error] connector.commitOffsets
[error] ^
[error] one error found
[error] (compile:compile) Compilation failed

The following change resolved the error:

-connector.commitOffsets
+connector.commitOffsets()

Should we expect compilation-breaking changes moving from 0.8.1.1 to 0.8.2-beta? -- Jack Foy j...@whitepages.com
[jira] [Commented] (KAFKA-1737) Document required ZkSerializer for ZkClient used with AdminUtils
[ https://issues.apache.org/jira/browse/KAFKA-1737?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14188902#comment-14188902 ] Neha Narkhede commented on KAFKA-1737: -- Since AdminUtils is intended to be user facing, at least until we have admin APIs in Kafka, a more usable fix is to set it to the right ZK serializer instead of relying on the user to do it. This should be a fairly trivial patch. [~sslavic], would you like to submit a patch?
Re: Compile failure going from kafka 0.8.1.1 to 0.8.2
Which version of Scala did you use for Kafka 0.8.2-beta? On Wed, Oct 29, 2014 at 12:40 PM, Jack Foy j...@whitepages.com wrote: My Scala project built against kafka 0.8.1.1 commits consumer offsets as follows:

connector.commitOffsets

This compiles without warnings. When I bumped the library dependency to 0.8.2-beta, the compiler started emitting this error:

[error] src/main/scala/com/whitepages/kafka/consumer/Connector.scala:21: missing arguments for method commitOffsets in trait ConsumerConnector;
[error] follow this method with `_' if you want to treat it as a partially applied function
[error] connector.commitOffsets
[error] ^
[error] one error found
[error] (compile:compile) Compilation failed

The following change resolved the error:

-connector.commitOffsets
+connector.commitOffsets()

Should we expect compilation-breaking changes moving from 0.8.1.1 to 0.8.2-beta? -- Jack Foy j...@whitepages.com
[jira] [Updated] (KAFKA-1501) transient unit tests failures due to port already in use
[ https://issues.apache.org/jira/browse/KAFKA-1501?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ewen Cheslack-Postava updated KAFKA-1501: - Attachment: KAFKA-1501-choosePorts.patch

Did anyone actually verify that a port is getting into TIME_WAIT, or was that just a hunch? It actually seems unlikely, since the socket was returned by choosePorts and there's no threading that would allow the socket to still be bound. And if it was a socket previously used for accept(), the only way it should end up in TIME_WAIT is if there was an outstanding connection request that hadn't been handled when the socket was closed.

I think a much simpler explanation is that a port is being allocated twice within each test. I suspect you're seeing these errors on ZooKeeperTestHarness tests because it uses a single port that is allocated in the TestZKUtils object -- that port is used for *all* tests. This means that there are plenty of times when that port is not bound (before a test has started) and choosePort() or choosePorts() is called (during test class instantiation), which could then return that same port and cause a conflict. Unfortunately, I am not able to reproduce this issue so I can't verify that. If someone else wants to try to verify, just logging the values returned by choosePort and the value of TestZKUtils.zookeeperConnect would make this issue easy to track down in a log.

What we really need is to make sure that tests use a single call to choosePorts() to allocate *all* the ports they'll need. The attached patch should do this. It's obviously possible to call choosePorts() twice, but I've tried to discourage it. The choosePort() variant is removed and a warning is added to the choosePorts() documentation. It uses a new base class, NetworkTestHarness, for all tests that need to coordinate multiple ports (i.e., anything that uses ZookeeperTestHarness, since at that point both ZookeeperTestHarness and the test class will probably need to call choosePorts()). Because of the way KafkaServerTestHarness works, I made them all get allocated at initialization (so configs for KafkaServerTestHarness can still be generated at test class instantiation). You have to know how many to allocate up front, but by default it allocates 5 so that all the current tests don't need to override anything.

[~copester] - can you test out this patch since you can reliably reproduce the issue? And can you give an idea of the type of hardware you're able to reproduce it on, since you mentioned it seems common on beefier hardware?

transient unit tests failures due to port already in use
Key: KAFKA-1501 URL: https://issues.apache.org/jira/browse/KAFKA-1501 Project: Kafka Issue Type: Improvement Components: core Reporter: Jun Rao Assignee: Guozhang Wang Labels: newbie Attachments: KAFKA-1501-choosePorts.patch, KAFKA-1501.patch, KAFKA-1501.patch

Saw the following transient failures.

kafka.api.ProducerFailureHandlingTest testTooLargeRecordWithAckOne FAILED
kafka.common.KafkaException: Socket server failed to bind to localhost:59909: Address already in use.
at kafka.network.Acceptor.openServerSocket(SocketServer.scala:195)
at kafka.network.Acceptor.init(SocketServer.scala:141)
at kafka.network.SocketServer.startup(SocketServer.scala:68)
at kafka.server.KafkaServer.startup(KafkaServer.scala:95)
at kafka.utils.TestUtils$.createServer(TestUtils.scala:123)
at kafka.api.ProducerFailureHandlingTest.setUp(ProducerFailureHandlingTest.scala:68)

-- This message was sent by Atlassian JIRA (v6.3.4#6332)
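The single-call allocation idea can be sketched as follows. This is an illustrative Python analogue, not the actual choosePorts in kafka.utils.TestUtils:

```python
import socket

def choose_ports(count):
    """Allocate `count` distinct free ports by binding them all at once.

    Holding every socket open until all are bound means the OS cannot hand
    the same port out twice within the call -- the failure mode attributed
    above to repeated choosePort() calls. The ports are released before
    returning, so callers should use them promptly.
    """
    sockets = []
    try:
        for _ in range(count):
            s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            s.bind(("127.0.0.1", 0))  # port 0: let the OS pick a free port
            sockets.append(s)
        return [s.getsockname()[1] for s in sockets]
    finally:
        for s in sockets:
            s.close()
```

There is still an inherent race between releasing the ports and the test binding them again, but within a single call no two returned ports can collide.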
[jira] [Commented] (KAFKA-1736) Improve partition-broker assignment strategy for better availability in majority durability modes
[ https://issues.apache.org/jira/browse/KAFKA-1736?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14189041#comment-14189041 ] Neha Narkhede commented on KAFKA-1736: -- [~kbanker] Your idea of introducing some sort of clumping is interesting and, as [~gwenshap] suggested, if the clumps were racks, it might work really well. Since, with a failed rack, you would prevent loss of availability, as 2 replicas of a partition wouldn't share the same clump or rack. I also think, though, that we might have to experiment with varying clump sizes, instead of just 3, so we end up supporting topics with varying replication factors easily. And I wouldn't be opposed to allowing the capability to add these replica assignment schemes as an option. Though before doing that, one thing to consider is the impact on leader placement, since the current strategy, in addition to just assigning replicas to brokers, also allows for evenly spreading the leaders on the brokers in the cluster. It staggers the partition replicas on brokers such that if leaders live on the preferred replica, you end up with an even leader placement that in turn helps load balancing. If we were to add pluggable replica placement strategies, we might have to expose it in the topics tool in addition to the reassignment tool. FYI - We have patches for rack aware replica placement (KAFKA-1215, KAFKA-1357)
Re: Compile failure going from kafka 0.8.1.1 to 0.8.2
On Oct 29, 2014, at 1:46 PM, Neha Narkhede neha.narkh...@gmail.com wrote: Which version of Scala did you use for Kafka 0.8.2-beta? The problem reproduces under both Scala 2.10.4 and Scala 2.11.2. -- Jack Foy j...@whitepages.com
Re: Compile failure going from kafka 0.8.1.1 to 0.8.2
On Oct 29, 2014, at 3:03 PM, Jack Foy j...@whitepages.com wrote: On Oct 29, 2014, at 1:46 PM, Neha Narkhede neha.narkh...@gmail.com wrote: Which version of Scala did you use for Kafka 0.8.2-beta? The problem reproduces under both Scala 2.10.4 and Scala 2.11.2. Sorry, to be clearer: I am using SBT with the following dependency line, to pull in the specific package for the version of Scala I'm running.

libraryDependencies ++= Seq(
  "org.apache.kafka" %% "kafka" % "0.8.2-beta"
    exclude("javax.jms", "jms")
    exclude("com.sun.jdmk", "jmxtools")
    exclude("com.sun.jmx", "jmxri")
    exclude("log4j", "log4j")
)

I tried the following combinations:

Scala 2.10.4, kafka 0.8.1.1, my old code -> compiled
Scala 2.10.4, kafka 0.8.2, my old code -> failed
Scala 2.11.2, kafka 0.8.2, my old code -> failed
Scala 2.10.4, kafka 0.8.2, my new code -> compiled
Scala 2.11.2, kafka 0.8.2, my new code -> compiled

-- Jack Foy j...@whitepages.com
[jira] [Commented] (KAFKA-1634) Improve semantics of timestamp in OffsetCommitRequests and update documentation
[ https://issues.apache.org/jira/browse/KAFKA-1634?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14189231#comment-14189231 ] Guozhang Wang commented on KAFKA-1634: -- While working on this: would the retention time be better scaled in seconds than in milliseconds? Do we have scenarios where people want to retain their offsets for just milliseconds? If not, then we may end up always having a very large number for this field. Improve semantics of timestamp in OffsetCommitRequests and update documentation --- Key: KAFKA-1634 URL: https://issues.apache.org/jira/browse/KAFKA-1634 Project: Kafka Issue Type: Bug Reporter: Neha Narkhede Assignee: Guozhang Wang Priority: Blocker Fix For: 0.8.3 From the mailing list - following up on this -- I think the online API docs for OffsetCommitRequest still incorrectly refer to client-side timestamps: https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-OffsetCommitRequest Wasn't that removed and now always handled server-side? Would one of the devs mind updating the API spec wiki? -- This message was sent by Atlassian JIRA (v6.3.4#6332)
Re: Compile failure going from kafka 0.8.1.1 to 0.8.2
Scala 2.10.4, kafka 0.8.1.1, my old code -> compiled Scala 2.10.4, kafka 0.8.2, my old code -> failed If that's the case, then we probably need to fix the issue. Can you please file a JIRA? On Wed, Oct 29, 2014 at 3:15 PM, Jack Foy j...@whitepages.com wrote: On Oct 29, 2014, at 3:03 PM, Jack Foy j...@whitepages.com wrote: On Oct 29, 2014, at 1:46 PM, Neha Narkhede neha.narkh...@gmail.com wrote: Which version of Scala did you use for Kafka 0.8.2-beta? The problem reproduces under both Scala 2.10.4 and Scala 2.11.2. Sorry, to be clearer: I am using SBT with the following dependency line, to pull in the specific package for the version of Scala I'm running.

libraryDependencies ++= Seq(
  "org.apache.kafka" %% "kafka" % "0.8.2-beta"
    exclude("javax.jms", "jms")
    exclude("com.sun.jdmk", "jmxtools")
    exclude("com.sun.jmx", "jmxri")
    exclude("log4j", "log4j")
)

I tried the following combinations:

Scala 2.10.4, kafka 0.8.1.1, my old code -> compiled
Scala 2.10.4, kafka 0.8.2, my old code -> failed
Scala 2.11.2, kafka 0.8.2, my old code -> failed
Scala 2.10.4, kafka 0.8.2, my new code -> compiled
Scala 2.11.2, kafka 0.8.2, my new code -> compiled

-- Jack Foy j...@whitepages.com
[jira] [Commented] (KAFKA-1736) Improve partition-broker assignment strategy for better availability in majority durability modes
[ https://issues.apache.org/jira/browse/KAFKA-1736?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14189451#comment-14189451 ] Jay Kreps commented on KAFKA-1736: -- [~kbanker]:

1. I do think this is a real problem and it would be nice to address; there are some complexities, though.

2. Another aspect I didn't mention before is load balancing in the case of failure. The leader does a bit more work than the slaves, as it serves all the consumer reads as well as the reads for slaves. Reads are cheaper than writes but not free. In the current setup, if a server fails, leadership is randomly dispersed and should spread fairly evenly across the cluster. So in a large cluster each server will add a very small percentage of additional load due to additional leadership responsibility. However, in the C=3 case you described, when a server fails, 50% of its leadership load will fall to each of the other two machines in that clump. This will be a nasty surprise, as you may well then run out of capacity in the rest of the clump. This argues for a largish clump size.

3. I agree with Gwen that asking people to specify C will likely be a bit confusing--99% of people will just take the default, so really making the default good is the important thing. I was actually thinking we would just choose C for you in some reasonable way (e.g. just set it to 5 or 10; we should be able to analyze this problem far better than most people will). I was actually just using the clumping thing to show that you don't necessarily need exact replicas to get the benefit.

4. I chatted with Neha and we could make the strategy pluggable, and this kind of reduces risk since we can then default to the current approach, but it also reduces benefit because ~0% of people will choose anything but the default. I do think this is a real problem, so I think if we are going to make a change here we should make it in the default case (or ideally just have one case). I actually don't think there is a real use case for choosing the current strategy if you had a well tested version of the other approach. This does mean the bar for testing will be higher if you undertake this.

5. So including the rack awareness, the algorithm will be a bit complex, as we will need to first choose clumps in a way that maximally spans racks (which I think is the point of the rack aware patch). Then you will assign a partition by first choosing a clump and then choosing a starting position within this clump. The ordering of the replica list is important, as Neha points out, as we use the first entry as the preferred replica and attempt to keep leadership there when that node is available. You don't want to end up in a scenario where you have a clump of nodes (1,2,3) sharing all the same partitions but all leadership on one node. One approach would be to randomize; however, what the current code does is actually a little better by explicitly going round-robin in its assignment, so it is slightly more fair than random. That would place one partition. You would then need to round-robin over clumps to place the additional partitions. Spreading a topic over multiple clumps is important, because what we have observed is that in practice you get some power law distribution in topic size, and you really don't want to place a bunch of partitions from a really high volume topic all on the same machine (you want to spread them as widely as possible).

6. [~gwenshap] this would totally just be a change in the replica placement algorithm, nothing major in Kafka (unlike the rack placement patch, I don't think we need any more metadata for this), so although the code is tricky it should be very contained, I think.
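The clump-and-rotate placement described in point 5 might look roughly like the following sketch (illustrative Python; this is not Kafka's actual AdminUtils algorithm, and rack-aware clump selection is omitted):

```python
def assign_replicas(n_partitions, brokers, rf, clump_size):
    """Group brokers into clumps, round-robin partitions across clumps,
    and rotate the starting position within a clump so that preferred
    leaders (the first replica in each list) spread evenly."""
    assert clump_size >= rf and len(brokers) % clump_size == 0
    clumps = [brokers[i:i + clump_size]
              for i in range(0, len(brokers), clump_size)]
    assignment = {}
    for p in range(n_partitions):
        clump = clumps[p % len(clumps)]           # round-robin over clumps
        start = (p // len(clumps)) % len(clump)   # rotate leader within clump
        assignment[p] = [clump[(start + r) % len(clump)] for r in range(rf)]
    return assignment
```

With 9 brokers, clump size 3 and rf=3, partitions 0, 3, 6 all land on clump {0,1,2} but get preferred leaders 0, 1 and 2 respectively, which is the leadership-spreading property the comment is after.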
Improve partition-broker assignment strategy for better availability in majority durability modes --- Key: KAFKA-1736 URL: https://issues.apache.org/jira/browse/KAFKA-1736 Project: Kafka Issue Type: Improvement Affects Versions: 0.8.1.1 Reporter: Kyle Banker Priority: Minor Attachments: Partitioner.scala The current random strategy of partition-to-broker distribution, combined with a fairly typical use of min.isr and required.acks, results in a suboptimal level of availability. Specifically, if all of your topics have a replication factor of 3, and you use min.isr=2 and required.acks=all, then regardless of the number of brokers in the cluster, you can safely lose only 1 node. Losing more than 1 node will, 95% of the time, result in the inability to write to at least one partition, thus rendering the cluster unavailable. As the total number of partitions increases, so does this probability. On the other hand, if partitions are distributed so that brokers are effectively replicas
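The availability claim above can be checked with a quick Monte Carlo estimate (an illustrative sketch, not Kafka code; the object name and parameters are assumptions): under random placement with replication factor 3, a partition becomes unwritable with min.isr=2 once fewer than 2 of its replicas are alive, and with enough partitions almost every 2-broker failure hits some partition:

```scala
// Back-of-the-envelope simulation of the availability claim; not Kafka source code.
import scala.util.Random

object LossProbability {
  // Estimate the probability that a random 2-broker failure makes at least one
  // partition unwritable (fewer than min.isr=2 live replicas), averaged over
  // `trials` random cluster layouts.
  def estimate(numBrokers: Int, numPartitions: Int, trials: Int, seed: Long): Double = {
    val rnd = new Random(seed)
    val failures = (1 to trials).count { _ =>
      // Each partition gets 3 replicas on distinct random brokers.
      val assignment = Vector.fill(numPartitions)(
        rnd.shuffle((0 until numBrokers).toVector).take(3))
      // Kill 2 random brokers.
      val dead = rnd.shuffle((0 until numBrokers).toVector).take(2).toSet
      // Unwritable if some partition has fewer than 2 replicas outside the dead set.
      assignment.exists(replicas => replicas.count(b => !dead(b)) < 2)
    }
    failures.toDouble / trials
  }
}
```

For 12 brokers and 100 partitions the expected rate is roughly 1 - (1 - C(3,2)/C(12,2))^100, i.e. close to 99%, consistent with the "95% of the time" figure in the report.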
[jira] [Commented] (KAFKA-1501) transient unit tests failures due to port already in use
[ https://issues.apache.org/jira/browse/KAFKA-1501?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14189472#comment-14189472 ] Jay Kreps commented on KAFKA-1501: -- Interesting. There is an inherent race condition between choosePorts picking an open port and the test actually using it. We can't perfectly solve this issue (i.e., since our Apache tests run on a shared unit test machine, even if we are perfect it is possible we could get broken by other tests in another process). However, choosePorts is particularly prone to the problem Ewen described because it will always return the same port until someone actually uses it. Another approach would be to choose the port randomly in some range, check that you can bind to it, and then release it and give it out. This would make this kind of accident less likely, as you would have to both hit the race condition and collide on the same port. transient unit tests failures due to port already in use Key: KAFKA-1501 URL: https://issues.apache.org/jira/browse/KAFKA-1501 Project: Kafka Issue Type: Improvement Components: core Reporter: Jun Rao Assignee: Guozhang Wang Labels: newbie Attachments: KAFKA-1501-choosePorts.patch, KAFKA-1501.patch, KAFKA-1501.patch Saw the following transient failures. kafka.api.ProducerFailureHandlingTest testTooLargeRecordWithAckOne FAILED kafka.common.KafkaException: Socket server failed to bind to localhost:59909: Address already in use. at kafka.network.Acceptor.openServerSocket(SocketServer.scala:195) at kafka.network.Acceptor.&lt;init&gt;(SocketServer.scala:141) at kafka.network.SocketServer.startup(SocketServer.scala:68) at kafka.server.KafkaServer.startup(KafkaServer.scala:95) at kafka.utils.TestUtils$.createServer(TestUtils.scala:123) at kafka.api.ProducerFailureHandlingTest.setUp(ProducerFailureHandlingTest.scala:68) -- This message was sent by Atlassian JIRA (v6.3.4#6332)
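The alternative Jay describes could look roughly like the following (a sketch under stated assumptions, not the actual TestUtils.choosePorts implementation; the object name, range, and retry count are illustrative): pick a port at random in a range, prove it is bindable, then release it and hand it out. Randomizing makes the race far less likely than always rescanning from the same starting port, since a colliding test would have to pick the same random port in the same window:

```scala
// Sketch of random free-port selection; not Kafka source code.
import java.net.ServerSocket
import scala.util.Random

object RandomPort {
  def choose(low: Int = 20000, high: Int = 60000, attempts: Int = 50): Int = {
    val rnd = new Random()
    Iterator.continually(low + rnd.nextInt(high - low))
      .take(attempts)
      .find { candidate =>
        try {
          // Bind to prove the port is free, then immediately release it
          // so the caller can use it. The bind-to-use race still exists,
          // but now also requires a random-port collision.
          new ServerSocket(candidate).close(); true
        } catch {
          case _: java.io.IOException => false // in use; try another random port
        }
      }
      .getOrElse(sys.error(s"no free port found in [$low, $high) after $attempts attempts"))
  }
}
```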
Re: Review Request 24676: Fix KAFKA-1583
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/24676/#review59109 --- Ship it! Ship It! - Joel Koshy On Oct. 28, 2014, 10:09 p.m., Guozhang Wang wrote: --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/24676/ --- (Updated Oct. 28, 2014, 10:09 p.m.) Review request for kafka. Bugs: KAFKA-1583 https://issues.apache.org/jira/browse/KAFKA-1583 Repository: kafka Description --- Incorporated Joel's comments round two Diffs - core/src/main/scala/kafka/api/FetchRequest.scala 59c09155dd25fad7bed07d3d00039e3dc66db95c core/src/main/scala/kafka/api/FetchResponse.scala 8d085a1f18f803b3cebae4739ad8f58f95a6c600 core/src/main/scala/kafka/api/OffsetCommitRequest.scala 861a6cf11dc6b6431fcbbe9de00c74a122f204bd core/src/main/scala/kafka/api/ProducerRequest.scala b2366e7eedcac17f657271d5293ff0bef6f3cbe6 core/src/main/scala/kafka/api/ProducerResponse.scala a286272c834b6f40164999ff8b7f8998875f2cfe core/src/main/scala/kafka/cluster/Partition.scala e88ecf224a4dab8bbd26ba7b0c3ccfe844c6b7f4 core/src/main/scala/kafka/common/ErrorMapping.scala 880ab4a004f078e5d84446ea6e4454ecc06c95f2 core/src/main/scala/kafka/log/Log.scala 157d67369baabd2206a2356b2aa421e848adab17 core/src/main/scala/kafka/network/BoundedByteBufferSend.scala a624359fb2059340bb8dc1619c5b5f226e26eb9b core/src/main/scala/kafka/server/DelayedFetch.scala e0f14e25af03e6d4344386dcabc1457ee784d345 core/src/main/scala/kafka/server/DelayedProduce.scala 9481508fc2d6140b36829840c337e557f3d090da core/src/main/scala/kafka/server/FetchRequestPurgatory.scala ed1318891253556cdf4d908033b704495acd5724 core/src/main/scala/kafka/server/KafkaApis.scala 85498b4a1368d3506f19c4cfc64934e4d0ac4c90 core/src/main/scala/kafka/server/OffsetManager.scala 43eb2a35bb54d32c66cdb94772df657b3a104d1a core/src/main/scala/kafka/server/ProducerRequestPurgatory.scala d4a7d4a79b44263a1f7e1a92874dd36aa06e5a3f core/src/main/scala/kafka/server/ReplicaManager.scala
78b7514cc109547c562e635824684fad581af653 core/src/main/scala/kafka/server/RequestPurgatory.scala 9d76234bc2c810ec08621dc92bb4061b8e7cd993 core/src/main/scala/kafka/utils/DelayedItem.scala d7276494072f14f1cdf7d23f755ac32678c5675c core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala 209a409cb47eb24f83cee79f4e064dbc5f5e9d62 core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala fb61d552f2320fedec547400fbbe402a0b2f5d87 core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala 03a424d45215e1e7780567d9559dae4d0ae6fc29 core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala cd302aa51eb8377d88b752d48274e403926439f2 core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala a9c4ddc78df0b3695a77a12cf8cf25521a203122 core/src/test/scala/unit/kafka/server/RequestPurgatoryTest.scala a577f4a8bf420a5bc1e62fad6d507a240a42bcaa core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala 3804a114e97c849cae48308997037786614173fc core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala 09ed8f5a7a414ae139803bf82d336c2d80bf4ac5 Diff: https://reviews.apache.org/r/24676/diff/ Testing --- Unit tests Thanks, Guozhang Wang
[jira] [Updated] (KAFKA-1583) Kafka API Refactoring
[ https://issues.apache.org/jira/browse/KAFKA-1583?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Joel Koshy updated KAFKA-1583: -- Resolution: Fixed Fix Version/s: (was: 0.9.0) 0.8.3 Status: Resolved (was: Patch Available) Thanks for the patch and your patience! Just checked in to trunk. Kafka API Refactoring - Key: KAFKA-1583 URL: https://issues.apache.org/jira/browse/KAFKA-1583 Project: Kafka Issue Type: Bug Reporter: Guozhang Wang Assignee: Guozhang Wang Fix For: 0.8.3 Attachments: KAFKA-1583.patch, KAFKA-1583_2014-08-20_13:54:38.patch, KAFKA-1583_2014-08-21_11:30:34.patch, KAFKA-1583_2014-08-27_09:44:50.patch, KAFKA-1583_2014-09-01_18:07:42.patch, KAFKA-1583_2014-09-02_13:37:47.patch, KAFKA-1583_2014-09-05_14:08:36.patch, KAFKA-1583_2014-09-05_14:55:38.patch, KAFKA-1583_2014-10-13_19:41:58.patch, KAFKA-1583_2014-10-16_21:15:40.patch, KAFKA-1583_2014-10-17_09:56:33.patch, KAFKA-1583_2014-10-22_18:52:52.patch, KAFKA-1583_2014-10-28_15:09:30.patch This is the next step of KAFKA-1430. Details can be found at this page: https://cwiki.apache.org/confluence/display/KAFKA/Kafka+API+Refactoring -- This message was sent by Atlassian JIRA (v6.3.4#6332)