[ 
https://issues.apache.org/jira/browse/KAFKA-19259?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17956788#comment-17956788
 ] 

Arpit Goyal commented on KAFKA-19259:
-------------------------------------

Hi [~lianetm], I was able to figure out the root cause of the issue. From my 
observation, it happens because of incorrect handling of the 
pendingFetchRequestFuture flag, which controls whether a fetch request needs to 
be sent or not.

Let's walk through the code's logic inside pollForFetches:
*Step 1: Check the Buffer*
First, the application thread tries to get any records that might already be 
waiting in the FetchBuffer.
{code:java}
// Simplified from AsyncKafkaConsumer.pollForFetches()
Fetch<K, V> fetch = fetchCollector.collectFetch(fetchBuffer);
if (!fetch.isEmpty()) {
    return fetch; // Data was already there, return immediately
}
{code}
On the first call, the buffer is empty, so this check fails.
*Step 2: Send the Fetch Request*
Since the buffer was empty, the application thread asks the background thread 
to go get some data.
{code:java}
// Simplified from AsyncKafkaConsumer.pollForFetches()
sendFetches(timer);
{code}
This is where the flawed logic happens:
 # A CreateFetchRequestsEvent is sent.
 # The background thread creates the network request.
 # The background thread prematurely completes the future and nulls it out. 
This means the request may not even have reached the broker yet, but 
pendingFetchRequestFuture has already been marked as completed.

{code:java}
            List<UnsentRequest> requests = fetchRequests.entrySet().stream().map(entry -> {
                final Node fetchTarget = entry.getKey();
                final FetchSessionHandler.FetchRequestData data = entry.getValue();
                final FetchRequest.Builder request = createFetchRequest(fetchTarget, data);
                final BiConsumer<ClientResponse, Throwable> responseHandler = (clientResponse, error) -> {
                    if (error != null)
                        errorHandler.handle(fetchTarget, data, error);
                    else
                        successHandler.handle(fetchTarget, data, clientResponse);
                };

                return new UnsentRequest(request, Optional.of(fetchTarget)).whenComplete(responseHandler);
            }).collect(Collectors.toList());

            pendingFetchRequestFuture.complete(null);
            return new PollResult(requests);
        } catch (Throwable t) {
            // A "dummy" poll result is returned here rather than rethrowing the error because any error
            // that is thrown from any RequestManager.poll() method interrupts the polling of the other
            // request managers.
            pendingFetchRequestFuture.completeExceptionally(t);
            return PollResult.EMPTY;
        } finally {
            pendingFetchRequestFuture = null;
        }
    }
{code}
 4. The sendFetches() call on the application thread now unblocks (a minimal, 
self-contained model of this handshake is sketched just below).
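
The model below is *not* the actual Kafka code, just a self-contained 
illustration of the contract (class and field names are mine): the application 
thread only waits for the future that means "a fetch request object was 
created", never for a broker response.
{code:java}
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

// Minimal model of the sendFetches() handshake (NOT the actual Kafka classes).
public class FetchHandshakeModel {

    // Stand-in for FetchRequestManager.pendingFetchRequestFuture.
    static final CompletableFuture<Void> pendingFetchRequestFuture = new CompletableFuture<>();

    public static void main(String[] args) throws Exception {
        // Background thread: builds the fetch requests and immediately completes the
        // future -- before the broker has received, let alone answered, anything.
        new Thread(() -> {
            // ... build FetchRequest objects and enqueue them as unsent requests ...
            pendingFetchRequestFuture.complete(null);
        }).start();

        // Application thread: this is all that "sendFetches() unblocks" means.
        pendingFetchRequestFuture.get(5, TimeUnit.SECONDS);
        System.out.println("sendFetches() unblocked; no fetch response has arrived yet");
    }
}
{code}
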
*Step 3: Wait on awaitNotEmpty (The Crucial Step)*
Now that the application thread has been told the request is "sent," it doesn't 
return immediately. Instead, it waits on the buffer.
{code:java}
// Simplified from AsyncKafkaConsumer.pollForFetches()
fetchBuffer.awaitNotEmpty(timer); // BLOCKING CALL
return fetchCollector.collectFetch(fetchBuffer);
{code}
The fetchBuffer.awaitNotEmpty(timer) call pauses the application thread until 
one of two things happens:
1. The background thread puts new records into the fetchBuffer and signals the 
waiting thread.
2. The timer (which is driven by your poll(timeout)) expires.
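
To my understanding, that contract boils down to a lock/condition pair. Here is 
a simplified stand-in (not the real FetchBuffer source; the names are only 
approximations) that captures the two wake-up paths:
{code:java}
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Simplified stand-in for the FetchBuffer wait/notify contract (not the actual source).
class FetchBufferSketch {
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition notEmpty = lock.newCondition();
    private final Queue<Object> completedFetches = new ArrayDeque<>();

    // Blocks until either add() signals the condition or the timeout elapses.
    void awaitNotEmpty(long timeoutMs) throws InterruptedException {
        lock.lock();
        try {
            long remainingNs = TimeUnit.MILLISECONDS.toNanos(timeoutMs);
            while (completedFetches.isEmpty() && remainingNs > 0) {
                remainingNs = notEmpty.awaitNanos(remainingNs);
            }
        } finally {
            lock.unlock();
        }
    }

    // Called by the background thread when a fetch response with records arrives.
    void add(Object completedFetch) {
        lock.lock();
        try {
            completedFetches.add(completedFetch);
            notEmpty.signalAll(); // the wake-up the application thread is waiting for
        } finally {
            lock.unlock();
        }
    }
}
{code}
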
*Step 4: The Two Paths (The Bug's Impact)*
This is where the outcomes diverge and where the delay comes from.
*_Scenario A: The Happy Path (Data Arrives)_*
 # The response for FetchRequest #1 arrives from the broker with records.
 # The background thread processes the response and calls 
fetchBuffer.add(records).
 # This add operation notifies the condition variable that awaitNotEmpty is 
waiting on.
 # The application thread in awaitNotEmpty wakes up almost instantly, collects 
the fetch, and returns the data to the user. This feels fast and responsive 
(see the small driver after this list).
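
That small driver, run against the FetchBufferSketch above, shows why the happy 
path feels instant: the waiter wakes when add() signals, long before the 
timeout would expire. Again, this is only a model, not Kafka code.
{code:java}
// Happy-path timing against FetchBufferSketch: the waiter is woken by add(), not by the timeout.
public class HappyPathDemo {
    public static void main(String[] args) throws Exception {
        FetchBufferSketch buffer = new FetchBufferSketch();

        // Simulate the background thread receiving a non-empty fetch response after ~500ms.
        new Thread(() -> {
            try {
                Thread.sleep(500);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
            buffer.add(new Object()); // notifies awaitNotEmpty
        }).start();

        long start = System.currentTimeMillis();
        buffer.awaitNotEmpty(5_000); // could block up to 5s, but returns after ~500ms
        System.out.printf("awaitNotEmpty returned after ~%d ms%n", System.currentTimeMillis() - start);
    }
}
{code}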

_*Scenario B: The Buggy Path (Empty Response)*_
 # The response for FetchRequest #1 arrives from the broker, but it's empty.
 # The background thread sees the empty response. It does not add anything to 
the fetchBuffer.
 # Because the pendingFetchRequestFuture was already cleared, the background 
thread does not know it should immediately send a new fetch request. It simply 
goes idle, as the short-circuit in pollInternal below shows.

{code:java}
private PollResult pollInternal(FetchRequestPreparer fetchRequestPreparer,
                                ResponseHandler<ClientResponse> successHandler,
                                ResponseHandler<Throwable> errorHandler) {
    if (pendingFetchRequestFuture == null) {
        log.info("HACK (Background Thread): pollInternal was called, but pendingFetchRequestFuture is null. No action will be taken.");
        // If no explicit request for creating fetch requests was issued, just short-circuit.
        return PollResult.EMPTY;
    }
{code}
{color:#FF0000}Therefore, the awaitNotEmpty call on the application thread is 
never notified. It has no choice but to sit there and wait until the timer you 
provided in poll(timeout) expires{color}.
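
The user-visible symptom is exactly the alternating fast/slow poll pattern 
described in this ticket. A loop like the one below makes it easy to observe; 
it is only a reproduction sketch and assumes the setup from the description 
(topic t1 on localhost:9092 with a continuously running producer):
{code:java}
import java.time.Duration;
import java.util.List;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

// Reproduction sketch: measure how long each poll() takes with the async consumer.
public class PollTimingRepro {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "cg15");
        props.put("group.protocol", "consumer");   // the new/async consumer
        props.put("enable.auto.commit", "false");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(List.of("t1"));
            while (true) {
                long start = System.currentTimeMillis();
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(5));
                // Alternates between ~0 ms (buffer was notified) and close to the full 5000 ms
                // (empty fetch response, awaitNotEmpty never woken).
                System.out.printf("poll() took %d ms, returned %d record(s)%n",
                        System.currentTimeMillis() - start, records.count());
            }
        }
    }
}
{code}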

I have also added log statements that depict the above behaviour. Attaching 
the full logs: [^consumer11_KAFKA-19259.log]
{code:java}
[2025-06-08 22:23:34,921] DEBUG [Consumer clientId=console-consumer, groupId=cg15] Entering awaitNotEmpty with timer remaining: 4065ms (org.apache.kafka.clients.consumer.internals.FetchBuffer)
[2025-06-08 22:23:34,921] DEBUG [Consumer clientId=console-consumer, groupId=cg15] Buffer is empty, will wait. Timer remaining: 4065ms, wokenup: false (org.apache.kafka.clients.consumer.internals.FetchBuffer)
[2025-06-08 22:23:34,921] DEBUG [Consumer clientId=console-consumer, groupId=cg15] Starting wait at 1749401614921 for 4065ms (org.apache.kafka.clients.consumer.internals.FetchBuffer)
[2025-06-08 22:23:35,431] DEBUG [Consumer clientId=console-consumer, groupId=cg15] Received FETCH response from node 1 for request with header RequestHeader(apiKey=FETCH, apiVersion=17, clientId=console-consumer, correlationId=79, headerVersion=2): FetchResponseData(throttleTimeMs=0, errorCode=0, sessionId=1018152883, responses=[], nodeEndpoints=[]) (org.apache.kafka.clients.NetworkClient)
[2025-06-08 22:23:35,433] DEBUG [Consumer clientId=console-consumer, groupId=cg15] Node 1 sent an incremental fetch response with throttleTimeMs = 0 for session 1018152883 with 0 response partition(s), 6 implied partition(s) (org.apache.kafka.clients.FetchSessionHandler)
[2025-06-08 22:23:35,433] DEBUG [Consumer clientId=console-consumer, groupId=cg15] Removing pending request for fetch session: 1018152883 for node: localhost:9092 (id: 1 rack: null isFenced: false) (org.apache.kafka.clients.consumer.internals.AbstractFetch)
[2025-06-08 22:23:35,434] INFO [Consumer clientId=console-consumer, groupId=cg15] HACK (Background Thread): pollInternal was called, but pendingFetchRequestFuture is null. No action will be taken. (org.apache.kafka.clients.consumer.internals.FetchRequestManager)
[2025-06-08 22:23:38,994] DEBUG [Consumer clientId=console-consumer, groupId=cg15] Wait timed out after 4071ms (org.apache.kafka.clients.consumer.internals.FetchBuffer)
[2025-06-08 22:23:39,004] INFO [Consumer clientId=console-consumer, groupId=cg15] HACK (Background Thread): pollInternal was called, but pendingFetchRequestFuture is null. No action will be taken. (org.apache.kafka.clients.consumer.internals.FetchRequestManager)
[2025-06-08 22:23:39,406] DEBUG [Consumer clientId=console-consumer, groupId=cg15] Exiting awaitNotEmpty. Buffer empty: true, wokenup: false, timer remaining: 4065ms (
{code}

> Async consumer fetch intermittent delays on console consumer
> ------------------------------------------------------------
>
>                 Key: KAFKA-19259
>                 URL: https://issues.apache.org/jira/browse/KAFKA-19259
>             Project: Kafka
>          Issue Type: Bug
>          Components: clients, consumer
>    Affects Versions: 4.0.0
>            Reporter: Lianet Magrans
>            Assignee: Arpit Goyal
>            Priority: Major
>             Fix For: 4.1.0
>
>         Attachments: Screenshot 2025-05-31 at 10.44.29 PM.png, 
> console-consumer-classic-vs-consumer.mov, consumer11_KAFKA-19259.log, 
> consumer_KAFKA-19259.log, debug5.log
>
>
> We noticed that fetching with the kafka-console-consumer.sh tool using the 
> new consumer shows some intermittent delays, that are not seen when running 
> the same with the classic consumer. Note that I disabled auto-commit to 
> isolate the delay, and from a first look seems to come from the 
> fetchBuffer.awaitNotEmpty logic, which alternately takes almost the full 
> poll timeout (runs "fast", then "slow", and continues to alternate)
> [https://github.com/apache/kafka/blob/0b81d6c7802c1be55dc823ce51729f2c6a6071a7/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java#L1808]
>   
> The difference in behaviour between the 2 consumers can be seen with this 
> setup:
>  * topic with 6 partitions (I tried with 1 partition first and didn't see the 
> delay, then with 3 and 6 I could see it) 
>  * data populated in topic with producer sending generated uuids to the topic 
> in while loop 
>  * run console consumer (async), no commit:
> bin/kafka-console-consumer.sh --topic t1 --bootstrap-server localhost:9092 
> --consumer-property group.protocol=consumer --group cg1 --consumer-property 
> enable.auto.commit=false
> Here we can notice the pattern that looks like batches, and custom logs on 
> awaitNotEmpty show it takes the full poll timeout on alternate poll 
> iterations.
>  * run same but for classic consumer (consumer-property 
> group.protocol=classic) -> no such pattern of intermittent delays
> Produce continuously (I used this) 
> while sleep 1; do echo $(uuidgen); done | bin/kafka-console-producer.sh 
> --bootstrap-server localhost:9092 --topic t1
> This needs more investigation to fully understand if it's indeed something in 
> the fetch path or something else.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
