[
https://issues.apache.org/jira/browse/KAFKA-48?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13250912#comment-13250912
]
Neha Narkhede commented on KAFKA-48:
------------------------------------
This patch looks very good. Here are a few questions -
1. I like the way the expired requests are handled by implementing the logic
inside the FetchRequestPurgatory. However, can we not do the same for satisfied
requests by providing an abstract satisfy() API in RequestPurgatory? That would get
rid of the handling of fetch requests inside handleProducerRequest() in
KafkaApis, which is a little awkward to read. When we have the
ProduceRequestPurgatory, the same satisfy() operation can send responses for
produce requests once the fetch responses for the followers come in.
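A minimal sketch of what the satisfy() hook in point 1 could look like. It is
written in Java for brevity and is not taken from the patch: apart from
satisfy() and the expiration hook, every name here (PurgatorySketch,
checkSatisfied, update, timeOut, DelayedFetchSketch, FetchPurgatorySketch) is
hypothetical, and the "bytes accumulated" condition is only an assumed stand-in
for whatever actually satisfies a delayed fetch.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Hypothetical, simplified stand-in for RequestPurgatory.
abstract class PurgatorySketch<R, E> {
    private final List<R> watched = new ArrayList<>();

    // Subclass decides whether an incoming event completes a delayed request.
    protected abstract boolean checkSatisfied(R request, E event);

    // Subclass sends the response for a request that was satisfied.
    protected abstract void satisfy(R request);

    // Subclass sends the response for a request that timed out.
    protected abstract void expire(R request);

    public synchronized void watch(R request) {
        watched.add(request);
    }

    // Called by the producer path; it needs no knowledge of fetch responses.
    public synchronized int update(E event) {
        int satisfied = 0;
        Iterator<R> it = watched.iterator();
        while (it.hasNext()) {
            R request = it.next();
            if (checkSatisfied(request, event)) {
                it.remove();
                satisfy(request);
                satisfied++;
            }
        }
        return satisfied;
    }

    // Called by the expiration thread (not shown) when a request times out.
    public synchronized void timeOut(R request) {
        if (watched.remove(request)) {
            expire(request);
        }
    }
}

// Hypothetical delayed fetch: waits until minBytes have accumulated.
class DelayedFetchSketch {
    final int minBytes;
    int bytesAvailable = 0;

    DelayedFetchSketch(int minBytes) {
        this.minBytes = minBytes;
    }
}

// All fetch-specific response handling lives here, not in the request handler.
class FetchPurgatorySketch extends PurgatorySketch<DelayedFetchSketch, Integer> {
    final List<String> responses = new ArrayList<>();

    @Override
    protected boolean checkSatisfied(DelayedFetchSketch fetch, Integer newBytes) {
        fetch.bytesAvailable += newBytes;
        return fetch.bytesAvailable >= fetch.minBytes;
    }

    @Override
    protected void satisfy(DelayedFetchSketch fetch) {
        responses.add("fetch response with " + fetch.bytesAvailable + " bytes");
    }

    @Override
    protected void expire(DelayedFetchSketch fetch) {
        responses.add("fetch response after timeout");
    }
}
```

With this shape, the produce handler would only call update(...) on the
purgatory, and a future ProduceRequestPurgatory could implement the same three
hooks for produce requests.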
2. I gave the RequestPurgatory data structure some thought. I am not sure
whether the alternative below buys us anything over the current data structure,
but how about the following for the RequestPurgatory -
2.1. The watchers would be a priority heap (PriorityQueue), with the head being
the DelayedItem with the least delay value (earliest expiration time). So for
each (topic, partition), we have a PQ of watchers.
2.2. The expiration data structure is another PQ of size n, where n is the
number of keys in RequestPurgatory. This expiration PQ has the heads of each of
the watcher lists above.
2.3. The expiration thread will await on a condition variable with a timeout =
delay of the head of the expiration PQ. The condition also gets signaled
whenever the head of any of the n watcher lists changes.
2.4. When the expiration thread gets signaled, it removes its head element,
expires it if it's ready, ignores it if it's satisfied, and adds the next element
from the watch list it came from. It keeps doing this until its head has an
expiration time in the future. Then it goes back to awaiting on the condition
variable.
2.5. The item to be expired gets removed from its watch list as well as the
expiration PQ in O(1).
2.6. The item that gets satisfied sets a flag and gets removed from its watcher
list. If the satisfied item is the head of the watcher list, the expiration
thread gets signaled to add the new head to its PQ.
2.7. Pros
2.7.1. The watcher list doesn't maintain expired items, so it doesn't need
state-keeping for liveCount and maybePurge()
2.7.2. During a watch operation, items only enter the expiration PQ if they are
the head of the watcher list
2.7.3. The expiration thread does a more informed get operation, instead of
polling the queue in a loop.
2.8. Cons
2.8.1. The watch operation is O(log n), where n is the number of DelayedItems
for a key
2.8.2. The forcePurge() operation on the expiration data structure still needs
to happen in O(n)
Did I miss something here? Thoughts?
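The structure in 2.1 through 2.6 can be sketched roughly as follows, in Java
and single-threaded for brevity. This is an illustration under assumptions, not
code from the patch: the class and method names (DelayedItemSketch,
TwoLevelPurgatorySketch, expireDue, expirationSize) are hypothetical, keys are
plain strings instead of (topic, partition) pairs, and the condition variable
from 2.3 is replaced by an explicit expireDue(now) call. Note also that
java.util.PriorityQueue removes an arbitrary element in linear time, so this
sketch does not achieve the O(1) removal mentioned in 2.5; that would need a
different heap implementation.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;

// Hypothetical delayed item, ordered by expiration time (earliest first).
class DelayedItemSketch implements Comparable<DelayedItemSketch> {
    final String key;
    final long expiresAtMs;
    boolean satisfied = false;

    DelayedItemSketch(String key, long expiresAtMs) {
        this.key = key;
        this.expiresAtMs = expiresAtMs;
    }

    @Override
    public int compareTo(DelayedItemSketch other) {
        return Long.compare(expiresAtMs, other.expiresAtMs);
    }
}

class TwoLevelPurgatorySketch {
    // 2.1: one priority queue of watchers per key.
    private final Map<String, PriorityQueue<DelayedItemSketch>> watchers = new HashMap<>();
    // 2.2: holds only the current head of each non-empty watcher queue.
    private final PriorityQueue<DelayedItemSketch> expiration = new PriorityQueue<>();

    public synchronized void watch(DelayedItemSketch item) {
        PriorityQueue<DelayedItemSketch> pq =
            watchers.computeIfAbsent(item.key, k -> new PriorityQueue<>());
        DelayedItemSketch oldHead = pq.peek();
        pq.add(item);
        if (pq.peek() == item) {
            // The head for this key changed; a real implementation would
            // signal the expiration thread here (2.3).
            if (oldHead != null) {
                expiration.remove(oldHead);
            }
            expiration.add(item);
        }
    }

    // 2.6: a satisfied item sets a flag and leaves its watcher queue.
    public synchronized void satisfy(DelayedItemSketch item) {
        item.satisfied = true;
        PriorityQueue<DelayedItemSketch> pq = watchers.get(item.key);
        if (pq == null) {
            return;
        }
        boolean wasHead = pq.peek() == item;
        pq.remove(item);
        if (wasHead) {
            expiration.remove(item);
            promoteHead(pq);
        }
    }

    // 2.4: one pass of the expiration thread; returns the items that expired.
    public synchronized List<DelayedItemSketch> expireDue(long nowMs) {
        List<DelayedItemSketch> expired = new ArrayList<>();
        while (!expiration.isEmpty() && expiration.peek().expiresAtMs <= nowMs) {
            DelayedItemSketch head = expiration.poll();
            PriorityQueue<DelayedItemSketch> pq = watchers.get(head.key);
            pq.poll(); // the expiration head is also the head of its watcher queue
            if (!head.satisfied) { // defensive; satisfy() already removes eagerly
                expired.add(head);
            }
            promoteHead(pq);
        }
        return expired;
    }

    public synchronized int expirationSize() {
        return expiration.size();
    }

    private void promoteHead(PriorityQueue<DelayedItemSketch> pq) {
        DelayedItemSketch next = pq.peek();
        if (next != null) {
            expiration.add(next);
        }
    }
}
```

In a threaded version, the expiration thread would wait with a timeout equal
to the delay of expiration.peek() and would be woken wherever watch() or
satisfy() changes a head.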
3. On the other hand, this is a huge non-trivial patch and you must be pretty
tired of rebasing and working through unit tests. We could just discuss the
above changes, and maybe file another JIRA to track it, instead of delaying
this patch further. But that is your call.
> Implement optional "long poll" support in fetch request
> -------------------------------------------------------
>
> Key: KAFKA-48
> URL: https://issues.apache.org/jira/browse/KAFKA-48
> Project: Kafka
> Issue Type: Bug
> Reporter: Jun Rao
> Assignee: Jay Kreps
> Attachments: KAFKA-48-v2.patch, KAFKA-48-v3.patch, KAFKA-48-v4.patch,
> KAFKA-48.patch, kafka-48-v3-to-v4-changes.diff
>
>
> Currently, the fetch request is non-blocking. If there is nothing on the
> broker for the consumer to retrieve, the broker simply returns an empty set
> to the consumer. This can be inefficient, if you want to ensure low-latency
> because you keep polling over and over. We should make a blocking version of
> the fetch request so that the fetch request is not returned until the broker
> has at least one message for the fetcher or some timeout passes.