RivenSun created KAFKA-13838:
--------------------------------

             Summary: Improve the poll method of ConsumerNetworkClient
                 Key: KAFKA-13838
                 URL: https://issues.apache.org/jira/browse/KAFKA-13838
             Project: Kafka
          Issue Type: Improvement
          Components: consumer
            Reporter: RivenSun
            Assignee: RivenSun


Briefly describe the process of sending clientRequest on the Kafka Client side, 
which is divided into two steps.

1.Selector.send(send) method
Kafka's underlying tcp connection channel ensures that data is sent to the 
network {*}sequentially{*}. KafkaChannel allows {*}only one send to be set at a 
time{*}. And the next InFlightRequest is allowed to be added only if the 
{color:#ff0000}queue.peekFirst().send.completed(){color} condition is met.
{code:java}
NetworkClient.isReady(node) ->
NetworkClient.canSendRequest(node) -> 
InFlightRequests.canSendMore(node){code}

2. Selector.poll(timeout)
After KafkaChannel sets a send each time, there should be a 
Selector.poll(timeout) call {*}subsequently{*}. Please refer to the comments on 
the Selector.send(send) method.
{code:java}
/**
 * Queue the given request for sending in the subsequent {@link #poll(long)} 
calls
 * @param send The request to send
 */
public void send(NetworkSend send) { {code}
Send may become *completed* *only after* the Selector.poll(timeout) method is 
executed, more detail see Selector.write(channel) methos.

 

Let's go back and look at this method: ConsumerNetworkClient.poll(Timer timer, 
PollCondition pollCondition, boolean disableWakeup) method.
There are three places involved in sending data in this method:
{code:java}
long pollDelayMs = trySend(timer.currentTimeMs());
->
 client.poll(...)
->
trySend(timer.currentTimeMs());

{code}
There are two problems with this process:

1. After calling the trySend(...) method for the second time, we should 
immediately call client.poll(0, timer.currentTimeMs()); , to ensure that the 
send generated each time can be consumed by the next Selector.poll() method.

2. The while loop in trySend(...) method can be removed
After a node executes client.send(request, now) for the first time, because the 
first request will never be *completed* here, the subsequent requests will 
never satisfy the client.ready(node, now) condition.
Although the current code will break directly on the second execution of the 
loop, there will be {*}an additional execution of the loop{*}.
Modify the code as follows:
{code:java}
long trySend(long now) {
    long pollDelayMs = maxPollTimeoutMs;

    // send any requests that can be sent now
    for (Node node : unsent.nodes()) {
        Iterator<ClientRequest> iterator = unsent.requestIterator(node);
        if (iterator.hasNext()) {
            pollDelayMs = Math.min(pollDelayMs, client.pollDelayMs(node, now));
            if (client.ready(node, now)) {
                client.send(iterator.next(), now);
                iterator.remove();
            }
        }
    }
    return pollDelayMs;
}{code}
3. By the way, the unsent.clean() method that is executed last can also be 
optimized.
Easier to read the code.
{code:java}
public void clean() {
    // the lock protects removal from a concurrent put which could otherwise 
mutate the
    // queue after it has been removed from the map
    synchronized (unsent) {
        unsent.values().removeIf(ConcurrentLinkedQueue::isEmpty);
    }
} {code}



--
This message was sent by Atlassian Jira
(v8.20.7#820007)

Reply via email to