[ https://issues.apache.org/jira/browse/KAFKA-353?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Joel Koshy updated KAFKA-353:
-----------------------------

    Attachment: kafka-353_v1.patch

The patch is not too big, but a bit tricky. To help with review, here is an overview:

- DelayedProduce:
  - Contains the logic to determine whether it can be unblocked. The partitionStatus map is needed to keep track of each local log's offset, error status, and whether acks are still pending. The comments in the code should make the remaining logic clear. (A rough sketch of this check appears at the end of this comment.)
  - Handling delayed producer requests brings in dependencies on the fetch request purgatory, the replica manager and KafkaZookeeper - which is why I had to pass kafkaApis into the DelayedProduce, which is a bit ugly.
- KafkaApis:
  - The existing code for handling produce requests would respond as soon as the leader persisted the data to disk. We still do that if requiredAcks is 0 or 1. For other values, we now create a DelayedProduce request which is satisfied when requiredAcks followers are caught up. If |ISR| < requiredAcks then the request will time out.
  - handleFetchRequest: if a request comes from a follower replica, check the produceRequestPurgatory to see if any DelayedProduce requests can be unblocked.
- ReplicaManager:
  - The only change is to try and unblock DelayedProduce requests to partitions for which the leader changed.

Note that even if a request times out, some of the partitions may have been successfully acked - so even if one partition times out, the global error code is NoError. The receiver must check the errors array to determine whether there are any failures. I think this brings in the need for a "PartialSuccess" global error code in the ProducerResponse. Thoughts on this?

I think there was a bug in checking satisfied DelayedFetchRequests: the checkSatisfied method would take a produce request's TopicData and see if the total number of bytes in that produce request could satisfy the remaining bytes to be filled in the DelayedFetchRequest. However, that could count data for partitions that were not part of the DelayedFetchRequest. This patch fixes that issue as well - it changes the FetchRequestPurgatory key from TopicData to PartitionData and does the check on a per-partition basis.

Another potential issue is that the DelayedFetchRequest satisfaction check counts bytes using the MessageSet's sizeInBytes, which could include incomplete messages - as opposed to iterating over the valid messages and summing their sizes. I left that as is; I think it is debatable which approach is correct in this case.

I added some trace logs at the individual request level - e.g., "Produce request to topic unblocked n delayed fetch requests". These would be more useful if we added a uuid to each produce request - I think this idea was tossed around on the mailing list some time ago. It doesn't even have to be a uuid or part of the produce request's wire format - even an AtomicLong counter (internal to the broker) may be helpful. Thoughts?

There is a corner case that I think is handled correctly, but I want to make sure:
- The leader receives a producer request and adds it to the ProduceRequestPurgatory.
- Leadership changes while it is pending, so the error code for that partition is set to NotLeaderForPartitionCode.
- Leadership changes back to this broker while the DelayedProduce is still pending.
- In this scenario, the partition remains in the error state.
- I think this is correct because the leader would have become a follower (before it became a leader again), and would have truncated its log to the intermediate leader's HW.
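For reviewers who want the gist before reading the patch, here is a minimal sketch of the per-partition ack bookkeeping described above. It is illustrative only - the names (PartitionStatus, DelayedProduceSketch, checkPartitionSatisfied, caughtUpIsrCount, etc.) are made up for this comment and are not necessarily what the patch uses:

{code}
// Minimal sketch only - class and method names here are illustrative, not the
// ones in kafka-353_v1.patch.
import scala.collection.mutable

// Per-partition bookkeeping: whether acks are still pending, the current error
// code, and the local log offset that followers must reach.
case class PartitionStatus(var acksPending: Boolean,
                           var error: Short,
                           requiredOffset: Long)

class DelayedProduceSketch(requiredAcks: Int,
                           partitionStatus: mutable.Map[(String, Int), PartitionStatus]) {

  // Invoked when a follower fetch (or a leader change) may have advanced replicas.
  // caughtUpIsrCount is the number of ISR replicas whose log-end offset has
  // reached requiredOffset; isrSize is the current |ISR| of the partition.
  def checkPartitionSatisfied(topicAndPartition: (String, Int),
                              caughtUpIsrCount: Int,
                              isrSize: Int): Unit = {
    partitionStatus.get(topicAndPartition).foreach { status =>
      if (status.acksPending) {
        if (isrSize < requiredAcks) {
          // Not enough replicas in sync: this partition can only be resolved by
          // the request timing out.
        } else if (caughtUpIsrCount >= requiredAcks) {
          status.acksPending = false
          status.error = 0.toShort // NoError for this partition
        }
      }
    }
  }

  // The delayed request as a whole is satisfied once no partition has acks pending.
  def isSatisfied: Boolean = partitionStatus.values.forall(s => !s.acksPending)
}
{code}

On expiration, any partition still marked as acks-pending would get a timeout error code while already-satisfied partitions keep NoError - which is what leads to the "PartialSuccess" question above.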
If requiredAcks == |ISR| and the |ISR| shrinks while the DelayedProduce is pending, the request may time out. However, if the |ISR| expands back to its original size while the request is still pending, it will be satisfied. Let me know if you can think of other corner cases that need to be considered - I wouldn't be surprised if there are quite a few. I only did limited testing with the ProducerTest.

I think this opens up the following future work (separate jiras):
- Enhance the system test to cover all the corner cases (leader changing while a request is pending, ISR shrinking while a request is pending, etc.).
- ProducerResponse currently uses two separate arrays for errors and offsets, and ProducerRequest uses an array of TopicData, each of which contains an array of PartitionData. It may be a good idea to change these classes to use maps (or something else), as I had to resort to using find and indexOf to locate partition-level data in the original request.
- We should add some mbeans for request purgatory stats - avg. hold time, outstanding requests, etc.
- We should try to get rid of sleeps and fix all intermittent test failures.

If the above list sounds good I'll file the jiras.

> tie producer-side ack with high watermark and progress of replicas
> ------------------------------------------------------------------
>
>                 Key: KAFKA-353
>                 URL: https://issues.apache.org/jira/browse/KAFKA-353
>             Project: Kafka
>          Issue Type: Sub-task
>    Affects Versions: 0.8
>            Reporter: Jun Rao
>            Assignee: Joel Koshy
>         Attachments: kafka-353_v1.patch
>

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators:
https://issues.apache.org/jira/secure/ContactAdministrators!default.jspa
For more information on JIRA, see: http://www.atlassian.com/software/jira