hachikuji commented on a change in pull request #9600:
URL: https://github.com/apache/kafka/pull/9600#discussion_r558469889



##########
File path: 
clients/src/test/java/org/apache/kafka/clients/NodeApiVersionsTest.java
##########
@@ -125,13 +126,6 @@ public void testUsableVersionCalculationNoKnownVersions() {
             () -> versions.latestUsableVersion(ApiKeys.FETCH));
     }
 
-    @Test

Review comment:
       Hmm.. I think my suggestion about `latestUsableVersion(ApiKeys)` was off 
if we had to remove this. I think I had failed to take into account that 
`NodeApiVersions` represented the versions supported by the remote node, so the 
intersection was in fact necessary. Sorry about that.

##########
File path: 
core/src/main/scala/kafka/server/BrokerToControllerChannelManager.scala
##########
@@ -190,35 +209,41 @@ class BrokerToControllerRequestThread(
       if (currentTimeMs - request.createdTimeMs >= retryTimeoutMs) {
         requestIter.remove()
         request.callback.onTimeout()
-      } else if (activeController.isDefined) {
-        requestIter.remove()
-        return Some(RequestAndCompletionHandler(
-          time.milliseconds(),
-          activeController.get,
-          request.request,
-          handleResponse(request)
-        ))
+      } else {
+        val controllerAddress = activeControllerAddress()
+        if (controllerAddress.isDefined) {
+          requestIter.remove()
+          return Some(RequestAndCompletionHandler(
+            time.milliseconds(),
+            controllerAddress.get,
+            request.request,
+            handleResponse(request)
+          ))
+        }
       }
     }
     None
   }
 
   private[server] def handleResponse(request: 
BrokerToControllerQueueItem)(response: ClientResponse): Unit = {
     if (response.wasDisconnected()) {
-      activeController = None
+      updateControllerAddress(null)
       requestQueue.putFirst(request)
     } else if 
(response.responseBody().errorCounts().containsKey(Errors.NOT_CONTROLLER)) {
       // just close the controller connection and wait for metadata cache 
update in doWork
-      networkClient.disconnect(activeController.get.idString)
-      activeController = None
+      activeControllerAddress().foreach(controllerAddress => {

Review comment:
       nit: a little more idiomatic
   ```scala
   foreach { controllerAddress =>
   }
   ```

##########
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##########
@@ -139,8 +138,16 @@ class KafkaApis(val requestChannel: RequestChannel,
     request: RequestChannel.Request,
     handler: RequestChannel.Request => Unit
   ): Unit = {
-    def responseCallback(response: AbstractResponse): Unit = {
-      sendForwardedResponse(request, response)
+    def responseCallback(responseEither: Either[AbstractResponse, Errors]): 
Unit = {
+      responseEither match {
+        case Left(response) => sendForwardedResponse(request, response)
+        case Right(error) =>
+          if (error == Errors.UNSUPPORTED_VERSION)

Review comment:
       It's a little odd to have the `if` check here since we will close the 
connection regardless of the error. That's one reason I thought 
`Option[AbstractResponse]` might be clearer. The `None` could be treated as 
implying an unsupported version.

##########
File path: 
clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsResponse.java
##########
@@ -129,6 +131,40 @@ public static ApiVersionsResponseKeyCollection 
defaultApiKeys(final byte minMagi
         return apiKeys;
     }
 
+    public static ApiVersionsResponseKeyCollection 
intersectControllerApiVersions(final byte minMagic,

Review comment:
       nit: maybe add a short javadoc?

##########
File path: clients/src/main/java/org/apache/kafka/clients/NodeApiVersions.java
##########
@@ -227,4 +230,7 @@ public ApiVersion apiVersion(ApiKeys apiKey) {
         return supportedVersions.get(apiKey);
     }
 
+    public Map<ApiKeys, ApiVersion> fullApiVersions() {

Review comment:
       How about `allSupportedApiVersions`?

##########
File path: core/src/main/scala/kafka/server/ForwardingManager.scala
##########
@@ -77,7 +79,7 @@ class ForwardingManagerImpl(
 
   override def forwardRequest(
     request: RequestChannel.Request,
-    responseCallback: AbstractResponse => Unit
+    responseCallback: Either[AbstractResponse, Errors] => Unit

Review comment:
       Yeah, but that suggests a generality to the API that doesn't exist. 
There is only one error that is possible to be returned here.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to