Hello, I am using Kafka server 3.4.0 along with Flink. Kafka and Flink are both installed on a single 48-core, 252 GB box. My use case is as follows:
8 Kafka producers (200K events per second) --> Kafka topic "Event" (20 partitions, Flink source) --> Flink processing rules that read from Event and write to Alert --> Kafka topic "Alert" (20 partitions, Flink sink)

Everything was fine until we started seeing the Flink Kafka consumer for the Event topic time out frequently, because Kafka responds very late to its fetch requests. When everything is fine, Kafka takes only a few milliseconds to process a fetch request, but at random times a FETCH request takes far longer, and I have not been able to figure out why. The Kafka consumer timeout on the Flink side is 30 seconds, and the consumer thread blocks until it gets a response or times out.

Is there a configuration I should look at, or any other log I should check, to figure out what is going on?

Here are some Kafka trace logs:

server.log:
[2023-09-13 17:37:05,463] TRACE [Kafka Request Handler 1 on Broker 1], Kafka request handler 1 on broker 1 handling request Request(processor=0, connectionId=127.0.0.1:9092-127.0.0.1:39268-2028, session=Session(User:ANONYMOUS,/127.0.0.1), listenerName=ListenerName(PLAINTEXT), securityProtocol=PLAINTEXT, buffer=null, envelope=None) (kafka.server.KafkaRequestHandler)
[2023-09-13 17:37:05,463] TRACE Sending FETCH response to client Event-8 of 1048654 bytes. (kafka.network.RequestChannel)
[2023-09-13 17:37:05,463] TRACE Socket server received response to send to 127.0.0.1:9092-127.0.0.1:39268-2028, registering for write and sending data: Response(type=Send, request=Request(processor=0, connectionId=127.0.0.1:9092-127.0.0.1:39268-2028, session=Session(User:ANONYMOUS,/127.0.0.1), listenerName=ListenerName(PLAINTEXT), securityProtocol=PLAINTEXT, buffer=null, envelope=None), send=MultiRecordsSend(size=1048654, totalWritten=0), asString=Some({"throttleTimeMs":0,"errorCode":0,"sessionId":1670662638,"responses":[{"topicId":"MtsZtV-bRoCAwPiHqRIzvA","partitions":[{"partitionIndex":14,"errorCode":0,"highWatermark":37934921072,"lastStableOffset":37934921072,"logStartOffset":31032998864,"abortedTransactions":null,"preferredReadReplica":-1,"recordsSizeInBytes":1048576}]}]})) (kafka.network.Processor)

request-logs:
[2023-09-13 17:37:05,463] TRACE Processor 0 received request: RequestHeader(apiKey=FETCH, apiVersion=13, clientId=Event-8, correlationId=39532, headerVersion=2) -- FetchRequestData(clusterId=null, replicaId=-1, maxWaitMs=500, minBytes=1, maxBytes=52428800, isolationLevel=0, sessionId=1670662638, sessionEpoch=10, topics=[FetchTopic(topic='', topicId=MtsZtV-bRoCAwPiHqRIzvA, partitions=[FetchPartition(partition=14, currentLeaderEpoch=66, fetchOffset=37656832058, lastFetchedEpoch=-1, logStartOffset=-1, partitionMaxBytes=1048576)])], forgottenTopicsData=[], rackId='') (kafka.network.RequestChannel$)
[2023-09-13 17:37:05,463] TRACE [KafkaApi-1] Handling request:RequestHeader(apiKey=FETCH, apiVersion=13, clientId=Event-8, correlationId=39532, headerVersion=2) -- FetchRequestData(clusterId=null, replicaId=-1, maxWaitMs=500, minBytes=1, maxBytes=52428800, isolationLevel=0, sessionId=1670662638, sessionEpoch=10, topics=[FetchTopic(topic='', topicId=MtsZtV-bRoCAwPiHqRIzvA, partitions=[FetchPartition(partition=14, currentLeaderEpoch=66, fetchOffset=37656832058, lastFetchedEpoch=-1, logStartOffset=-1, partitionMaxBytes=1048576)])], forgottenTopicsData=[], rackId='') from connection 127.0.0.1:9092-127.0.0.1:39268-2028;securityProtocol:PLAINTEXT,principal:User:ANONYMOUS (kafka.server.KafkaApis)
[2023-09-13 17:37:05,463] TRACE [KafkaApi-1] Sending Fetch response with partitions.size=1, metadata=1670662638 (kafka.server.KafkaApis)
[2023-09-13 17:39:30,223] DEBUG Completed request:{"isForwarded":false,"requestHeader":{"requestApiKey":1,"requestApiVersion":13,"correlationId":39532,"clientId":"Event-8","requestApiKeyName":"FETCH"},"request":{"replicaId":-1,"maxWaitMs":500,"minBytes":1,"maxBytes":52428800,"isolationLevel":0,"sessionId":1670662638,"sessionEpoch":10,"topics":[{"topicId":"MtsZtV-bRoCAwPiHqRIzvA","partitions":[{"partition":14,"currentLeaderEpoch":66,"fetchOffset":37656832058,"lastFetchedEpoch":-1,"logStartOffset":-1,"partitionMaxBytes":1048576}]}],"forgottenTopicsData":[],"rackId":""},"response":{"throttleTimeMs":0,"errorCode":0,"sessionId":1670662638,"responses":[{"topicId":"MtsZtV-bRoCAwPiHqRIzvA","partitions":[{"partitionIndex":14,"errorCode":0,"highWatermark":37934921072,"lastStableOffset":37934921072,"logStartOffset":31032998864,"abortedTransactions":null,"preferredReadReplica":-1,"recordsSizeInBytes":1048576}]}]},"connection":"127.0.0.1:9092-127.0.0.1:39268-2028","totalTimeMs":144760.386,"requestQueueTimeMs":0.192,"localTimeMs":0.476,"remoteTimeMs":0.0,"throttleTimeMs":0,"responseQueueTimeMs":0.035,"sendTimeMs":144759.682,"securityProtocol":"PLAINTEXT","principal":"User:ANONYMOUS","listener":"PLAINTEXT","clientInformation":{"softwareName":"apache-kafka-java","softwareVersion":"3.4.0"}} (kafka.request.logger)

Thanks,
Neha
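The DEBUG "Completed request" entry written by kafka.request.logger splits totalTimeMs into per-phase timings, which helps narrow down where a slow FETCH spends its time. Below is a minimal Python sketch, assuming the log line format shown in the logs above; the helper names (parse_completed_request, time_breakdown) are hypothetical, and the sample line is a trimmed copy of the logged entry that keeps only the header and timing fields. For this entry it reports sendTimeMs (about 144,760 ms) as the dominant phase, while localTimeMs, the broker's own handling of the request, is under 0.5 ms.

```python
import json

# Time components that the kafka.request.logger "Completed request" entry
# reports alongside totalTimeMs (all in milliseconds).
PHASES = (
    "requestQueueTimeMs",
    "localTimeMs",
    "remoteTimeMs",
    "throttleTimeMs",
    "responseQueueTimeMs",
    "sendTimeMs",
)


def parse_completed_request(line: str) -> dict:
    """Extract the JSON payload from a 'Completed request:{...} (kafka.request.logger)' line."""
    start = line.index("Completed request:") + len("Completed request:")
    end = line.rindex(" (kafka.request.logger)")
    return json.loads(line[start:end])


def time_breakdown(entry: dict) -> tuple[str, dict]:
    """Return the slowest phase and each phase's share of totalTimeMs."""
    total = entry["totalTimeMs"]
    shares = {phase: entry[phase] / total for phase in PHASES}
    slowest = max(PHASES, key=lambda phase: entry[phase])
    return slowest, shares


# Trimmed copy of the logged entry: only the header and timing fields are kept.
SAMPLE = (
    '[2023-09-13 17:39:30,223] DEBUG Completed request:'
    '{"requestHeader":{"correlationId":39532,"clientId":"Event-8","requestApiKeyName":"FETCH"},'
    '"totalTimeMs":144760.386,"requestQueueTimeMs":0.192,"localTimeMs":0.476,'
    '"remoteTimeMs":0.0,"throttleTimeMs":0,"responseQueueTimeMs":0.035,'
    '"sendTimeMs":144759.682} (kafka.request.logger)'
)

if __name__ == "__main__":
    entry = parse_completed_request(SAMPLE)
    slowest, shares = time_breakdown(entry)
    hdr = entry["requestHeader"]
    print(f'{hdr["requestApiKeyName"]} correlationId={hdr["correlationId"]} '
          f'total={entry["totalTimeMs"]:.1f} ms, dominated by {slowest}')
    for phase in PHASES:
        print(f"  {phase:<20} {entry[phase]:>12.3f} ms  ({shares[phase]:.4%})")
```

The same functions could be run over every kafka.request.logger line for the Event-8 client to check whether the slow requests consistently show their time in sendTimeMs or in some other phase.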
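The same FETCH exchange also shows how far the Event-8 consumer was behind on partition 14: the request's fetchOffset is 37656832058 and the response's highWatermark is 37934921072. The short Python calculation below computes that gap. Converting it into a time estimate relies on an assumption the email does not confirm: that 200K events per second is the total rate across all producers and that events are spread evenly over the 20 partitions.

```python
# Offsets copied from the FETCH request/response for partition 14 in the logs.
FETCH_OFFSET = 37_656_832_058
HIGH_WATERMARK = 37_934_921_072

# Assumption (not stated in the logs): 200K events/s in total, spread evenly
# over the 20 partitions of the Event topic.
TOTAL_EVENTS_PER_SEC = 200_000
PARTITIONS = 20


def partition_lag(fetch_offset: int, high_watermark: int) -> int:
    """Offsets between the consumer's fetch position and the high watermark."""
    return high_watermark - fetch_offset


def seconds_behind(lag: int, events_per_sec: float, partitions: int) -> float:
    """Rough time behind, assuming an even per-partition produce rate."""
    return lag / (events_per_sec / partitions)


lag = partition_lag(FETCH_OFFSET, HIGH_WATERMARK)
behind = seconds_behind(lag, TOTAL_EVENTS_PER_SEC, PARTITIONS)
# prints: partition 14 lag: 278,089,014 offsets (~7.7 h at the assumed rate)
print(f"partition 14 lag: {lag:,} offsets (~{behind / 3600:.1f} h at the assumed rate)")
```

If the real per-partition rate differs, only TOTAL_EVENTS_PER_SEC and PARTITIONS need to change; the offset gap itself comes straight from the logged request and response.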