Hello,
I am using Kafka server 3.4.0 along with Flink. The Kafka server and Flink are
installed on the same 48-core, 252 GB machine. My use case is as follows:

8 Kafka producers write events at 200K events per second to the Kafka topic
"Event" (20 partitions, the Flink source) --> Flink processing rules read from
the Event topic and write to the Alert topic --> Kafka topic "Alert"
(20 partitions, the Flink sink).

Everything was fine until we started seeing the Flink Kafka consumer for the
Event topic time out frequently, because Kafka responds very late to its fetch
requests. I cannot figure out why Kafka randomly takes so long to process these
FETCH requests. Is there a configuration I should look at, or any other log I
should check, to figure out what is going on? When everything is fine, Kafka
takes only a few milliseconds to process a fetch request.
The Kafka consumer timeout on the Flink side is 30 seconds, and the consumer
thread blocks until it gets a response or times out.
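As a rough cross-check, the FETCH request in the request log (maxWaitMs=500, minBytes=1, maxBytes=52428800, partitionMaxBytes=1048576) lines up with the consumer properties in the sketch below. It assumes the 30-second Flink-side timeout is the consumer's `request.timeout.ms`; `client_would_time_out` is a hypothetical helper, not part of Kafka or Flink.

```python
# Consumer settings as they appear on the wire in the logged FETCH request.
# ASSUMPTION: the 30 s Flink-side timeout is the consumer's request.timeout.ms.
CONSUMER_PROPS = {
    "fetch.max.wait.ms": 500,                # FetchRequestData.maxWaitMs
    "fetch.min.bytes": 1,                    # FetchRequestData.minBytes
    "fetch.max.bytes": 52_428_800,           # FetchRequestData.maxBytes
    "max.partition.fetch.bytes": 1_048_576,  # FetchPartition.partitionMaxBytes
    "request.timeout.ms": 30_000,            # assumed 30 s client-side timeout
}


def client_would_time_out(broker_total_time_ms: float, props: dict = CONSUMER_PROPS) -> bool:
    """Hypothetical helper: True if a request the broker took
    broker_total_time_ms to complete exceeds the client's request timeout."""
    return broker_total_time_ms > props["request.timeout.ms"]


# totalTimeMs of the FETCH request in the request log
print(client_would_time_out(144_760.386))  # True
```

Under that assumption, the logged request took almost five times the 30 s budget on the broker side alone.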


Here are some Kafka trace logs:

server.log
[2023-09-13 17:37:05,463] TRACE [Kafka Request Handler 1 on Broker 1], Kafka 
request handler 1 on broker 1 handling request Request(processor=0, 
connectionId=127.0.0.1:9092-127.0.0.1:39268-2028, 
session=Session(User:ANONYMOUS,/127.0.0.1), 
listenerName=ListenerName(PLAINTEXT), securityProtocol=PLAINTEXT, buffer=null, 
envelope=None) (kafka.server.KafkaRequestHandler)
[2023-09-13 17:37:05,463] TRACE Sending FETCH response to client Event-8 of 
1048654 bytes. (kafka.network.RequestChannel)
[2023-09-13 17:37:05,463] TRACE Socket server received response to send to 
127.0.0.1:9092-127.0.0.1:39268-2028, registering for write and sending data: 
Response(type=Send, request=Request(processor=0, 
connectionId=127.0.0.1:9092-127.0.0.1:39268-2028, 
session=Session(User:ANONYMOUS,/127.0.0.1), 
listenerName=ListenerName(PLAINTEXT), securityProtocol=PLAINTEXT, buffer=null, 
envelope=None), send=MultiRecordsSend(size=1048654, totalWritten=0), 
asString=Some({"throttleTimeMs":0,"errorCode":0,"sessionId":1670662638,"responses":[{"topicId":"MtsZtV-bRoCAwPiHqRIzvA","partitions":[{"partitionIndex":14,"errorCode":0,"highWatermark":37934921072,"lastStableOffset":37934921072,"logStartOffset":31032998864,"abortedTransactions":null,"preferredReadReplica":-1,"recordsSizeInBytes":1048576}]}]}))
 (kafka.network.Processor)

request-logs
[2023-09-13 17:37:05,463] TRACE Processor 0 received request: 
RequestHeader(apiKey=FETCH, apiVersion=13, clientId=Event-8, 
correlationId=39532, headerVersion=2) -- FetchRequestData(clusterId=null, 
replicaId=-1, maxWaitMs=500, minBytes=1, maxBytes=52428800, isolationLevel=0, 
sessionId=1670662638, sessionEpoch=10, topics=[FetchTopic(topic='', 
topicId=MtsZtV-bRoCAwPiHqRIzvA, partitions=[FetchPartition(partition=14, 
currentLeaderEpoch=66, fetchOffset=37656832058, lastFetchedEpoch=-1, 
logStartOffset=-1, partitionMaxBytes=1048576)])], forgottenTopicsData=[], 
rackId='') (kafka.network.RequestChannel$)
[2023-09-13 17:37:05,463] TRACE [KafkaApi-1] Handling 
request:RequestHeader(apiKey=FETCH, apiVersion=13, clientId=Event-8, 
correlationId=39532, headerVersion=2) -- FetchRequestData(clusterId=null, 
replicaId=-1, maxWaitMs=500, minBytes=1, maxBytes=52428800, isolationLevel=0, 
sessionId=1670662638, sessionEpoch=10, topics=[FetchTopic(topic='', 
topicId=MtsZtV-bRoCAwPiHqRIzvA, partitions=[FetchPartition(partition=14, 
currentLeaderEpoch=66, fetchOffset=37656832058, lastFetchedEpoch=-1, 
logStartOffset=-1, partitionMaxBytes=1048576)])], forgottenTopicsData=[], 
rackId='') from connection 
127.0.0.1:9092-127.0.0.1:39268-2028;securityProtocol:PLAINTEXT,principal:User:ANONYMOUS
 (kafka.server.KafkaApis)
[2023-09-13 17:37:05,463] TRACE [KafkaApi-1] Sending Fetch response with 
partitions.size=1, metadata=1670662638 (kafka.server.KafkaApis)
[2023-09-13 17:39:30,223] DEBUG Completed 
request:{"isForwarded":false,"requestHeader":{"requestApiKey":1,"requestApiVersion":13,"correlationId":39532,"clientId":"Event-8","requestApiKeyName":"FETCH"},"request":{"replicaId":-1,"maxWaitMs":500,"minBytes":1,"maxBytes":52428800,"isolationLevel":0,"sessionId":1670662638,"sessionEpoch":10,"topics":[{"topicId":"MtsZtV-bRoCAwPiHqRIzvA","partitions":[{"partition":14,"currentLeaderEpoch":66,"fetchOffset":37656832058,"lastFetchedEpoch":-1,"logStartOffset":-1,"partitionMaxBytes":1048576}]}],"forgottenTopicsData":[],"rackId":""},"response":{"throttleTimeMs":0,"errorCode":0,"sessionId":1670662638,"responses":[{"topicId":"MtsZtV-bRoCAwPiHqRIzvA","partitions":[{"partitionIndex":14,"errorCode":0,"highWatermark":37934921072,"lastStableOffset":37934921072,"logStartOffset":31032998864,"abortedTransactions":null,"preferredReadReplica":-1,"recordsSizeInBytes":1048576}]}]},"connection":"127.0.0.1:9092-127.0.0.1:39268-2028","totalTimeMs":144760.386,"requestQueueTimeMs":0.192,"localTimeMs":0.476,"remoteTimeMs":0.0,"throttleTimeMs":0,"responseQueueTimeMs":0.035,"sendTimeMs":144759.682,"securityProtocol":"PLAINTEXT","principal":"User:ANONYMOUS","listener":"PLAINTEXT","clientInformation":{"softwareName":"apache-kafka-java","softwareVersion":"3.4.0"}}
 (kafka.request.logger)
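To see where the time goes, each "Completed request" entry written by kafka.request.logger can be summarized per phase. This is a minimal sketch assuming each entry sits on a single line in the log file (as it does before email wrapping), with the JSON payload following "request:"; `summarize_completed_request` is a hypothetical helper. For FETCH requests it also pairs each partition's `fetchOffset` with the returned `highWatermark` to show how far behind the consumer is.

```python
import json

# Timing fields reported in each kafka.request.logger "Completed request" entry.
PHASES = (
    "requestQueueTimeMs",
    "localTimeMs",
    "remoteTimeMs",
    "throttleTimeMs",
    "responseQueueTimeMs",
    "sendTimeMs",
)


def summarize_completed_request(line: str) -> dict:
    """Hypothetical helper: summarize one 'Completed request:' log line.

    Assumes the JSON payload starts at the first '{' after 'request:'
    and ends at the last '}' on the line.
    """
    start = line.index("{", line.index("request:"))
    end = line.rindex("}") + 1
    entry = json.loads(line[start:end])

    phases = {name: float(entry.get(name, 0.0)) for name in PHASES}
    summary = {
        "clientId": entry["requestHeader"]["clientId"],
        "totalTimeMs": float(entry["totalTimeMs"]),
        "phases": phases,
        "slowestPhase": max(phases, key=phases.get),
    }

    # For FETCH requests, lag = highWatermark returned - fetchOffset requested.
    if entry["requestHeader"].get("requestApiKeyName") == "FETCH":
        high_watermarks = {
            (t["topicId"], p["partitionIndex"]): p["highWatermark"]
            for t in entry["response"]["responses"]
            for p in t["partitions"]
        }
        summary["lag"] = {
            p["partition"]: high_watermarks[(t["topicId"], p["partition"])] - p["fetchOffset"]
            for t in entry["request"]["topics"]
            for p in t["partitions"]
            if (t["topicId"], p["partition"]) in high_watermarks
        }
    return summary
```

Run against the entry above, it reports sendTimeMs (144759.682 of the 144760.386 ms total) as the slowest phase, meaning the time went into writing the roughly 1 MB response to the client connection rather than into preparing it (localTimeMs is 0.476), and a lag of 278,089,014 offsets on partition 14.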


Thanks,
Neha