[ https://issues.apache.org/jira/browse/KAFKA-353?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13400595#comment-13400595 ]

Jay Kreps commented on KAFKA-353:
---------------------------------

This doesn't seem to apply cleanly on 0.8, I get conflicts on 
ReplicaFetcherThread and ReplicaManager.

A few comments:
1. There is a race condition between adding to the purgatory and 
acknowledgement, I think. We first produce locally and then add the watcher, 
which opens a window where the message is available for replication but no 
watcher is registered for the ack. If a replica acknowledges a given produce 
before the DelayedProduce is added as a watcher, that ack will not be 
recorded, and the request will potentially time out waiting for ISR acks. 
This would certainly happen and lead to sporadic timeouts. This is tricky, 
let's discuss. One solution would be to reverse the logic--add the watcher 
first, then produce locally--but this means having to handle the case where 
the local produce fails (we don't want to leak watchers).
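The reversed ordering might look roughly like the sketch below. All names here (Purgatory, watch, unwatch, appendToLocalLog, respondWithError) are illustrative stand-ins, not the actual API in the patch:

```scala
import scala.collection.mutable

// Hypothetical request type keyed by (topic, partition).
class DelayedProduce(val key: (String, Int))

// Minimal stand-in for the purgatory's watcher bookkeeping.
class Purgatory {
  private val watchers = mutable.Map[(String, Int), List[DelayedProduce]]()
  def watch(d: DelayedProduce): Unit =
    watchers(d.key) = d :: watchers.getOrElse(d.key, Nil)
  def unwatch(d: DelayedProduce): Unit =
    watchers(d.key) = watchers.getOrElse(d.key, Nil).filterNot(_ eq d)
}

class ProduceHandler(purgatory: Purgatory,
                     appendToLocalLog: DelayedProduce => Unit,
                     respondWithError: (DelayedProduce, Throwable) => Unit) {
  def handleProduce(request: DelayedProduce): Unit = {
    // Register the watcher BEFORE the messages become visible to
    // replicas, so no ack can arrive while nothing is watching.
    purgatory.watch(request)
    try {
      appendToLocalLog(request)
    } catch {
      case e: Exception =>
        // On local produce failure, remove the watcher so it
        // doesn't leak, and fail the request immediately.
        purgatory.unwatch(request)
        respondWithError(request, e)
    }
  }
}
```

The cost of this ordering is the explicit cleanup path on local failure, but it closes the window in which an ack can be lost.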
2. As you say, you definitely can't pass KafkaApis to anything; making 
DelayedProduceRequest an inner class of KafkaApis will fix this, I think.
3. I think the way we are tying the api handling and the replica manager 
together is not good. The replica manager shouldn't know about DelayedProduce 
requests if possible; it should only know about replica management and be 
oblivious to the api layer.
4. Why is the ack timeout in seconds? It should be milliseconds like everything 
else, right? One would commonly want a sub-second timeout.
5. As a stylistic thing it would be good to avoid direct use of Tuple2[A,B] 
and instead use (A,B) (e.g. myMethod: List[(A,B)] instead of myMethod: 
List[Tuple2[A,B]]).
6. I don't understand why we are passing (topic, partition) to the purgatory as 
BOTH the key and request. This is a bit hacky since this is not the request. 
Let's discuss, we may need to clean up the purgatory api to make it gracefully 
handle this case.
7. I think the ProducerRequestPurgatory should be instantiated in KafkaApis not 
KafkaServer--it is an implementation detail of the api layer not a major 
subsystem.
8. The doubly nested map/flatMap in KafkaApis.handleProduceRequest is a 
little bit tricky; could you clean that up, or if that is not possible, 
comment what it does? (I think it just makes a list of (topic, partition) 
pairs, but the variable name doesn't explain it.) Same with 
KafkaApis.handleFetchRequest.
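If it really is just building the (topic, partition) pairs, a for-comprehension with a descriptive name reads much better than an anonymous nested map/flatMap. A rough sketch, assuming a hypothetical request shape keyed by topic with per-partition data (not the actual ProducerRequest layout):

```scala
// Hypothetical request shape: topic -> (partition -> messages).
val dataByTopic: Map[String, Map[Int, Seq[String]]] = Map(
  "orders" -> Map(0 -> Seq("a"), 1 -> Seq("b")),
  "clicks" -> Map(0 -> Seq("c"))
)

// Equivalent to dataByTopic.flatMap { case (t, ps) => ps.keys.map((t, _)) },
// but the name and the comprehension both say what is being produced.
val topicPartitionPairs: List[(String, Int)] =
  (for {
    (topic, partitions) <- dataByTopic
    partition <- partitions.keys
  } yield (topic, partition)).toList
```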
9. DelayedProduce should definitely not be in the kafka.api package, that 
package is for request/response "structs". DelayedProduce is basically a bunch 
of server internals and not something the producer or consumer should be aware 
of. Also, the code for ProduceRequestPurgatory and FetchRequestPurgatory is 
organized differently even though these things are mirror images of one 
another: FetchRequestPurgatory is in KafkaApis, while ProduceRequestPurgatory 
is in its own file.
These should match each other. The reasoning behind keeping things in KafkaApis 
was that those classes contained part of the logic for processing a request, so 
splitting it into two files makes it harder to read (the counter argument is 
that KafkaApis is getting pretty big). The other reason was so it could access 
the variables in KafkaApis as a nested class. Either way is probably fine, but 
they should be symmetric between the two.
10. KafkaApis.handleProducerRequest and KafkaApis.produce are a little hard 
to differentiate. I think the latter is effectively "produce to local log" 
and the former does the purgatory stuff. It would be good to call this out in 
the method name or a comment.
11. I am not so sure about the logic of proactively responding to all 
outstanding requests with failure during a leadership change. Is this the right 
thing to do?
12. The logic in DelayedProduce.isSatisfied is very odd. I think what we want 
to do is respond to all outstanding requests in the event of a leadership 
change. But we do this by passing in the ReplicaManager and having logic that 
keys off whether or not the local broker is the leader. This seems quite 
convoluted. Wouldn't it make more sense to do this: (1) Have the replica 
manager allow "subscribers" to its state changes. (2) Add a subscriber that 
loops over all in-flight requests and expires them with an error when the local 
broker ceases to be the leader. There is a description of this pattern here: 
http://codingjunkie.net/guava-eventbus/.
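A minimal sketch of that subscriber idea follows. The names (LeadershipListener, ReplicaStateNotifier, etc.) are made up for illustration, not proposed API:

```scala
import scala.collection.mutable

// Illustrative subscriber interface; not the actual ReplicaManager API.
trait LeadershipListener {
  def onLeaderChange(topic: String, partition: Int, isLeader: Boolean): Unit
}

// The replica manager side: it only publishes state changes and knows
// nothing about DelayedProduce or any other api-level concept.
class ReplicaStateNotifier {
  private val listeners = mutable.ListBuffer[LeadershipListener]()
  def subscribe(l: LeadershipListener): Unit = listeners += l
  def notifyLeaderChange(topic: String, partition: Int,
                         isLeader: Boolean): Unit =
    listeners.foreach(_.onLeaderChange(topic, partition, isLeader))
}

// The purgatory side: subscribes, and expires all in-flight requests
// with an error when this broker stops leading a partition.
class ProducerRequestPurgatorySketch extends LeadershipListener {
  def onLeaderChange(topic: String, partition: Int,
                     isLeader: Boolean): Unit =
    if (!isLeader) expireWithNotLeaderError(topic, partition)
  private def expireWithNotLeaderError(topic: String,
                                       partition: Int): Unit = {
    // Loop over watched requests for this partition and fail them.
  }
}
```

This keeps the dependency pointing one way: the purgatory knows about leadership changes, but the replica manager never has to reach into the purgatory.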
13. Nice catch with the request size. I think it is okay that we are using 
sizeInBytes, since it is not expected that the producer send fragments, though 
a fully correct implementation would check that.
                
> tie producer-side ack with high watermark and progress of replicas
> ------------------------------------------------------------------
>
>                 Key: KAFKA-353
>                 URL: https://issues.apache.org/jira/browse/KAFKA-353
>             Project: Kafka
>          Issue Type: Sub-task
>    Affects Versions: 0.8
>            Reporter: Jun Rao
>            Assignee: Joel Koshy
>         Attachments: kafka-353_v1.patch
>
>

