[ https://issues.apache.org/jira/browse/KAFKA-353?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13400595#comment-13400595 ]
Jay Kreps commented on KAFKA-353:
---------------------------------

This doesn't seem to apply cleanly on 0.8; I get conflicts on ReplicaFetcherThread and ReplicaManager. A few comments:

1. There is a race condition between adding to the purgatory and acknowledgement, I think. We first produce locally and then add the watcher, which leaves a window where the message is available for replication with no watcher on ack. If a replica acknowledges a given produce before the DelayedProduce is added as a watcher, that ack will not be recorded, I think, and the request will potentially time out waiting for ISR acks. This would certainly happen and lead to sporadic timeouts. This is tricky, let's discuss. One solution would be to reverse the logic--add the watcher first, then produce locally--but this means having to handle the case where the local produce fails (we don't want to leak watchers). There is a sketch of this ordering after the list below.

2. As you say, you definitely can't pass KafkaApis to anything; making DelayedProduceRequest an inner class of KafkaApis will fix this, I think (a sketch follows the list below).

3. I think the way we are tying the api handling and the replica manager together is not good. The replica manager shouldn't know about DelayedProduce requests if possible; it should only know about replica management and be oblivious to the api level.

4. Why is the ack timeout in seconds? It should be milliseconds like everything else, right? One would commonly want a sub-second timeout.

5. As a stylistic thing it would be good to avoid direct use of Tuple2[A,B] and instead use (A,B) (e.g. myMethod: List[(A,B)] instead of myMethod: List[Tuple2[A,B]]).

6. I don't understand why we are passing (topic, partition) to the purgatory as BOTH the key and the request. This is a bit hacky since this is not the request. Let's discuss; we may need to clean up the purgatory api to make it gracefully handle this case.

7. I think the ProducerRequestPurgatory should be instantiated in KafkaApis, not KafkaServer--it is an implementation detail of the api layer, not a major subsystem.

8. The doubly nested map/flatMap in KafkaApis.handleProduceRequest is a little bit tricky; could you clean that up, or if that is not possible, comment what it does (I think it just makes a list of (topic, partition) pairs, but the variable name doesn't explain it). Same with KafkaApis.handleFetchRequest. A sketch of what I think it is doing follows the list below.

9. DelayedProduce should definitely not be in the kafka.api package; that package is for request/response "structs". DelayedProduce is basically a bunch of server internals and not something the producer or consumer should be aware of. Likewise, the code for ProduceRequestPurgatory and FetchRequestPurgatory is kept differently, but these things are mirror images of one another: FetchRequestPurgatory is in KafkaApis, ProduceRequestPurgatory is in its own file. These should match each other. The reasoning behind keeping things in KafkaApis was that those classes contain part of the logic for processing a request, so splitting them into two files makes it harder to read (the counter argument is that KafkaApis is getting pretty big). The other reason was so they could access the variables in KafkaApis as nested classes. Either way is probably fine, but they should be symmetric between the two.

10. KafkaApis.handleProducerRequest and KafkaApis.produce are a little hard to differentiate. I think the latter is effectively "produce to local log" and the former does the purgatory stuff. It would be good to call this out in the method name or a comment.

11. I am not so sure about the logic of proactively responding to all outstanding requests with failure during a leadership change. Is this the right thing to do?

12. The logic in DelayedProduce.isSatisfied is very odd. I think what we want to do is respond to all outstanding requests in the event of a leadership change. But we do this by passing in the ReplicaManager and having logic that keys off whether or not the local broker is the leader. This seems quite convoluted. Wouldn't it make more sense to do this: (1) have the replica manager allow "subscribers" to its state changes; (2) add a subscriber that loops over all in-flight requests and expires them with an error when the local broker ceases to be the leader. There is a description of this pattern here: http://codingjunkie.net/guava-eventbus/. A sketch of the subscriber shape follows the list below.

13. Nice catch with the request size. I think it is okay that we are using sizeInBytes, since it is not expected that the producer sends fragments, though a fully correct implementation would check that.
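On point 1: a minimal sketch of the ordering I have in mind, assuming made-up names (ProduceOrderingSketch, Purgatory, handleProduce, appendToLocalLog are stand-ins, not the names in the patch). The only point is that the watcher is in the purgatory before the message can be replicated, and that a failed local append removes it again.

    object ProduceOrderingSketch {
      // Stand-ins for the real types; names here are illustrative only.
      case class DelayedProduce(topic: String, partition: Int)

      class Purgatory {
        private val watchers =
          scala.collection.mutable.Map[(String, Int), List[DelayedProduce]]()
        def watch(key: (String, Int), d: DelayedProduce): Unit =
          watchers(key) = d :: watchers.getOrElse(key, Nil)
        // Drop the watcher (in the real code, also answer the client with an error).
        def expire(key: (String, Int)): Unit = watchers.remove(key)
      }

      def handleProduce(purgatory: Purgatory, topic: String, partition: Int,
                        appendToLocalLog: () => Unit): Unit = {
        val delayed = DelayedProduce(topic, partition)
        purgatory.watch((topic, partition), delayed) // 1. register the watcher first
        try {
          appendToLocalLog()                         // 2. then produce to the local log
        } catch {
          case e: Exception =>
            purgatory.expire((topic, partition))     // 3. on failure, don't leak the watcher
            throw e
        }
      }
    }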
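On point 2: the nested-class shape I mean, as a sketch with invented member names. The point is just that a class nested in KafkaApis can read the enclosing instance's state directly, so nothing needs to be handed a KafkaApis reference.

    // Sketch only: KafkaApisSketch, outstanding and delay are invented names.
    class KafkaApisSketch(brokerId: Int) {
      import scala.collection.mutable.ListBuffer

      private val outstanding = ListBuffer[DelayedProduce]()

      // Nested class: it can use brokerId and outstanding directly because it
      // carries a reference to the enclosing KafkaApisSketch instance.
      class DelayedProduce(val topic: String, val partition: Int) {
        def describe: String =
          "DelayedProduce(" + topic + "," + partition + ") on broker " + brokerId
      }

      def delay(topic: String, partition: Int): DelayedProduce = {
        val d = new DelayedProduce(topic, partition)
        outstanding += d
        d
      }
    }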
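On point 8: what I believe the nested map/flatMap amounts to, written out with an explicit name. The ProduceRequest shape here is a simplified stand-in, not the real class; it also uses the (A, B) tuple sugar from point 5.

    object TopicPartitionSketch {
      // Simplified stand-ins for the real request classes.
      case class PartitionData(partition: Int, messages: Seq[String])
      case class ProduceRequest(data: Map[String, Seq[PartitionData]]) // topic -> payloads

      // Flatten the request into a plain list of (topic, partition) pairs.
      def topicPartitionPairs(request: ProduceRequest): List[(String, Int)] =
        request.data.toList.flatMap { case (topic, partitions) =>
          partitions.map(p => (topic, p.partition))
        }
    }

For example, topicPartitionPairs(ProduceRequest(Map("foo" -> Seq(PartitionData(0, Seq("m")))))) gives List(("foo", 0)). A well-named helper like this (or a comment saying the same thing) is all I'm asking for.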
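On point 12: a sketch of the subscriber shape, in the spirit of the EventBus pattern from the link but with a plain listener trait; all names here are invented for illustration and don't correspond to the patch.

    object LeaderChangeSketch {
      trait LeaderChangeListener {
        def onLeaderChange(topic: String, partition: Int, isLeader: Boolean): Unit
      }

      // Stand-in for the replica manager: it only publishes state changes.
      class ReplicaManagerLike {
        private var listeners = List[LeaderChangeListener]()
        def subscribe(l: LeaderChangeListener): Unit = listeners = l :: listeners
        def becomeFollower(topic: String, partition: Int): Unit =
          listeners.foreach(_.onLeaderChange(topic, partition, isLeader = false))
      }

      // The api layer subscribes this: when leadership is lost, expire every
      // in-flight produce for that partition with an error, instead of having
      // DelayedProduce.isSatisfied reach back into the ReplicaManager.
      class ProduceExpirer(inFlight: scala.collection.mutable.Map[(String, Int), List[String]])
          extends LeaderChangeListener {
        def onLeaderChange(topic: String, partition: Int, isLeader: Boolean): Unit =
          if (!isLeader)
            inFlight.remove((topic, partition)).foreach { requests =>
              requests.foreach(r => println("expiring " + r + " with a not-leader error"))
            }
      }
    }

With this shape the replica manager stays oblivious to the api level (point 3), and DelayedProduce.isSatisfied no longer needs to key off leadership at all.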
> tie producer-side ack with high watermark and progress of replicas
> ------------------------------------------------------------------
>
> Key: KAFKA-353
> URL: https://issues.apache.org/jira/browse/KAFKA-353
> Project: Kafka
> Issue Type: Sub-task
> Affects Versions: 0.8
> Reporter: Jun Rao
> Assignee: Joel Koshy
> Attachments: kafka-353_v1.patch