chirag-wadhwa5 commented on code in PR #19637: URL: https://github.com/apache/kafka/pull/19637#discussion_r2077341351
##########
core/src/main/scala/kafka/server/KafkaApis.scala:
##########
@@ -3049,6 +3049,11 @@ class KafkaApis(val requestChannel: RequestChannel,
           // Creating the shareFetchContext for Share Session Handling. if context creation fails, the request is failed directly here.
           shareFetchContext = sharePartitionManager.newContext(groupId, shareFetchData, forgottenTopics, newReqMetadata, isAcknowledgeDataPresent, request.context.connectionId)
         } catch {
+          case e: ShareSessionLimitReachedException =>
+            // Throttle for maxWaitMs when share session limit is reached
+            requestHelper.throttle(quotas.request, request, shareFetchRequest.maxWait)

Review Comment:
   Thanks for the review. Yes, once a channel is muted it cannot receive any requests until it is unmuted, so even metadata and heartbeat requests will not be processed. I think that should be fine, though, because the throttle duration is the max wait time.

   If I am not mistaken, a channel is muted every time a request is received and stays muted until the response is sent back. When the partition has no records to fetch, I believe the share fetch request is added to the purgatory and waits for the max wait time before a response is sent. That situation is identical to the channel being throttled for the max wait time: no heartbeat or metadata requests can be processed on it during that period.

   There could be issues when the max wait time is longer than the heartbeat session timeout, though.
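
   To make the last concern above concrete (a max wait time longer than the heartbeat session timeout), here is a minimal sketch of one possible guard. It is not code from this PR or from Kafka: `ThrottleSketch`, `cappedThrottleMs`, and the half-of-session-timeout headroom are all hypothetical, chosen only for illustration.

```scala
object ThrottleSketch {
  // Hypothetical helper, NOT part of KafkaApis or the Kafka API.
  // Caps the throttle duration so the muted period stays well below the
  // session timeout. The 1/2 headroom factor is an arbitrary assumption.
  def cappedThrottleMs(maxWaitMs: Int, sessionTimeoutMs: Int): Int = {
    require(maxWaitMs >= 0, "maxWaitMs must be non-negative")
    require(sessionTimeoutMs > 0, "sessionTimeoutMs must be positive")
    math.min(maxWaitMs, sessionTimeoutMs / 2)
  }
}
```

   With this sketch, a max wait of 500 ms against a 45000 ms session timeout is left unchanged, while a max wait of 60000 ms would be reduced to 22500 ms. Whether a cap like this is needed at all depends on how the session timeout is actually configured.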