hachikuji commented on a change in pull request #9600: URL: https://github.com/apache/kafka/pull/9600#discussion_r558469889
########## File path: clients/src/test/java/org/apache/kafka/clients/NodeApiVersionsTest.java ########## @@ -125,13 +126,6 @@ public void testUsableVersionCalculationNoKnownVersions() { () -> versions.latestUsableVersion(ApiKeys.FETCH)); } - @Test Review comment: Hmm.. I think my suggestion about `latestUsableVersion(ApiKeys)` was off if we had to remove this. I think I had failed to take into account that `NodeApiVersions` represented the versions supported by the remote node, so the intersection was in fact necessary. Sorry about that. ########## File path: core/src/main/scala/kafka/server/BrokerToControllerChannelManager.scala ########## @@ -190,35 +209,41 @@ class BrokerToControllerRequestThread( if (currentTimeMs - request.createdTimeMs >= retryTimeoutMs) { requestIter.remove() request.callback.onTimeout() - } else if (activeController.isDefined) { - requestIter.remove() - return Some(RequestAndCompletionHandler( - time.milliseconds(), - activeController.get, - request.request, - handleResponse(request) - )) + } else { + val controllerAddress = activeControllerAddress() + if (controllerAddress.isDefined) { + requestIter.remove() + return Some(RequestAndCompletionHandler( + time.milliseconds(), + controllerAddress.get, + request.request, + handleResponse(request) + )) + } } } None } private[server] def handleResponse(request: BrokerToControllerQueueItem)(response: ClientResponse): Unit = { if (response.wasDisconnected()) { - activeController = None + updateControllerAddress(null) requestQueue.putFirst(request) } else if (response.responseBody().errorCounts().containsKey(Errors.NOT_CONTROLLER)) { // just close the controller connection and wait for metadata cache update in doWork - networkClient.disconnect(activeController.get.idString) - activeController = None + activeControllerAddress().foreach(controllerAddress => { Review comment: nit: a little more idiomatic ```scala foreach { controllerAddress => } ``` ########## File path: core/src/main/scala/kafka/server/KafkaApis.scala ########## @@ -139,8 +138,16 @@ class KafkaApis(val requestChannel: RequestChannel, request: RequestChannel.Request, handler: RequestChannel.Request => Unit ): Unit = { - def responseCallback(response: AbstractResponse): Unit = { - sendForwardedResponse(request, response) + def responseCallback(responseEither: Either[AbstractResponse, Errors]): Unit = { + responseEither match { + case Left(response) => sendForwardedResponse(request, response) + case Right(error) => + if (error == Errors.UNSUPPORTED_VERSION) Review comment: It's a little odd to have the `if` check here since we will close the connection regardless of the error. That's one reason I thought `Option[AbstractResponse]` might be clearer. The `None` could be treated as implying an unsupported version. ########## File path: clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsResponse.java ########## @@ -129,6 +131,40 @@ public static ApiVersionsResponseKeyCollection defaultApiKeys(final byte minMagi return apiKeys; } + public static ApiVersionsResponseKeyCollection intersectControllerApiVersions(final byte minMagic, Review comment: nit: maybe add a short javadoc? ########## File path: clients/src/main/java/org/apache/kafka/clients/NodeApiVersions.java ########## @@ -227,4 +230,7 @@ public ApiVersion apiVersion(ApiKeys apiKey) { return supportedVersions.get(apiKey); } + public Map<ApiKeys, ApiVersion> fullApiVersions() { Review comment: How about `allSupportedApiVersions`? ########## File path: core/src/main/scala/kafka/server/ForwardingManager.scala ########## @@ -77,7 +79,7 @@ class ForwardingManagerImpl( override def forwardRequest( request: RequestChannel.Request, - responseCallback: AbstractResponse => Unit + responseCallback: Either[AbstractResponse, Errors] => Unit Review comment: Yeah, but that suggests a generality to the API that doesn't exist. There is only one error that is possible to be returned here. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org