[ https://issues.apache.org/jira/browse/KAFKA-48?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13250912#comment-13250912 ]
Neha Narkhede commented on KAFKA-48: ------------------------------------ This patch looks very good. Here are a few questions - 1. I like the way the expired requests are handled by implementing the logic inside the FetchRequestPurgatory. However, can we not do the same for satisfied requests by providing a satisfy() abstract API in RequestPurgatory ? That gets rid of the handling of fetch requests inside handleProducerRequest() in KafkaApis, which is a little awkward to read. When we have the ProduceRequestPurgatory, the same satisfy() operation can send responses for produce requests once the fetch responses for the followers come in. 2. I gave the RequestPurgatory data structure some thought. Not sure if this buys us anything over the current data structure. How about the following data structure for the RequestPurgatory - 2.1. The watchers would be a priority heap (PriorityQueue), with the head being the DelayedItem with the least delay value (earliest expiration time). So for each (topic, partition), we have a PQ of watchers. 2.2. The expiration data structure is another PQ of size n, where n is the number of keys in RequestPurgatory. This expiration PQ has the heads of each of the watcher lists above. 2.3. The expiration thread will await on a condition variable with a timeout = delay of the head of the expiration PQ. The condition also gets signaled whenever the head of any of the n watcher list changes. 2.4. When the expiration thread gets signaled, it removes its head element, expires it if its ready, ignores if its satisfied, and adds an element from the watch list it came from. It keeps doing this until its head has expiration time in the future. Then it goes back to awaiting on the condition variable. 2.5. The item to be expired gets removed from its watch list as well as expiration PQ in O(1). 2.6. The item that gets satisfied sets a flag and gets removed from its watcher list. If the satisfied item is the head of the watcher list, the expiration thread gets signaled to add new head to its PQ. 2.7 Pros 2.7.1. The watcher list doesn't maintain expired items, so doesn't need state-keeping for liveCount and maybePurge() 2.7.2. During a watch operation, items only enter the expiration PQ if they are the head of the watcher list 2.7.3. The expiration thread does a more informed get operation, instead of polling the queue in a loop. 2.8. Cons 2.8.1. watch operation is O(logn) where n is the number of DelayedItems for a key 2.8.2 The forcePurge() operation on the expiration data structure still needs to happen in O(n) Did I miss something here ? Thoughts ? 3. On the other hand, this is a huge non-trivial patch and you must be pretty tired of rebasing and working through unit tests. We could just discuss the above changes, and maybe file another JIRA to track it, instead of delaying this patch further. But that is your call. > Implement optional "long poll" support in fetch request > ------------------------------------------------------- > > Key: KAFKA-48 > URL: https://issues.apache.org/jira/browse/KAFKA-48 > Project: Kafka > Issue Type: Bug > Reporter: Jun Rao > Assignee: Jay Kreps > Attachments: KAFKA-48-v2.patch, KAFKA-48-v3.patch, KAFKA-48-v4.patch, > KAFKA-48.patch, kafka-48-v3-to-v4-changes.diff > > > Currently, the fetch request is non-blocking. If there is nothing on the > broker for the consumer to retrieve, the broker simply returns an empty set > to the consumer. This can be inefficient, if you want to ensure low-latency > because you keep polling over and over. We should make a blocking version of > the fetch request so that the fetch request is not returned until the broker > has at least one message for the fetcher or some timeout passes. -- This message is automatically generated by JIRA. If you think it was sent incorrectly, please contact your JIRA administrators: https://issues.apache.org/jira/secure/ContactAdministrators!default.jspa For more information on JIRA, see: http://www.atlassian.com/software/jira