[
https://issues.apache.org/jira/browse/KAFKA-48?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13140419#comment-13140419
]
Jay Kreps commented on KAFKA-48:
--------------------------------
Yes, these are all good points. The work I have done so far just splits request
processing into a separate thread pool and enables asynchronous handling. This
is a fairly general thing we need for a few different use cases. Perhaps this
should be broken into a separate JIRA.
I have thought a little bit about how to do long poll, though. Logically what I
want to do is make it possible to give a minimum byte size for the response and
a maximum delay in ms, then have the server delay the response until we have at
least min_bytes of data in the response OR we hit the maximum delay time. The
goals are to improve latency (by avoiding waiting in between poll requests), to
reduce load on the server (by not polling), and to make it possible to improve
throughput. If you set min_bytes = 0 or max_delay_ms = 0 you effectively get
the current behavior. The throughput improvement comes if you set
min_bytes > 1; this gives a way to artificially increase the response size for
requests to the topic (i.e. avoid fetching only a few messages at a time) while
still giving hard latency guarantees. We have seen that request size is one of
the important factors for network throughput.
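To make the rule concrete, here is a minimal sketch of the response condition described above. The names (min_bytes, max_delay_ms) follow the comment; nothing here is real Kafka code.

```python
# Hypothetical sketch of the long-poll response rule: respond once we have
# enough data OR we have waited long enough.

def should_respond(accumulated_bytes, elapsed_ms, min_bytes, max_delay_ms):
    return accumulated_bytes >= min_bytes or elapsed_ms >= max_delay_ms

# min_bytes = 0 (or max_delay_ms = 0) degenerates to the current
# non-blocking behavior:
assert should_respond(0, 0, 0, 100)            # respond immediately
# otherwise the request blocks until either condition is met:
assert not should_respond(10, 50, 1024, 100)   # not enough bytes, not timed out
assert should_respond(2048, 50, 1024, 100)     # enough bytes
assert should_respond(10, 100, 1024, 100)      # timed out
```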
As you say, the only case to really consider is the multi-fetch case. The
single topic fetch can just be seen as a special case of this. I think your
first proposal is closer to what I had in mind. Having the response contain an
empty message set for the topics that have no data has very little overhead
since it is just positionally indexed, so it is like 4 bytes or something. A
poll()-style interface that just returns ready topics doesn't seem very useful
to me, because the only logical thing you can do then is initiate a fetch on
those topics, right? So we might as well just send back the data and have a
single request type to worry about.
One of the tricky questions for multifetch is what does the minimum byte size
pertain to? A straightforward implementation in the current system would be to
add the min_bytes and timeout to the fetch request which would effectively
bundle it up N times in the multi-fetch (currently multi-fetch is just N
fetches glued together). This doesn't really make sense, though. Which of these
minimum sizes would cause the single response to be sent? Would it be when all
conditions were satisfied or when one was satisfied? I think the only thing
that makes sense is to set these things at the request level. Ideally what I
would like to do is remove the fetch request entirely because it is redundant
and fix multi-fetch to have the following:
[(topic1, partitions1), (topic2, partitions2),...], max_total_size,
max_wait_ms
This also fixes the weird thing in multifetch now where you have to specify the
topic with each partition, so a request for 10 partitions on the same topic
repeats the topic name 10 times. This is an invasive change, though, since it
means request format changes.
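The proposed shape can be sketched as a simple builder; the field names (max_total_size, max_wait_ms) are the ones from the comment, and the structure is purely illustrative, not an actual wire format.

```python
# Sketch of the proposed unified fetch request: topics grouped with their
# partitions, plus request-level max_total_size and max_wait_ms.

def build_fetch_request(topic_partitions, max_total_size, max_wait_ms):
    # topic_partitions: dict of topic -> list of partition ids, so each
    # topic name is sent once rather than repeated per partition.
    return {
        "topics": [(t, ps) for t, ps in topic_partitions.items()],
        "max_total_size": max_total_size,
        "max_wait_ms": max_wait_ms,
    }

# A request for 10 partitions of one topic names the topic only once:
req = build_fetch_request({"topic1": list(range(10))}, 1 << 20, 100)
assert len(req["topics"]) == 1
```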
I am also not 100% sure how to implement the min_bytes parameter efficiently
for multi-fetch. For the single fetch case it is pretty easy: the
implementation would be to keep a sort of hybrid priority queue ordered by
timeout time (e.g. the unix timestamp at which we owe a response). When a fetch
request came in we would try to service it immediately, and if we could meet
its requirements we would immediately send a response. If we can't meet its
min_bytes requirement then we would calculate the offset for that
topic/partition at which the request would be unblocked (e.g. if the current
offset is X and the min_bytes is M then the target size is X+M). We would
insert new requests into this watchers list, maintaining a sort by increasing
target size. Each time a produce request is handled we would respond to all the
watching requests whose target size is less than the new offset; this just
requires walking the list until we see a request with a target size greater
than the current offset. All the newly unblocked requests would be added to the
response queue. So the only work added to a produce request is the work of
transferring newly unblocked requests to the response queue, and at most we
need to examine one still-blocked request.
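A minimal sketch of that watcher list, assuming the structure described above (this is not Kafka code; class and method names are made up for illustration):

```python
import bisect

# Watchers are kept sorted by the target offset (current offset X plus
# min_bytes M) at which they unblock; a produce walks only the prefix of
# the list that has just become unblocked.

class Watchers:
    def __init__(self):
        self._targets = []    # sorted target offsets
        self._requests = []   # parallel list of blocked requests

    def add(self, current_offset, min_bytes, request):
        target = current_offset + min_bytes
        i = bisect.bisect_right(self._targets, target)
        self._targets.insert(i, target)
        self._requests.insert(i, request)

    def unblock(self, new_offset):
        # Pop every watcher whose target has been reached; the first watcher
        # with a larger target stops the walk, so at most one still-blocked
        # request is examined.
        i = bisect.bisect_right(self._targets, new_offset)
        ready, self._requests = self._requests[:i], self._requests[i:]
        self._targets = self._targets[i:]
        return ready   # these go to the response queue

w = Watchers()
w.add(100, 50, "req-a")    # unblocks at offset 150
w.add(100, 500, "req-b")   # unblocks at offset 600
assert w.unblock(200) == ["req-a"]
assert w.unblock(700) == ["req-b"]
```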
The timeout could be implemented by keeping a priority queue of requests based
on the unix timestamp of the latest allowable response (i.e. the timestamp the
request came in, plus the max_wait_ms). We could add a background thread to remove
items from this as their timeout occurs, and add them to the response queue
with an empty response.
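The timeout side can be sketched with a min-heap keyed by deadline. A background thread would poll this in practice; here expire() is called with the current time instead. Illustrative only, not Kafka code:

```python
import heapq

# Min-heap of (deadline, request), where deadline = arrival ts + max_wait_ms.

class TimeoutQueue:
    def __init__(self):
        self._heap = []

    def add(self, arrival_ms, max_wait_ms, request):
        heapq.heappush(self._heap, (arrival_ms + max_wait_ms, request))

    def expire(self, now_ms):
        # Pop every request whose deadline has passed; these get moved to
        # the response queue with an empty response.
        expired = []
        while self._heap and self._heap[0][0] <= now_ms:
            expired.append(heapq.heappop(self._heap)[1])
        return expired

q = TimeoutQueue()
q.add(0, 100, "req-a")
q.add(0, 500, "req-b")
assert q.expire(50) == []           # nothing due yet
assert q.expire(150) == ["req-a"]   # first deadline passed
assert q.expire(600) == ["req-b"]
```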
For the multifetch case, things are harder to do efficiently. The timeouts can
still work the same way. However the min_bytes is now over all the topics the
request covers. The only way I can see to implement this is to keep a counter
associated with each watcher, and have the watcher watch all the requested
topics. But now on each produce request we need to increment ALL the watchers
on the topic produced to.
Dunno, maybe for practical numbers of blocked requests (a few hundred? a
thousand?) this doesn't matter. Or maybe there is a more clever approach. Ideas
welcome.
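To make the cost concrete, here is a sketch of the counter-per-watcher approach just described (all names invented for illustration). The point is the loop in on_produce: every watcher on the produced-to topic must be touched, whether or not it becomes satisfied.

```python
from collections import defaultdict

# Each blocked multi-fetch request keeps one byte counter and watches every
# topic it covers, so a produce to a topic must bump ALL watchers on it.

class MultiFetchWatchers:
    def __init__(self):
        self._by_topic = defaultdict(list)  # topic -> watchers on that topic

    def add(self, topics, min_bytes, request):
        watcher = {"req": request, "remaining": min_bytes}
        for t in topics:
            self._by_topic[t].append(watcher)

    def on_produce(self, topic, num_bytes):
        # Cost is proportional to the number of watchers on this topic.
        ready, still = [], []
        for w in self._by_topic[topic]:
            w["remaining"] -= num_bytes
            if w["remaining"] <= 0:
                ready.append(w["req"])
                # (a real implementation would also unlink the watcher from
                # the other topics it is registered on)
            else:
                still.append(w)
        self._by_topic[topic] = still
        return ready   # newly unblocked requests for the response queue

m = MultiFetchWatchers()
m.add(["t1", "t2"], 100, "req-a")
assert m.on_produce("t1", 60) == []          # 40 bytes still needed
assert m.on_produce("t2", 60) == ["req-a"]   # counter crosses zero
```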
> Implement optional "long poll" support in fetch request
> -------------------------------------------------------
>
> Key: KAFKA-48
> URL: https://issues.apache.org/jira/browse/KAFKA-48
> Project: Kafka
> Issue Type: Bug
> Reporter: Alan Cabrera
> Assignee: Jay Kreps
> Attachments: KAFKA-48-socket-server-refactor-draft.patch
>
>
> Currently, the fetch request is non-blocking. If there is nothing on the
> broker for the consumer to retrieve, the broker simply returns an empty set
> to the consumer. This can be inefficient if you want to ensure low latency,
> because you keep polling over and over. We should make a blocking version of
> the fetch request so that the fetch request is not returned until the broker
> has at least one message for the fetcher or some timeout passes.