chirag-wadhwa5 commented on code in PR #19637:
URL: https://github.com/apache/kafka/pull/19637#discussion_r2077341351


##########
core/src/main/scala/kafka/server/KafkaApis.scala:
##########
@@ -3049,6 +3049,11 @@ class KafkaApis(val requestChannel: RequestChannel,
       // Creating the shareFetchContext for Share Session Handling. if context creation fails, the request is failed directly here.
       shareFetchContext = sharePartitionManager.newContext(groupId, shareFetchData, forgottenTopics, newReqMetadata, isAcknowledgeDataPresent, request.context.connectionId)
     } catch {
+      case e: ShareSessionLimitReachedException =>
+        // Throttle for maxWaitMs when share session limit is reached
+        requestHelper.throttle(quotas.request, request, shareFetchRequest.maxWait)

Review Comment:
   Thanks for the review. Yes, when a channel is muted, it cannot receive 
any requests until it is unmuted, so even metadata and heartbeat requests will 
not be processed. But I think that should be fine. The throttle duration is the 
max wait time. If I am not wrong, the channel is muted every time a request is 
received and remains muted until the response is sent back. When the partition 
has no records to fetch, I believe a share fetch request is added to the 
purgatory and waits for the full max wait time before a response is sent. That 
situation is identical to the request being throttled for max wait time: no 
heartbeat or metadata requests can be processed during that time. There could 
be issues if the max wait time exceeds the heartbeat session timeout, though.
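
   To make the last concern concrete, here is a minimal sketch of the 
comparison involved. It is not Kafka code: `ThrottleSketch`, 
`muteOutlastsSession`, and both parameter names are hypothetical, and it 
assumes heartbeats travel over the same muted channel.

   ```scala
   // Hypothetical model of the concern above; none of these names exist in Kafka.
   object ThrottleSketch {
     // Returns true when the channel would stay muted longer than the
     // session timeout, so a heartbeat could not arrive in time.
     def muteOutlastsSession(maxWaitMs: Int, sessionTimeoutMs: Int): Boolean =
       maxWaitMs > sessionTimeoutMs
   }
   ```

   With a 500 ms max wait and a 45000 ms session timeout (illustrative values 
only) the check is false, so throttling for max wait time would be harmless in 
that case. It only becomes a problem when max wait is configured above the 
timeout.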



