[jira] [Commented] (KAFKA-691) Fault tolerance broken with replication factor 1

2013-01-17 Thread Jun Rao (JIRA)

[ https://issues.apache.org/jira/browse/KAFKA-691?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13556380#comment-13556380 ]

Jun Rao commented on KAFKA-691:
---

Actually, the current code works since partitionId is always between 0 and 
num.partitions - 1, and therefore it happens to also be the index into the 
partition list. This patch just makes the code a bit easier to understand.
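
As a minimal illustration of that equivalence (a simplified stand-in, not the 
actual DefaultEventHandler code): when the partition list is sorted by 
partitionId and the ids run from 0 to numPartitions - 1, indexing by 
partitionId and searching by partitionId return the same entry.

    // Simplified stand-in for kafka.producer.PartitionAndLeader.
    case class PartitionAndLeader(partitionId: Int, leaderBrokerIdOpt: Option[Int])

    // Relies on the partition id doubling as the position in the sorted list.
    def byIndex(partitions: Seq[PartitionAndLeader], partitionId: Int): PartitionAndLeader =
      partitions(partitionId)

    // Explicit lookup by id; equivalent to byIndex when the ids are exactly 0 .. n-1.
    def byId(partitions: Seq[PartitionAndLeader], partitionId: Int): PartitionAndLeader =
      partitions.find(_.partitionId == partitionId)
        .getOrElse(throw new IllegalArgumentException("unknown partition " + partitionId))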

 Fault tolerance broken with replication factor 1
 

 Key: KAFKA-691
 URL: https://issues.apache.org/jira/browse/KAFKA-691
 Project: Kafka
  Issue Type: Bug
Affects Versions: 0.8
Reporter: Jay Kreps
Assignee: Maxime Brugidou
 Fix For: 0.8

 Attachments: kafka-691_extra.patch, KAFKA-691-v1.patch, 
 KAFKA-691-v2.patch


 In 0.7, if a partition was down we would just send the message elsewhere. This 
 meant that the partitioning was really more of a stickiness than a hard 
 guarantee. This made it impossible to depend on it for partitioned, stateful 
 processing.
 In 0.8, when running with replication this should not generally be a problem, 
 as partitions are now highly available and fail over to other replicas. 
 However, the case of replication factor = 1 no longer really works for most 
 use cases, since a dead broker will now give errors for its partitions.
 I am not sure of the best fix. Intuitively I think this is something that 
 should be handled by the Partitioner interface. However, currently the 
 partitioner has no knowledge of which nodes are available. So you could use a 
 random partitioner, but that would keep going back to the down node.

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


[jira] [Commented] (KAFKA-691) Fault tolerance broken with replication factor 1

2013-01-13 Thread Jun Rao (JIRA)

[ https://issues.apache.org/jira/browse/KAFKA-691?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13552336#comment-13552336 ]

Jun Rao commented on KAFKA-691:
---

It would be great if you could provide a patch.

1, 2, 3. Yes, we will need a new config. We should do the batching in 
updateInfo(). This does make the producer-side logic a bit more complicated. We 
have been thinking about making getMetadata faster. When we get there, we can 
revisit the batching logic.
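
As a rough sketch of how such a config might surface on the producer side (the 
property name topic.metadata.refresh.interval.ms and the broker-list property 
are assumptions based on the 0.8 producer, not something fixed in this thread):

    import java.util.Properties
    import kafka.producer.ProducerConfig

    val props = new Properties()
    // Bootstrap broker list (0.8 producer property name, assumed here).
    props.put("metadata.broker.list", "broker1:9092,broker2:9092")
    // Assumed name for the periodic metadata refresh interval discussed above;
    // 600000 ms = 10 mins.
    props.put("topic.metadata.refresh.interval.ms", "600000")
    val config = new ProducerConfig(props)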



[jira] [Commented] (KAFKA-691) Fault tolerance broken with replication factor 1

2013-01-13 Thread Jay Kreps (JIRA)

[ https://issues.apache.org/jira/browse/KAFKA-691?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13552363#comment-13552363 ]

Jay Kreps commented on KAFKA-691:
-

Does batching make sense versus just having people increase the timeout?



[jira] [Commented] (KAFKA-691) Fault tolerance broken with replication factor 1

2013-01-10 Thread Jun Rao (JIRA)

[ https://issues.apache.org/jira/browse/KAFKA-691?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13549842#comment-13549842 ]

Jun Rao commented on KAFKA-691:
---

Thanks for the patch. Overall, the patch is pretty good and is well thought 
out. Some comments:

1. DefaultEventHandler:
1.1 In handle(), I don't think we need to add the if test in the following 
statement. The reason is that a message could fail to be sent because the 
leader changes immediately after the previous metadata refresh. Normally, 
leaders are elected very quickly. So, it makes sense to refresh the metadata 
again.
    if (topicMetadataToRefresh.nonEmpty)
      Utils.swallowError(brokerPartitionInfo.updateInfo(outstandingProduceRequests.map(_.topic).toSet))
1.2 In handle(), it seems that it's better to call the following code before 
dispatchSerializedData().
    if (topicMetadataRefreshInterval >= 0 &&
        SystemTime.milliseconds - lastTopicMetadataRefresh > topicMetadataRefreshInterval) {
      Utils.swallowError(brokerPartitionInfo.updateInfo(topicMetadataToRefresh.toSet))
      topicMetadataToRefresh.clear
      lastTopicMetadataRefresh = SystemTime.milliseconds
    }
1.3 getPartition(): If none of the partitions is available, we should throw 
LeaderNotAvailableException instead of UnknownTopicOrPartitionException (see 
the sketch after these comments).

2. DefaultPartitioner: Since the key is not expected to be null, we should 
remove the code that deals with a null key.

3. The consumer side logic is fine. The consumer rebalance is only triggered 
when there are changes in partitions, not when there are changes in the 
availability of a partition. The rebalance logic doesn't depend on a 
partition being available. If a partition is not available, 
ConsumerFetcherManager will keep refreshing metadata. If you have a replication 
factor of 1, you will need to set a larger refresh.leader.backoff.ms if a 
broker is expected to go down for a long time.
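
For reference, a minimal Scala sketch of the exception change suggested in 1.3, 
for the unkeyed-message path (simplified types and names, not the actual patch):

    import scala.util.Random
    import kafka.common.LeaderNotAvailableException

    // Simplified stand-in for kafka.producer.PartitionAndLeader.
    case class PartitionAndLeader(partitionId: Int, leaderBrokerIdOpt: Option[Int])

    // Pick some partition that currently has a leader; if none does, fail with
    // LeaderNotAvailableException rather than UnknownTopicOrPartitionException,
    // since the topic and its partitions exist -- only their leaders are gone.
    def getAvailablePartition(topic: String, partitions: Seq[PartitionAndLeader]): Int = {
      val available = partitions.filter(_.leaderBrokerIdOpt.isDefined)
      if (available.isEmpty)
        throw new LeaderNotAvailableException("No leader for any partition of topic " + topic)
      available(Random.nextInt(available.size)).partitionId
    }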



[jira] [Commented] (KAFKA-691) Fault tolerance broken with replication factor 1

2013-01-09 Thread Maxime Brugidou (JIRA)

[ https://issues.apache.org/jira/browse/KAFKA-691?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13548667#comment-13548667 ]

Maxime Brugidou commented on KAFKA-691:
---

I think the work-around is not really acceptable for me, since it would consume 
3x the resources (a replication factor of 3 is the minimum acceptable) and it 
would still make the cluster less available anyway (unless I have only 3 
brokers).

The thing is that 0.7 made the cluster 100% available (for my use case, 
accepting data loss) as long as a single broker was alive.

A way to handle this would be to:
1. Have a lot of partitions per topic (more than the # of brokers)
2. Have something that rebalances the partitions and makes sure each broker has 
at least one partition of each topic (to keep every topic available)
3. Have a setting in the consumer/producer that says "I don't care about 
partitioning, just produce/consume wherever you can"



[jira] [Commented] (KAFKA-691) Fault tolerance broken with replication factor 1

2013-01-09 Thread Maxime Brugidou (JIRA)

[ https://issues.apache.org/jira/browse/KAFKA-691?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13549105#comment-13549105 ]

Maxime Brugidou commented on KAFKA-691:
---

I agree with Jun's solution; this would solve 3 (1 and 2 can already be done 
manually -- just send a ReassignPartition command when you add a broker).

I could probably implement this very quickly. I'm just not sure how you get 
the availability of a partition, but I'll try to figure it out and submit a 
first patch tomorrow.



[jira] [Commented] (KAFKA-691) Fault tolerance broken with replication factor 1

2013-01-09 Thread Jay Kreps (JIRA)

[ https://issues.apache.org/jira/browse/KAFKA-691?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13549276#comment-13549276 ]

Jay Kreps commented on KAFKA-691:
-

That would be awesome. If you don't mind, just give the proposed set of changes 
on the JIRA first and let's get everyone on board with how it should work, since 
it is a reasonably important change (or, if you don't mind revising your patch, 
we can start with that).



[jira] [Commented] (KAFKA-691) Fault tolerance broken with replication factor 1

2013-01-09 Thread Jun Rao (JIRA)

[ https://issues.apache.org/jira/browse/KAFKA-691?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13549373#comment-13549373 ]

Jun Rao commented on KAFKA-691:
---

DefaultEventHandler.getPartitionListForTopic() returns Seq[PartitionAndLeader]. 
If PartitionAndLeader.leaderBrokerIdOpt is None, the partition is not 
available.

There is another tricky issue. If a partition is not available, when do we 
refresh the metadata to check whether the partition has become available again? 
Currently, we refresh the metadata if we fail to send the data. However, if we 
always route messages to available partitions, we may never fail to send. One 
possible solution: if at least one partition in Seq[PartitionAndLeader] is not 
available, refresh the metadata once a configurable amount of time has passed 
(e.g., 10 mins).
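
For illustration, a rough sketch of that timed refresh check (names such as 
refreshIntervalMs, lastRefreshMs and maybeRefresh are assumptions for this 
sketch, not the actual patch):

    // Simplified stand-in for kafka.producer.PartitionAndLeader.
    case class PartitionAndLeader(partitionId: Int, leaderBrokerIdOpt: Option[Int])

    object MetadataRefreshSketch {
      val refreshIntervalMs: Long = 10 * 60 * 1000  // e.g., 10 mins
      var lastRefreshMs: Long = System.currentTimeMillis

      // Force a metadata refresh only when some partition has no leader and enough
      // time has passed; sends routed to the available partitions may keep
      // succeeding and would otherwise never trigger a refresh.
      def maybeRefresh(topic: String, partitions: Seq[PartitionAndLeader],
                       updateInfo: Set[String] => Unit): Unit = {
        val someUnavailable = partitions.exists(_.leaderBrokerIdOpt.isEmpty)
        val now = System.currentTimeMillis
        if (someUnavailable && now - lastRefreshMs > refreshIntervalMs) {
          updateInfo(Set(topic))
          lastRefreshMs = now
        }
      }
    }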
