[jira] [Commented] (KAFKA-691) Fault tolerance broken with replication factor 1
[ https://issues.apache.org/jira/browse/KAFKA-691?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13556380#comment-13556380 ]

Jun Rao commented on KAFKA-691:
-------------------------------

Actually, the current code works, since partitionId is always between 0 and num.partition-1 and therefore happens to also be the index into partitionList. This patch just makes the code a bit easier to understand.

Fault tolerance broken with replication factor 1

Key: KAFKA-691
URL: https://issues.apache.org/jira/browse/KAFKA-691
Project: Kafka
Issue Type: Bug
Affects Versions: 0.8
Reporter: Jay Kreps
Assignee: Maxime Brugidou
Fix For: 0.8
Attachments: kafka-691_extra.patch, KAFKA-691-v1.patch, KAFKA-691-v2.patch

In 0.7, if a partition was down we would just send the message elsewhere. This meant that the partitioning was really more of a stickiness than a hard guarantee. This made it impossible to depend on it for partitioned, stateful processing.

In 0.8, when running with replication, this should not be a problem generally, as the partitions are now highly available and fail over to other replicas. However, with replication factor = 1 this no longer really works for most cases, as a dead broker will now give errors for that broker.

I am not sure of the best fix. Intuitively, I think this is something that should be handled by the Partitioner interface. However, currently the partitioner has no knowledge of which nodes are available. So you could use a random partitioner, but that would keep going back to the down node.

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira
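The remark about partitionId doubling as a list index can be illustrated with a small sketch. This is only one plausible reading of the comment, not the content of the attached patch. The PartitionAndLeader case class here is a simplified stand-in, and PartitionLookup, byIndex and byId are hypothetical names introduced for illustration.

```scala
// Hypothetical, simplified stand-in for the producer's PartitionAndLeader.
final case class PartitionAndLeader(topic: String, partitionId: Int, leaderBrokerIdOpt: Option[Int])

object PartitionLookup {
  // Index-based access: only correct while the list is dense and ordered by
  // partitionId, i.e. ids run from 0 to (number of partitions - 1).
  def byIndex(partitions: Seq[PartitionAndLeader], partitionId: Int): PartitionAndLeader =
    partitions(partitionId)

  // Id-based access: makes the intent explicit and does not rely on ordering.
  def byId(partitions: Seq[PartitionAndLeader], partitionId: Int): Option[PartitionAndLeader] =
    partitions.find(_.partitionId == partitionId)
}
```

Both lookups agree as long as the list is sorted by id with no gaps, which matches the claim that the existing code already works.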
[jira] [Commented] (KAFKA-691) Fault tolerance broken with replication factor 1
[ https://issues.apache.org/jira/browse/KAFKA-691?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13552336#comment-13552336 ]

Jun Rao commented on KAFKA-691:
-------------------------------

It would be great if you could provide a patch.

1, 2, 3. Yes, we will need a new config. We should do batching in updateInfo(). This does make the producer-side logic a bit more complicated. We have been thinking about making getMetadata faster. When we get there, we can revisit the batching logic.
[jira] [Commented] (KAFKA-691) Fault tolerance broken with replication factor 1
[ https://issues.apache.org/jira/browse/KAFKA-691?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13552363#comment-13552363 ]

Jay Kreps commented on KAFKA-691:
---------------------------------

Does batching make sense versus just having people increase the timeout?
[jira] [Commented] (KAFKA-691) Fault tolerance broken with replication factor 1
[ https://issues.apache.org/jira/browse/KAFKA-691?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13549842#comment-13549842 ]

Jun Rao commented on KAFKA-691:
-------------------------------

Thanks for the patch. Overall, the patch is pretty good and well thought out. Some comments:

1. DefaultEventHandler:

1.1 In handle(), I don't think we need to add the if test in the following statement. The reason is that a message could fail to be sent because the leader changes immediately after the previous metadata refresh. Normally, leaders are elected very quickly, so it makes sense to refresh the metadata again.

    if (topicMetadataToRefresh.nonEmpty)
      Utils.swallowError(brokerPartitionInfo.updateInfo(outstandingProduceRequests.map(_.topic).toSet))

1.2 In handle(), it seems better to call the following code before dispatchSerializedData().

    if (topicMetadataRefreshInterval >= 0 &&
        SystemTime.milliseconds - lastTopicMetadataRefresh > topicMetadataRefreshInterval) {
      Utils.swallowError(brokerPartitionInfo.updateInfo(topicMetadataToRefresh.toSet))
      topicMetadataToRefresh.clear
      lastTopicMetadataRefresh = SystemTime.milliseconds
    }

1.3 getPartition(): If none of the partitions is available, we should throw LeaderNotAvailableException instead of UnknownTopicOrPartitionException.

2. DefaultPartitioner: Since the key is not expected to be null, we should remove the code that deals with a null key.

3. The consumer-side logic is fine. The consumer rebalance is only triggered when there are changes in partitions, not when there are changes in the availability of a partition. The rebalance logic doesn't depend on a partition being available. If a partition is not available, ConsumerFetcherManager will keep refreshing metadata. If you have a replication factor of 1, you will need to set a larger refresh.leader.backoff.ms if a broker is expected to go down for a long time.
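Point 1.3 of the review can be sketched roughly as follows. This is a minimal illustration under stated assumptions, not the code from KAFKA-691-v1.patch. PartitionAndLeader and LeaderNotAvailableException are redefined locally as simplified stand-ins, and the ProducerSketch object, the getPartition signature and the keyHash parameter are hypothetical.

```scala
// Hypothetical stand-ins; the real Kafka classes are not used here.
final case class PartitionAndLeader(topic: String, partitionId: Int, leaderBrokerIdOpt: Option[Int])
class LeaderNotAvailableException(message: String) extends RuntimeException(message)

object ProducerSketch {
  // Pick a partition only among those that currently have a leader.
  // If no partition has a leader, signal that the leader is unavailable
  // rather than claiming the topic or partition is unknown.
  def getPartition(topic: String, keyHash: Int, partitions: Seq[PartitionAndLeader]): Int = {
    val available = partitions.filter(_.leaderBrokerIdOpt.isDefined)
    if (available.isEmpty)
      throw new LeaderNotAvailableException("No leader for any partition in topic " + topic)
    // floorMod keeps the index non-negative even for negative hash values.
    available(java.lang.Math.floorMod(keyHash, available.size)).partitionId
  }
}
```

The distinction matters to callers: a missing leader is a transient condition that a metadata refresh may resolve, while an unknown topic or partition usually is not.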
[jira] [Commented] (KAFKA-691) Fault tolerance broken with replication factor 1
[ https://issues.apache.org/jira/browse/KAFKA-691?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13548667#comment-13548667 ]

Maxime Brugidou commented on KAFKA-691:
---------------------------------------

I think the work-around is not really acceptable for me, since it will consume 3x the resources (because a replication factor of 3 is the minimum acceptable) and it will still make the cluster less available anyway (unless I have only 3 brokers).

The thing is that 0.7 was making the cluster 100% available (for my use case, accepting data loss) as long as a single broker was alive.

A way to handle this would be to:

1. Have a lot of partitions per topic (more than the number of brokers)
2. Have something that rebalances the partitions and makes sure each broker has at least one partition for each topic (to make every topic available)
3. Have a setting in the consumer/producer that says "I don't care about partitioning, just produce/consume wherever you can"
[jira] [Commented] (KAFKA-691) Fault tolerance broken with replication factor 1
[ https://issues.apache.org/jira/browse/KAFKA-691?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13549105#comment-13549105 ]

Maxime Brugidou commented on KAFKA-691:
---------------------------------------

I agree with Jun's solution; this would solve 3 (1 and 2 can already be done manually: just send a ReassignPartition command when you add a broker).

I could probably implement this very quickly. I'm just not sure how you get the availability of a partition, but I'll try to figure it out and submit a first patch tomorrow.
[jira] [Commented] (KAFKA-691) Fault tolerance broken with replication factor 1
[ https://issues.apache.org/jira/browse/KAFKA-691?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13549276#comment-13549276 ]

Jay Kreps commented on KAFKA-691:
---------------------------------

That would be awesome. If you don't mind, just give the proposed set of changes on the JIRA first and let's get everyone on board with how it should work, since it is a reasonably important change (or, if you don't mind revising your patch, we can start with that).
[jira] [Commented] (KAFKA-691) Fault tolerance broken with replication factor 1
[ https://issues.apache.org/jira/browse/KAFKA-691?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13549373#comment-13549373 ]

Jun Rao commented on KAFKA-691:
-------------------------------

DefaultEventHandler.getPartitionListForTopic() returns Seq[PartitionAndLeader]. If PartitionAndLeader.leaderBrokerIdOpt is None, the partition is not available.

There is another tricky issue. If a partition is not available, when do we refresh the metadata to check whether the partition has become available again? Currently, we refresh the metadata if we fail to send the data. However, if we always route the messages to available partitions, we may never fail to send. One possible solution is that, if at least one partition in Seq[PartitionAndLeader] is not available, we refresh the metadata once a configurable amount of time has passed (e.g., 10 mins).
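The availability check and the time-based refresh proposed in this comment could look roughly like the sketch below. It is an illustration under assumptions, not Kafka's implementation. PartitionAndLeader is a simplified local stand-in, and AvailabilitySketch, isAvailable, shouldRefresh and their parameters are hypothetical names.

```scala
// Hypothetical, simplified stand-in for the producer's PartitionAndLeader.
final case class PartitionAndLeader(topic: String, partitionId: Int, leaderBrokerIdOpt: Option[Int])

object AvailabilitySketch {
  // A partition counts as available when it currently has a leader broker.
  def isAvailable(p: PartitionAndLeader): Boolean = p.leaderBrokerIdOpt.isDefined

  // Refresh metadata when at least one partition has no leader and the
  // configured interval has elapsed since the last refresh. Send failures
  // cannot be relied on as a trigger once messages only go to live partitions.
  def shouldRefresh(partitions: Seq[PartitionAndLeader],
                    nowMs: Long,
                    lastRefreshMs: Long,
                    refreshIntervalMs: Long): Boolean =
    partitions.exists(p => !isAvailable(p)) && nowMs - lastRefreshMs >= refreshIntervalMs
}
```

With a 10 minute interval (600000 ms), a producer that sees one leaderless partition would refresh at most once every 10 minutes until the partition comes back, and would skip the timed refresh entirely while all partitions have leaders.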