[ https://issues.apache.org/jira/browse/KAFKA-19259?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17956788#comment-17956788 ]
Arpit Goyal commented on KAFKA-19259:
-------------------------------------
Hi [~lianetm] I was able to figure out the root cause of the issue. From what I observed, it happens because of incorrect handling of the pendingFetchRequestFuture field, which controls whether a fetch request needs to be sent. Let's walk through the code's logic inside pollForFetches:

*Step 1: Check the Buffer*

First, the application thread tries to get any records that might already be waiting in the FetchBuffer.
{code:java}
// Simplified from AsyncKafkaConsumer.pollForFetches()
Fetch<K, V> fetch = fetchCollector.collectFetch(fetchBuffer);
if (!fetch.isEmpty()) {
    return fetch; // Data was already there, return immediately
}
{code}
On the first call, the buffer is empty, so this check fails.

*Step 2: Send the Fetch Request*

Since the buffer was empty, the application thread asks the background thread to go get some data.
{code:java}
// Simplified from AsyncKafkaConsumer.pollForFetches()
sendFetches(timer);
{code}
This is where the flawed logic happens:
# A CreateFetchRequestsEvent is sent.
# The background thread creates the network request.
# The background thread prematurely completes the future and nulls it out.

This means the request may not even have reached the broker yet, but pendingFetchRequestFuture has already been marked completed.
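The ordering problem can be reduced to a small, self-contained simulation (hypothetical class, not Kafka code: a CompletableFuture stands in for pendingFetchRequestFuture, and a CountDownLatch stands in for the fetchBuffer condition that awaitNotEmpty waits on). Because the future is completed at request-creation time, the application thread reaches its buffer wait before any response exists; if the simulated response is empty, nothing ever signals that wait:
{code:java}
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical simulation of the premature-completion ordering, NOT Kafka code.
public class PrematureCompleteDemo {
    public static void main(String[] args) throws Exception {
        CompletableFuture<Void> pendingFetchRequestFuture = new CompletableFuture<>();
        CountDownLatch buffer = new CountDownLatch(1); // stand-in for fetchBuffer.awaitNotEmpty

        // Background thread: builds the request and completes the future immediately,
        // before any broker response could possibly have arrived.
        Thread background = new Thread(() -> {
            pendingFetchRequestFuture.complete(null); // completed at request-creation time
            // Simulated broker reply is EMPTY: nothing is added to the buffer,
            // so buffer.countDown() is never called.
        });
        background.start();

        pendingFetchRequestFuture.join(); // sendFetches() unblocks here on the app thread

        // Application thread now waits on the buffer; with an empty response nothing
        // ever signals it, so it simply burns the whole (here 200ms) poll timeout.
        boolean gotData = buffer.await(200, TimeUnit.MILLISECONDS);
        System.out.println("buffer notified before timeout: " + gotData);
        background.join();
    }
}
{code}
Running this prints "buffer notified before timeout: false", mirroring how poll(timeout) sits in awaitNotEmpty for the full duration once an empty fetch response comes back.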
{code:java}
        List<UnsentRequest> requests = fetchRequests.entrySet().stream().map(entry -> {
            final Node fetchTarget = entry.getKey();
            final FetchSessionHandler.FetchRequestData data = entry.getValue();
            final FetchRequest.Builder request = createFetchRequest(fetchTarget, data);
            final BiConsumer<ClientResponse, Throwable> responseHandler = (clientResponse, error) -> {
                if (error != null)
                    errorHandler.handle(fetchTarget, data, error);
                else
                    successHandler.handle(fetchTarget, data, clientResponse);
            };

            return new UnsentRequest(request, Optional.of(fetchTarget)).whenComplete(responseHandler);
        }).collect(Collectors.toList());

        pendingFetchRequestFuture.complete(null);
        return new PollResult(requests);
    } catch (Throwable t) {
        // A "dummy" poll result is returned here rather than rethrowing the error because any error
        // that is thrown from any RequestManager.poll() method interrupts the polling of the other
        // request managers.
        pendingFetchRequestFuture.completeExceptionally(t);
        return PollResult.EMPTY;
    } finally {
        pendingFetchRequestFuture = null;
    }
}
{code}
4. The sendFetches() call unblocks on the application thread.

*Step 3: Wait on awaitNotEmpty (The Crucial Step)*

Now that the application thread has been told the request is "sent," it doesn't return immediately. Instead, it waits on the buffer.
{code:java}
// Simplified from AsyncKafkaConsumer.pollForFetches()
fetchBuffer.awaitNotEmpty(timer); // BLOCKING CALL
return fetchCollector.collectFetch(fetchBuffer);
{code}
The fetchBuffer.awaitNotEmpty(timer) call pauses the application thread until one of two things happens:
1. The background thread puts new records into the fetchBuffer and notifies it.
2. The timer (which is controlled by your poll(timeout)) expires.

*Step 4: The Two Paths (The Bug's Impact)*

This is where the outcome diverges and where the delay comes from.

_*Scenario A: The Happy Path (Data Arrives)*_
# The response for FetchRequest #1 arrives from the broker with records.
# The background thread processes the response and calls fetchBuffer.add(records).
# This add operation notifies the condition variable that awaitNotEmpty is waiting on.
# The application thread in awaitNotEmpty wakes up instantly, collects the fetch, and returns the data to the user.

This feels fast and responsive.

_*Scenario B: The Buggy Path (Empty Response)*_
# The response for FetchRequest #1 arrives from the broker, but it's empty.
# The background thread sees the empty response. It does not add anything to the fetchBuffer.
# Because the pendingFetchRequestFuture was already cleared, the background thread does not know it should immediately send a new fetch request. It simply goes idle.

{code:java}
private PollResult pollInternal(FetchRequestPreparer fetchRequestPreparer,
                                ResponseHandler<ClientResponse> successHandler,
                                ResponseHandler<Throwable> errorHandler) {
    if (pendingFetchRequestFuture == null) {
        log.info("HACK (Background Thread): pollInternal was called, but pendingFetchRequestFuture is null. No action will be taken.");
        // If no explicit request for creating fetch requests was issued, just short-circuit.
        return PollResult.EMPTY;
    }
{code}
{color:#FF0000}Therefore, the awaitNotEmpty call on the application thread is never notified. It has no choice but to sit there and wait until the timer you provided in poll(timeout) expires{color}.

I have also added log statements that show the above behaviour. Attaching the full logs: [^consumer11_KAFKA-19259.log]
{code:java}
[2025-06-08 22:23:34,921] DEBUG [Consumer clientId=console-consumer, groupId=cg15] Entering awaitNotEmpty with timer remaining: 4065ms (org.apache.kafka.clients.consumer.internals.FetchBuffer)
[2025-06-08 22:23:34,921] DEBUG [Consumer clientId=console-consumer, groupId=cg15] Buffer is empty, will wait. Timer remaining: 4065ms, wokenup: false (org.apache.kafka.clients.consumer.internals.FetchBuffer)
[2025-06-08 22:23:34,921] DEBUG [Consumer clientId=console-consumer, groupId=cg15] Starting wait at 1749401614921 for 4065ms (org.apache.kafka.clients.consumer.internals.FetchBuffer)
[2025-06-08 22:23:35,431] DEBUG [Consumer clientId=console-consumer, groupId=cg15] Received FETCH response from node 1 for request with header RequestHeader(apiKey=FETCH, apiVersion=17, clientId=console-consumer, correlationId=79, headerVersion=2): FetchResponseData(throttleTimeMs=0, errorCode=0, sessionId=1018152883, responses=[], nodeEndpoints=[]) (org.apache.kafka.clients.NetworkClient)
[2025-06-08 22:23:35,433] DEBUG [Consumer clientId=console-consumer, groupId=cg15] Node 1 sent an incremental fetch response with throttleTimeMs = 0 for session 1018152883 with 0 response partition(s), 6 implied partition(s) (org.apache.kafka.clients.FetchSessionHandler)
[2025-06-08 22:23:35,433] DEBUG [Consumer clientId=console-consumer, groupId=cg15] Removing pending request for fetch session: 1018152883 for node: localhost:9092 (id: 1 rack: null isFenced: false) (org.apache.kafka.clients.consumer.internals.AbstractFetch)
[2025-06-08 22:23:35,434] INFO [Consumer clientId=console-consumer, groupId=cg15] HACK (Background Thread): pollInternal was called, but pendingFetchRequestFuture is null. No action will be taken. (org.apache.kafka.clients.consumer.internals.FetchRequestManager)
[2025-06-08 22:23:38,994] DEBUG [Consumer clientId=console-consumer, groupId=cg15] Wait timed out after 4071ms (org.apache.kafka.clients.consumer.internals.FetchBuffer)
[2025-06-08 22:23:39,004] INFO [Consumer clientId=console-consumer, groupId=cg15] HACK (Background Thread): pollInternal was called, but pendingFetchRequestFuture is null. No action will be taken. (org.apache.kafka.clients.consumer.internals.FetchRequestManager)
[2025-06-08 22:23:39,406] DEBUG [Consumer clientId=console-consumer, groupId=cg15] Exiting awaitNotEmpty. Buffer empty: true, wokenup: false, timer remaining: 4065ms (
{code}

> Async consumer fetch intermittent delays on console consumer
> ------------------------------------------------------------
>
>                 Key: KAFKA-19259
>                 URL: https://issues.apache.org/jira/browse/KAFKA-19259
>             Project: Kafka
>          Issue Type: Bug
>          Components: clients, consumer
>    Affects Versions: 4.0.0
>            Reporter: Lianet Magrans
>            Assignee: Arpit Goyal
>            Priority: Major
>             Fix For: 4.1.0
>
>         Attachments: Screenshot 2025-05-31 at 10.44.29 PM.png, console-consumer-classic-vs-consumer.mov, consumer11_KAFKA-19259.log, consumer_KAFKA-19259.log, debug5.log
>
> We noticed that fetching with the kafka-console-consumer.sh tool using the new consumer shows some intermittent delays, that are not seen when running the same with the classic consumer. Note that I disabled auto-commit to isolate the delay, and from a first look seems to come from the fetchBuffer.awaitNonEmpty logic, that alternatively takes almost the full poll timeout (runs "fast", then "slow", and continues to alternate)
> [https://github.com/apache/kafka/blob/0b81d6c7802c1be55dc823ce51729f2c6a6071a7/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java#L1808]
> The difference in behaviour between the 2 consumers can be seen with this setup:
> * topic with 6 partitions (I tried with 1 partition first and didn't see the delay, then with 3 and 6 I could see it)
> * data populated in topic with producer sending generated uuids to the topic in while loop
> * run console consumer (async), no commit:
> bin/kafka-console-consumer.sh --topic t1 --bootstrap-server localhost:9092 --consumer-property group.protocol=consumer --group cg1 --consumer-property enable.auto.commit=false
> Here we can notice the pattern that looks like
> batches, and custom logs on the awaitNonEmpty show it takes the full poll timeout on alternate poll iterations.
> * run same but for classic consumer (consumer-property group.protocol=classic) -> no such pattern of intermittent delays
>
> Produce continuously (I used this):
> while sleep 1; do echo $(uuidgen); done | bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic t1
>
> This needs more investigation to fully understand if it's indeed something in the fetch path or something else.

--
This message was sent by Atlassian Jira
(v8.20.10#820010)