Yu Wang created KAFKA-17139:
-------------------------------
Summary: MirrorSourceTask will stop mirroring when get
BufferUnderflowException
Key: KAFKA-17139
URL: https://issues.apache.org/jira/browse/KAFKA-17139
Project: Kafka
Issue Type: Bug
Components: connect, mirrormaker
Affects Versions: 3.7.1, 3.6.2, 3.5.2, 3.0.0
Reporter: Yu Wang
Attachments: image-2024-07-15-15-35-12-489.png
Recently we found that the data mirroring of one of our partitions stopped after the following exception was thrown:
{code:java}
[2024-07-05 13:36:07,058] WARN Failure during poll. (org.apache.kafka.connect.mirror.RheosHaMirrorSourceTask)
org.apache.kafka.common.protocol.types.SchemaException: Buffer underflow while parsing response for request with header RequestHeader(apiKey=FETCH, apiVersion=11, clientId=consumer-null-8, correlationId=-855959214)
    at org.apache.kafka.clients.NetworkClient.parseResponse(NetworkClient.java:722)
    at org.apache.kafka.clients.NetworkClient.handleCompletedReceives(NetworkClient.java:865)
    at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:560)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:265)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:236)
    at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1297)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1238)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1211)
    at org.apache.kafka.connect.mirror.RheosHaMirrorSourceTask.poll(RheosHaMirrorSourceTask.java:130)
    at org.apache.kafka.connect.runtime.WorkerSourceTask.poll(WorkerSourceTask.java:291)
    at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:248)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:186)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:241)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:750)
Caused by: java.nio.BufferUnderflowException
    at java.nio.Buffer.nextGetIndex(Buffer.java:510)
    at java.nio.HeapByteBuffer.getLong(HeapByteBuffer.java:427)
    at org.apache.kafka.common.protocol.ByteBufferAccessor.readLong(ByteBufferAccessor.java:48)
    at org.apache.kafka.common.message.FetchResponseData$AbortedTransaction.read(FetchResponseData.java:1928)
    at org.apache.kafka.common.message.FetchResponseData$AbortedTransaction.<init>(FetchResponseData.java:1904)
    at org.apache.kafka.common.message.FetchResponseData$PartitionData.read(FetchResponseData.java:881)
    at org.apache.kafka.common.message.FetchResponseData$PartitionData.<init>(FetchResponseData.java:805)
    at org.apache.kafka.common.message.FetchResponseData$FetchableTopicResponse.read(FetchResponseData.java:524)
    at org.apache.kafka.common.message.FetchResponseData$FetchableTopicResponse.<init>(FetchResponseData.java:464)
    at org.apache.kafka.common.message.FetchResponseData.read(FetchResponseData.java:199)
    at org.apache.kafka.common.message.FetchResponseData.<init>(FetchResponseData.java:136)
    at org.apache.kafka.common.requests.FetchResponse.parse(FetchResponse.java:119)
    at org.apache.kafka.common.requests.AbstractResponse.parseResponse(AbstractResponse.java:117)
    at org.apache.kafka.common.requests.AbstractResponse.parseResponse(AbstractResponse.java:109)
    at org.apache.kafka.clients.NetworkClient.parseResponse(NetworkClient.java:720)
    ... 17 more
{code}
The exception was thrown only once; after that the consumer stopped fetching from the node, and the request rate to one of the Kafka brokers dropped to 0.
!image-2024-07-15-15-35-12-489.png|width=601,height=233!
After going through the KafkaConsumer code: every time the KafkaConsumer generates fetch requests to the Kafka brokers, it checks whether the target broker already exists in {*}nodesWithPendingFetchRequests{*}. If it does, the target broker is skipped in that round.
[https://github.com/apache/kafka/blob/3.7.1/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractFetch.java#L433]
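The skip-and-clear cycle described above can be sketched as follows. This is a minimal illustration with simplified names (only {{nodesWithPendingFetchRequests}} comes from the real client; the class and method names here are hypothetical), not the actual AbstractFetch code:

{code:java}
import java.util.HashSet;
import java.util.Set;

// Minimal sketch (NOT the real client code) of how the consumer skips
// brokers that already have an in-flight fetch request.
public class PendingFetchSketch {
    // node ids with an outstanding fetch, analogous to
    // nodesWithPendingFetchRequests in AbstractFetch
    private final Set<Integer> nodesWithPendingFetchRequests = new HashSet<>();

    // returns true if a fetch request would be sent to this node in this round
    public boolean maybeSendFetch(int nodeId) {
        if (nodesWithPendingFetchRequests.contains(nodeId)) {
            return false; // skip: a fetch is still considered in flight
        }
        nodesWithPendingFetchRequests.add(nodeId);
        return true;
    }

    // normally invoked from the response-completion listener;
    // if it never runs, the node is skipped forever
    public void onResponseCompleted(int nodeId) {
        nodesWithPendingFetchRequests.remove(nodeId);
    }
}
{code}

With this model, a node for which {{onResponseCompleted}} never fires will be skipped on every subsequent fetch round, which matches the observed behavior.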
The broker id is removed only after the response has completed.
[https://github.com/apache/kafka/blob/3.7.1/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java#L600]
But in this case the exception was thrown in {*}handleCompletedReceives{*}, which means the node id will never be removed from
*nodesWithPendingFetchRequests.*
[https://github.com/apache/kafka/blob/3.7.1/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java#L594]
{code:java}
handleCompletedSends(responses, updatedNow);
handleCompletedReceives(responses, updatedNow);
handleDisconnections(responses, updatedNow);
handleConnections();
handleInitiateApiVersionRequests(updatedNow);
handleTimedOutConnections(responses, updatedNow);
handleTimedOutRequests(responses, updatedNow);
completeResponses(responses); {code}
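The failure mode in the handler sequence above can be illustrated with a minimal sketch (hypothetical class and method bodies, not the real NetworkClient): if parsing throws inside the receive-handling step, {{completeResponses}} never runs for that poll, so the completion listener that would clear the pending-fetch marker is never invoked.

{code:java}
import java.util.ArrayList;
import java.util.List;

// Minimal sketch of the failure mode: an exception in the receive-handling
// step aborts poll() before completeResponses() runs, so the completion
// listeners (which would clear the pending-fetch marker) are skipped.
public class PollSketch {
    static final List<String> firedListeners = new ArrayList<>();

    static void handleCompletedReceives(boolean corruptResponse) {
        if (corruptResponse) {
            // stands in for SchemaException: Buffer underflow while parsing response
            throw new RuntimeException("Buffer underflow while parsing response");
        }
    }

    static void completeResponses() {
        // would remove the node id from nodesWithPendingFetchRequests
        firedListeners.add("fetch-listener");
    }

    static void poll(boolean corruptResponse) {
        handleCompletedReceives(corruptResponse); // throws on a corrupt response
        completeResponses();                      // unreachable in that case
    }
}
{code}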
So every time the Kafka Connect source task calls the *poll* method of the MirrorSourceTask, the KafkaConsumer skips fetching from the node id left in *nodesWithPendingFetchRequests.*
This makes the MirrorMaker task stop mirroring data with only a single WARN log.
--
This message was sent by Atlassian Jira
(v8.20.10#820010)