[ https://issues.apache.org/jira/browse/KAFKA-13838?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Guozhang Wang updated KAFKA-13838:
----------------------------------
Labels: new-consumer-threading-should-fix (was: )
> Improve the poll method of ConsumerNetworkClient
> ------------------------------------------------
>
> Key: KAFKA-13838
> URL: https://issues.apache.org/jira/browse/KAFKA-13838
> Project: Kafka
> Issue Type: Improvement
> Components: consumer
> Reporter: RivenSun
> Assignee: RivenSun
> Priority: Major
> Labels: new-consumer-threading-should-fix
>
> A brief description of how a ClientRequest is sent on the Kafka client
> side. The process has two steps.
> 1. Selector.send(send) method
> Kafka's underlying TCP connection channel ensures that data is sent to the
> network {*}sequentially{*}. KafkaChannel allows {*}only one send to be set at
> a time{*}, and the next InFlightRequest may be added only once the
> {color:#ff0000}queue.peekFirst().send.completed(){color} condition is met,
> as checked along this call chain:
> {code:java}
> NetworkClient.isReady(node) ->
> NetworkClient.canSendRequest(node) ->
> InFlightRequests.canSendMore(node){code}
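The gating rule in step 1 can be sketched as a small standalone model. This is a simplified illustration, not Kafka's InFlightRequests: the class names SketchRequest and SketchInFlightRequests, the string node ids, and the boolean sendCompleted flag are hypothetical stand-ins for the real request and Send types. Newer requests go to the head of the deque so that peekFirst() sees the most recent one, as the condition above implies.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified model of per-node in-flight gating (not Kafka code).
final class SketchRequest {
    final String id;
    boolean sendCompleted; // becomes true only once the network write has finished

    SketchRequest(String id) {
        this.id = id;
    }
}

final class SketchInFlightRequests {
    private final int maxInFlightPerConnection;
    private final Map<String, Deque<SketchRequest>> requests = new HashMap<>();

    SketchInFlightRequests(int maxInFlightPerConnection) {
        this.maxInFlightPerConnection = maxInFlightPerConnection;
    }

    // Newest request goes to the head, so peekFirst() returns the most recent send.
    void add(String node, SketchRequest request) {
        requests.computeIfAbsent(node, n -> new ArrayDeque<>()).addFirst(request);
    }

    // Another request may be added only when the most recent send has fully
    // completed and the per-connection limit has not been reached.
    boolean canSendMore(String node) {
        Deque<SketchRequest> queue = requests.get(node);
        return queue == null || queue.isEmpty()
                || (queue.peekFirst().sendCompleted && queue.size() < maxInFlightPerConnection);
    }
}
```

With a limit of 5, an empty queue allows a send, a request whose send has not completed blocks the node, and marking that send completed unblocks it again.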
> 2. Selector.poll(timeout)
> Each time a send is set on the KafkaChannel, there should be a
> {*}subsequent{*} Selector.poll(timeout) call. See the Javadoc on the
> Selector.send(send) method:
> {code:java}
> /**
>  * Queue the given request for sending in the subsequent {@link #poll(long)} calls
>  * @param send The request to send
>  */
> public void send(NetworkSend send) {
> {code}
> A send can become *completed* *only after* Selector.poll(timeout) has
> executed; for details, see the Selector.write(channel) method.
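To illustrate why a send completes only inside poll, here is a hypothetical model of a channel that holds at most one pending send. SketchChannel, setSend() and poll() are illustrative names, not Kafka's API; the sketch also assumes a write finishes in a single poll(), whereas a real channel may need several poll() calls to drain a large send.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of a channel with at most one pending send (not Kafka code).
// setSend() only records the send; nothing is written until poll() runs.
final class SketchChannel {
    private String pendingSend;                       // at most one send at a time
    private final List<String> written = new ArrayList<>();

    void setSend(String send) {
        if (pendingSend != null) {
            throw new IllegalStateException("prior send still in progress: " + pendingSend);
        }
        pendingSend = send;
    }

    boolean hasPendingSend() {
        return pendingSend != null;
    }

    // Stand-in for Selector.poll(): performs the write, so the send completes here.
    // Returns the completed send, or null if nothing was pending.
    String poll() {
        String completed = pendingSend;
        if (completed != null) {
            written.add(completed);
            pendingSend = null;
        }
        return completed;
    }

    List<String> written() {
        return written;
    }
}
```

A second setSend() before poll() is rejected; after poll() has completed the first send, the channel accepts the next one.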
>
> Now let's go back to ConsumerNetworkClient.poll(Timer timer, PollCondition
> pollCondition, boolean disableWakeup). Three places in this method are
> involved in sending data:
> {code:java}
> long pollDelayMs = trySend(timer.currentTimeMs());
> ->
> client.poll(...)
> ->
> trySend(timer.currentTimeMs());
> {code}
> There are two problems with this process:
> 1. After the second trySend(...) call, we should immediately call
> client.poll(0, timer.currentTimeMs()), so that every send set by that call
> is consumed by the next Selector.poll() right away.
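A minimal sketch of the call order proposed in item 1, assuming the rest of ConsumerNetworkClient.poll() (timer handling, pollCondition, wakeups, pending completions) stays unchanged and is omitted here. SketchPollFlow and its trySend/clientPoll methods are hypothetical stubs that only record the order of calls; the fixed 100 ms return value stands in for a real pollDelayMs.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of the reordered poll flow; it only records call order.
final class SketchPollFlow {
    final List<String> calls = new ArrayList<>();

    // Stub for trySend(now): pretend pollDelayMs is 100 ms.
    long trySend(long now) {
        calls.add("trySend");
        return 100L;
    }

    // Stub for client.poll(timeout, now).
    void clientPoll(long timeoutMs, long now) {
        calls.add("client.poll(" + timeoutMs + ")");
    }

    void poll(long now, long timeoutMs) {
        long pollDelayMs = trySend(now);                   // first send attempt
        clientPoll(Math.min(timeoutMs, pollDelayMs), now); // possibly blocking poll
        trySend(now);                                      // second send attempt
        clientPoll(0L, now);                               // proposed: flush the new sends immediately
    }
}
```

Calling poll with a 500 ms timeout records trySend, client.poll(100), trySend, client.poll(0): the zero-timeout poll ensures the sends from the second trySend are written without waiting for the next outer poll.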
> 2. The while loop in the trySend(...) method can be removed.
> Once client.send(request, now) has been called for a node, that first
> request cannot become *completed* within trySend(...), because no
> Selector.poll() runs in between. Subsequent requests to the same node will
> therefore never satisfy the client.ready(node, now) condition.
> Although the current code breaks out on the second pass through the loop,
> that second pass is {*}an additional, unnecessary execution of the loop{*}.
> Modify the code as follows:
> {code:java}
> long trySend(long now) {
>     long pollDelayMs = maxPollTimeoutMs;
>     // send any requests that can be sent now
>     for (Node node : unsent.nodes()) {
>         Iterator<ClientRequest> iterator = unsent.requestIterator(node);
>         if (iterator.hasNext()) {
>             pollDelayMs = Math.min(pollDelayMs, client.pollDelayMs(node, now));
>             if (client.ready(node, now)) {
>                 client.send(iterator.next(), now);
>                 iterator.remove();
>             }
>         }
>     }
>     return pollDelayMs;
> }{code}
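The extra loop pass described in item 2 can be made concrete with a hypothetical single-node model. SketchTrySend, its ready()/send() helpers, and the string requests are illustrative assumptions, not Kafka code; ready() simply returns false once a send has been handed off in the current pass, reflecting the argument above that the send cannot complete before the next Selector.poll().

```java
import java.util.Deque;
import java.util.Iterator;

// Hypothetical single-node model used to count passes through trySend()'s loop.
// ready()/send() are simplified stand-ins for client.ready()/client.send().
final class SketchTrySend {
    private boolean sendInProgress = false; // set once a send is handed to the channel
    int readyChecks = 0;                    // how many times ready() was evaluated
    int sent = 0;                           // how many requests were handed off

    // A handed-off send cannot complete before the next Selector.poll(),
    // so the node stops being ready for the rest of this pass.
    private boolean ready() {
        readyChecks++;
        return !sendInProgress;
    }

    private void send(String request) {
        sendInProgress = true;
        sent++;
    }

    // Current shape: keep iterating until ready() fails, then break.
    void whileVersion(Deque<String> unsent) {
        Iterator<String> iterator = unsent.iterator();
        while (iterator.hasNext()) {
            String request = iterator.next();
            if (ready()) {
                send(request);
                iterator.remove();
            } else {
                break; // with two or more queued requests, the second pass ends here
            }
        }
    }

    // Proposed shape: at most one send attempt per node per pass.
    void ifVersion(Deque<String> unsent) {
        Iterator<String> iterator = unsent.iterator();
        if (iterator.hasNext() && ready()) {
            send(iterator.next());
            iterator.remove();
        }
    }
}
```

For a node with three queued requests, both versions send exactly one request and leave two queued, but the while version evaluates ready() twice while the if version evaluates it once.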
> 3. Additionally, the unsent.clean() method, which is executed last, can be
> simplified to make the code easier to read:
> {code:java}
> public void clean() {
>     // the lock protects removal from a concurrent put which could otherwise mutate the
>     // queue after it has been removed from the map
>     synchronized (unsent) {
>         unsent.values().removeIf(ConcurrentLinkedQueue::isEmpty);
>     }
> } {code}
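For comparison, a small standalone example of both clean() styles on a map shaped like the unsent buffer (node id to queue of requests). The explicit-iterator variant is written here only for contrast and is not quoted from Kafka; SketchClean and the string keys and values are placeholders. The synchronized block only guards against a concurrent put if that put also synchronizes on the same map object.

```java
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;

// Illustrative comparison of two equivalent clean() styles (not Kafka code).
final class SketchClean {
    // Explicit-iterator style, written here only for comparison.
    static void cleanWithIterator(Map<String, ConcurrentLinkedQueue<String>> unsent) {
        synchronized (unsent) {
            Iterator<ConcurrentLinkedQueue<String>> iterator = unsent.values().iterator();
            while (iterator.hasNext()) {
                if (iterator.next().isEmpty()) {
                    iterator.remove(); // removing from the values view removes the map entry
                }
            }
        }
    }

    // Proposed style: Collection.removeIf on the map's values view.
    static void cleanWithRemoveIf(Map<String, ConcurrentLinkedQueue<String>> unsent) {
        synchronized (unsent) {
            unsent.values().removeIf(ConcurrentLinkedQueue::isEmpty);
        }
    }

    // Builds a sample map with one empty and one non-empty queue.
    static Map<String, ConcurrentLinkedQueue<String>> sample() {
        Map<String, ConcurrentLinkedQueue<String>> unsent = new ConcurrentHashMap<>();
        unsent.put("node-1", new ConcurrentLinkedQueue<>()); // empty: should be removed
        ConcurrentLinkedQueue<String> pending = new ConcurrentLinkedQueue<>();
        pending.add("fetch");
        unsent.put("node-2", pending);                        // non-empty: should stay
        return unsent;
    }
}
```

Both variants leave only node-2 in the map, since node-1's queue is empty; removeIf simply expresses the same iterate-and-remove logic in one line.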