abbccdda commented on a change in pull request #9600:
URL: https://github.com/apache/kafka/pull/9600#discussion_r553775686
##########
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##########
@@ -1790,17 +1790,41 @@ class KafkaApis(val requestChannel: RequestChannel,
else {
val supportedFeatures = brokerFeatures.supportedFeatures
val finalizedFeaturesOpt = finalizedFeatureCache.get
- finalizedFeaturesOpt match {
- case Some(finalizedFeatures) => ApiVersion.apiVersionsResponse(
- requestThrottleMs,
- config.interBrokerProtocolVersion.recordVersion.value,
- supportedFeatures,
- finalizedFeatures.features,
- finalizedFeatures.epoch)
- case None => ApiVersion.apiVersionsResponse(
- requestThrottleMs,
- config.interBrokerProtocolVersion.recordVersion.value,
- supportedFeatures)
+ val controllerApiVersions = if (isForwardingEnabled(request)) {
+ forwardingManager.controllerApiVersions()
+ } else
+ None
+
+ if (isForwardingEnabled(request) && controllerApiVersions.isEmpty) {
Review comment:
I think this relates to the timing of the forwarding. When the controller
switches and certain ApiVersions change, the forwarding response will
contain an unsupported version exception. At that point, we would disconnect
the client. We have two options for triggering the disconnect:
1. when we detect an unsupported version exception in the forwarding response
2. when we detect that the new ApiVersion set differs from the previous one
I personally think #1 is more accurate, but it has the downside that we
can't tell whether the exception came from a truly incompatible admin client
trying to connect. #2 may have a wider impact than necessary, since it could
disconnect clients whose RPCs are unaffected by the change.
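
To make the two options concrete, here is a rough sketch of each trigger as a standalone predicate. Everything in it is hypothetical: `DisconnectPolicy`, `ForwardingResult`, `ApiVersionSet` and both method names are made up for illustration and are not part of `KafkaApis` or the forwarding manager. Plain `Int`s stand in for the API keys and version numbers.

```scala
// Hypothetical illustration only; none of these names exist in Kafka.
object DisconnectPolicy {
  // Assumed shape: API key -> (minVersion, maxVersion) advertised by the controller.
  type ApiVersionSet = Map[Int, (Int, Int)]

  // Assumed outcome of forwarding a single request to the controller.
  sealed trait ForwardingResult
  case object Success extends ForwardingResult
  case object UnsupportedVersion extends ForwardingResult

  // Option 1: disconnect only when a forwarded request actually came back
  // with an unsupported version error.
  def shouldDisconnectOnResponse(result: ForwardingResult): Boolean =
    result == UnsupportedVersion

  // Option 2: disconnect whenever the controller's advertised ApiVersion set
  // differs from the one we saw before, regardless of which RPCs changed.
  def shouldDisconnectOnChange(previous: ApiVersionSet, current: ApiVersionSet): Boolean =
    previous != current
}
```

With option 2, a change to the version range of a single API key disconnects every client, including those that never use that key. Option 1 only reacts to requests that actually failed, but it gives the same answer whether the failure came from a controller switch or from a client that was incompatible all along.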