[ 
https://issues.apache.org/jira/browse/KAFKA-48?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13250912#comment-13250912
 ] 

Neha Narkhede commented on KAFKA-48:
------------------------------------

This patch looks very good. Here are a few questions - 

1. I like the way the expired requests are handled by implementing the logic 
inside the FetchRequestPurgatory. However, can we not do the same for satisfied 
requests by providing a satisfy() abstract API in RequestPurgatory ? That gets 
rid of the handling of fetch requests inside handleProducerRequest() in 
KafkaApis, which is a little awkward to read. When we have the 
ProduceRequestPurgatory, the same satisfy() operation can send responses for 
produce requests once the fetch responses for the followers come in. 

2. I gave the RequestPurgatory data structure some thought. Not sure if this 
buys us anything over the current data structure. How about the following data 
structure for the RequestPurgatory - 

2.1. The watchers would be a priority heap (PriorityQueue), with the head being 
the DelayedItem with the least delay value (earliest expiration time). So for 
each (topic, partition), we have a PQ of watchers. 

2.2. The expiration data structure is another PQ of size n, where n is the 
number of keys in RequestPurgatory. This expiration PQ has the heads of each of 
the watcher lists above. 

2.3. The expiration thread will await on a condition variable with a timeout = 
delay of the head of the expiration PQ. The condition also gets signaled 
whenever the head of any of the n watcher list changes. 

2.4. When the expiration thread gets signaled, it removes its head element, 
expires it if its ready, ignores if its satisfied, and adds an element from the 
watch list it came from. It keeps doing this until its head has expiration time 
in the future. Then it goes back to awaiting on the condition variable. 

2.5. The item to be expired gets removed from its watch list as well as 
expiration PQ in O(1). 

2.6. The item that gets satisfied sets a flag and gets removed from its watcher 
list. If the satisfied item is the head of the watcher list, the expiration 
thread gets signaled to add new head to its PQ. 

2.7 Pros 
2.7.1. The watcher list doesn't maintain expired items, so doesn't need 
state-keeping for liveCount and maybePurge() 
2.7.2. During a watch operation, items only enter the expiration PQ if they are 
the head of the watcher list 
2.7.3. The expiration thread does a more informed get operation, instead of 
polling the queue in a loop. 

2.8. Cons 
2.8.1. watch operation is O(logn) where n is the number of DelayedItems for a 
key 
2.8.2 The forcePurge() operation on the expiration data structure still needs 
to happen in O(n) 

Did I miss something here ? Thoughts ? 

3. On the other hand, this is a huge non-trivial patch and you must be pretty 
tired of rebasing and working through unit tests. We could just discuss the 
above changes, and maybe file another JIRA to track it, instead of delaying 
this patch further. But that is your call.
                
> Implement optional "long poll" support in fetch request
> -------------------------------------------------------
>
>                 Key: KAFKA-48
>                 URL: https://issues.apache.org/jira/browse/KAFKA-48
>             Project: Kafka
>          Issue Type: Bug
>            Reporter: Jun Rao
>            Assignee: Jay Kreps
>         Attachments: KAFKA-48-v2.patch, KAFKA-48-v3.patch, KAFKA-48-v4.patch, 
> KAFKA-48.patch, kafka-48-v3-to-v4-changes.diff
>
>
> Currently, the fetch request is non-blocking. If there is nothing on the 
> broker for the consumer to retrieve, the broker simply returns an empty set 
> to the consumer. This can be inefficient, if you want to ensure low-latency 
> because you keep polling over and over. We should make a blocking version of 
> the fetch request so that the fetch request is not returned until the broker 
> has at least one message for the fetcher or some timeout passes.

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators: 
https://issues.apache.org/jira/secure/ContactAdministrators!default.jspa
For more information on JIRA, see: http://www.atlassian.com/software/jira

        

Reply via email to