hachikuji commented on a change in pull request #9600:
URL: https://github.com/apache/kafka/pull/9600#discussion_r557783391

##########
File path: clients/src/main/java/org/apache/kafka/clients/NodeApiVersions.java
##########
@@ -123,21 +125,25 @@ public short latestUsableVersion(ApiKeys apiKey) {
      * Get the latest version supported by the broker within an allowed range of versions
      */
     public short latestUsableVersion(ApiKeys apiKey, short oldestAllowedVersion, short latestAllowedVersion) {
-        ApiVersion usableVersion = supportedVersions.get(apiKey);
-        if (usableVersion == null)
-            throw new UnsupportedVersionException("The broker does not support " + apiKey);
-        return latestUsableVersion(apiKey, usableVersion, oldestAllowedVersion, latestAllowedVersion);
+        return latestUsableVersion(apiKey, supportedVersions.get(apiKey), oldestAllowedVersion, latestAllowedVersion);
     }
-    private short latestUsableVersion(ApiKeys apiKey, ApiVersion supportedVersions,
-                                      short minAllowedVersion, short maxAllowedVersion) {
-        short minVersion = (short) Math.max(minAllowedVersion, supportedVersions.minVersion);
-        short maxVersion = (short) Math.min(maxAllowedVersion, supportedVersions.maxVersion);
-        if (minVersion > maxVersion)
+    private short latestUsableVersion(ApiKeys apiKey,
+                                      ApiVersion supportedVersions,
+                                      short minAllowedVersion,
+                                      short maxAllowedVersion) {
+        if (supportedVersions == null)

Review comment:
nit: since we moved the null check here, why don't we remove the parameter as well and call `supportedVersions.get(apiKey)` here?

##########
File path: clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsResponse.java
##########
@@ -129,6 +131,40 @@ public static ApiVersionsResponseKeyCollection defaultApiKeys(final byte minMagi
         return apiKeys;
     }
+    public static ApiVersionsResponseKeyCollection commonApiVersionsWithActiveController(final byte minMagic,

Review comment:
Maybe `intersectControllerApiVersions`?

##########
File path: clients/src/main/java/org/apache/kafka/clients/NodeApiVersions.java
##########
@@ -123,21 +125,25 @@ public short latestUsableVersion(ApiKeys apiKey) {
      * Get the latest version supported by the broker within an allowed range of versions
      */
     public short latestUsableVersion(ApiKeys apiKey, short oldestAllowedVersion, short latestAllowedVersion) {

Review comment:
We could probably simplify this so that it takes a single `ApiVersion` parameter? By the way, the implementation above, `latestUsableVersion(ApiKeys apiKey)`, seems a bit odd since it basically does an intersection of the latest supported version with itself. A little helper (say `latestSupportedOrThrow`) might simplify this.

##########
File path: core/src/main/scala/kafka/server/BrokerToControllerChannelManager.scala
##########
@@ -140,6 +143,16 @@ class BrokerToControllerChannelManager(
       callback
     ))
   }
+
+  def controllerApiVersions(): Option[NodeApiVersions] =
+    requestThread.activeControllerAddress() match {
+      case Some(activeController) =>
+        if (activeController.id() == config.brokerId)
+          Some(currentNodeApiVersions)
+        else
+          Option(apiVersions.get(activeController.idString()))
+      case None => None

Review comment:
Seems like we can replace the `match` with a `flatMap`?
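
A minimal sketch of what the `flatMap` form could look like. `Node` and `NodeApiVersions` below are simplified stand-ins, not the real Kafka classes, and `brokerId`, `currentNodeApiVersions` and `lookup` are passed in as parameters only to keep the snippet self-contained. `lookup` plays the role of `apiVersions.get`, which is assumed to return null for an unknown node, as the `Option(...)` wrapper in the diff suggests.

```scala
object ControllerApiVersionsSketch {
  // Simplified stand-ins for Kafka's Node and NodeApiVersions (assumptions for this sketch).
  final case class Node(id: Int) { def idString: String = id.toString }
  final case class NodeApiVersions(description: String)

  // Same branching as the `match` in the diff, expressed with flatMap:
  // a missing controller yields None without an explicit `case None`.
  def controllerApiVersions(
    activeController: Option[Node],
    brokerId: Int,
    currentNodeApiVersions: NodeApiVersions,
    lookup: String => NodeApiVersions // may return null for an unknown node
  ): Option[NodeApiVersions] =
    activeController.flatMap { controller =>
      if (controller.id == brokerId)
        Some(currentNodeApiVersions)
      else
        Option(lookup(controller.idString)) // a null result becomes None
    }
}
```

The `case None => None` branch disappears because `flatMap` already propagates an empty `Option`.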

##########
File path: core/src/main/scala/kafka/server/BrokerToControllerChannelManager.scala
##########
@@ -205,20 +226,20 @@ class BrokerToControllerRequestThread(
   private[server] def handleResponse(request: BrokerToControllerQueueItem)(response: ClientResponse): Unit = {
     if (response.wasDisconnected()) {
-      activeController = None
+      updateControllerAddress(None)
       requestQueue.putFirst(request)
     } else if (response.responseBody().errorCounts().containsKey(Errors.NOT_CONTROLLER)) {
       // just close the controller connection and wait for metadata cache update in doWork
-      networkClient.disconnect(activeController.get.idString)
-      activeController = None
+      networkClient.disconnect(activeControllerAddress().get.idString)

Review comment:
This is another slippery looking case. Can we just rewrite this as a `foreach` so that we don't need to worry about it?

##########
File path: core/src/main/scala/kafka/server/BrokerToControllerChannelManager.scala
##########
@@ -190,11 +211,11 @@ class BrokerToControllerRequestThread(
       if (currentTimeMs - request.createdTimeMs >= retryTimeoutMs) {
         requestIter.remove()
         request.callback.onTimeout()
-      } else if (activeController.isDefined) {
+      } else if (activeControllerAddress().isDefined) {
         requestIter.remove()
         return Some(RequestAndCompletionHandler(
           time.milliseconds(),
-          activeController.get,
+          activeControllerAddress().get,

Review comment:
The usage is a tad suspicious because the atomic reference suggests that the value could change. I guess we are ok because the value will only be overwritten in the same thread that is calling `generateRequests`, but it might be worth rewriting this part anyway. For example:
```scala
} else {
  val controllerAddress = activeControllerAddress()
  if (controllerAddress.isDefined) {
    ...
```

##########
File path: core/src/main/scala/kafka/server/ForwardingManager.scala
##########
@@ -98,30 +100,40 @@ class ForwardingManagerImpl(
       val envelopeError = envelopeResponse.error()
       val requestBody = request.body[AbstractRequest]
-      val response = if (envelopeError != Errors.NONE) {
-        // An envelope error indicates broker misconfiguration (e.g. the principal serde
-        // might not be defined on the receiving broker). In this case, we do not return
-        // the error directly to the client since it would not be expected. Instead we
-        // return `UNKNOWN_SERVER_ERROR` so that the user knows that there is a problem
-        // on the broker.
-        debug(s"Forwarded request $request failed with an error in the envelope response $envelopeError")
-        requestBody.getErrorResponse(Errors.UNKNOWN_SERVER_ERROR.exception)
+      // Unsupported version indicates an incompatibility between controller and client API versions. The

Review comment:
It may be helpful to mention that this can happen because the controller changed after a connection was established.

##########
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##########
@@ -139,8 +138,11 @@ class KafkaApis(val requestChannel: RequestChannel,
     request: RequestChannel.Request,
     handler: RequestChannel.Request => Unit
   ): Unit = {
-    def responseCallback(response: AbstractResponse): Unit = {
-      sendForwardedResponse(request, response)
+    def responseCallback(responseEither: Either[AbstractResponse, Errors]): Unit = {
+      responseEither match {
+        case Left(response) => sendForwardedResponse(request, response)
+        case Right(error) => closeConnection(request, Collections.singletonMap(error, 1))

Review comment:
A debug message would be helpful here so that we know why the connection was closed. I think we might actually prefer to use `emptyMap()` here since the unsupported version error would be misleading.
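
Roughly what I have in mind for the callback, as a sketch only. `Response`, `ErrorCode` and the recording buffers are hypothetical stand-ins for `AbstractResponse`, `Errors`, `sendForwardedResponse`, `closeConnection` and `debug`, so that the snippet runs on its own. The log wording is made up.

```scala
import java.util.Collections
import scala.collection.mutable.ArrayBuffer

object ForwardedResponseSketch {
  // Hypothetical stand-ins for AbstractResponse and Errors.
  final case class Response(body: String)
  sealed trait ErrorCode
  case object UnsupportedVersion extends ErrorCode

  // Buffers that record side effects in place of the real broker calls.
  val sent = ArrayBuffer.empty[Response]
  val closed = ArrayBuffer.empty[java.util.Map[ErrorCode, Integer]]
  val debugLog = ArrayBuffer.empty[String]

  def responseCallback(responseEither: Either[Response, ErrorCode]): Unit =
    responseEither match {
      case Left(response) =>
        sent += response // stands in for sendForwardedResponse(request, response)
      case Right(error) =>
        // Log why the connection is being closed (message text is illustrative).
        debugLog += s"Closing connection: forwarded request failed with $error"
        // Empty error counts, so the unsupported version error is not reported.
        closed += Collections.emptyMap[ErrorCode, Integer]()
    }
}
```

With an empty map the close is still recorded, but no misleading error count is attached to it.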

##########
File path: core/src/main/scala/kafka/api/ApiVersion.scala
##########
@@ -148,42 +149,49 @@ object ApiVersion {
   def apiVersionsResponse(throttleTimeMs: Int,
                           maxMagic: Byte,
-                          latestSupportedFeatures: Features[SupportedVersionRange]): ApiVersionsResponse = {
+                          latestSupportedFeatures: Features[SupportedVersionRange],
+                          controllerApiVersions: Option[NodeApiVersions]): ApiVersionsResponse = {
     apiVersionsResponse(
       throttleTimeMs,
       maxMagic,
       latestSupportedFeatures,
       Features.emptyFinalizedFeatures,
-      ApiVersionsResponse.UNKNOWN_FINALIZED_FEATURES_EPOCH
+      ApiVersionsResponse.UNKNOWN_FINALIZED_FEATURES_EPOCH,
+      controllerApiVersions
     )
   }
   def apiVersionsResponse(throttleTimeMs: Int,
                           maxMagic: Byte,
                           latestSupportedFeatures: Features[SupportedVersionRange],
                           finalizedFeatures: Features[FinalizedVersionRange],
-                          finalizedFeaturesEpoch: Long): ApiVersionsResponse = {
-    val apiKeys = ApiVersionsResponse.defaultApiKeys(maxMagic)
+                          finalizedFeaturesEpoch: Long,
+                          controllerApiVersions: Option[NodeApiVersions]): ApiVersionsResponse = {
+    val apiKeys = controllerApiVersions match {
+      case None => ApiVersionsResponse.defaultApiKeys(maxMagic)
+      case Some(controllerApiVersion) => ApiVersionsResponse.commonApiVersionsWithActiveController(
+        maxMagic, controllerApiVersion.fullApiVersions())
+    }
+
     if (maxMagic == RecordBatch.CURRENT_MAGIC_VALUE && throttleTimeMs == AbstractResponse.DEFAULT_THROTTLE_TIME)
-      return new ApiVersionsResponse(
+      new ApiVersionsResponse(
         ApiVersionsResponse.createApiVersionsResponseData(
           DEFAULT_API_VERSIONS_RESPONSE.throttleTimeMs,
           Errors.forCode(DEFAULT_API_VERSIONS_RESPONSE.data.errorCode),
           apiKeys,
           latestSupportedFeatures,
           finalizedFeatures,
+          finalizedFeaturesEpoch))
+    else

Review comment:
nit: since this is a big block, could we add the braces?

##########
File path: core/src/main/scala/kafka/server/BrokerToControllerChannelManager.scala
##########
@@ -169,11 +182,19 @@ class BrokerToControllerRequestThread(
 ) extends InterBrokerSendThread(threadName, networkClient, config.controllerSocketTimeoutMs, time, isInterruptible = false) {
   private val requestQueue = new LinkedBlockingDeque[BrokerToControllerQueueItem]()
-  private var activeController: Option[Node] = None
+  private val activeController = new AtomicReference[Option[Node]](None)

Review comment:
nit: an atomic reference of Option is a little strange. Could we just use null and change the code to use `Option(activeController.get())`?

##########
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##########
@@ -1756,18 +1758,36 @@ class KafkaApis(val requestChannel: RequestChannel,
       else {
         val supportedFeatures = brokerFeatures.supportedFeatures
         val finalizedFeaturesOpt = finalizedFeatureCache.get
-        finalizedFeaturesOpt match {
-          case Some(finalizedFeatures) => ApiVersion.apiVersionsResponse(
-            requestThrottleMs,
-            config.interBrokerProtocolVersion.recordVersion.value,
-            supportedFeatures,
-            finalizedFeatures.features,
-            finalizedFeatures.epoch)
-          case None => ApiVersion.apiVersionsResponse(
-            requestThrottleMs,
-            config.interBrokerProtocolVersion.recordVersion.value,
-            supportedFeatures)
+        val controllerApiVersions = if (isForwardingEnabled(request)) {
+          forwardingManager.controllerApiVersions()
+        } else
+          None
+
+        val apiVersionsResponse =
+          finalizedFeaturesOpt match {
+            case Some(finalizedFeatures) => ApiVersion.apiVersionsResponse(
+              requestThrottleMs,
+              config.interBrokerProtocolVersion.recordVersion.value,
+              supportedFeatures,
+              finalizedFeatures.features,
+              finalizedFeatures.epoch,
+              controllerApiVersions)
+            case None => ApiVersion.apiVersionsResponse(
+              requestThrottleMs,
+              config.interBrokerProtocolVersion.recordVersion.value,
+              supportedFeatures,
+              controllerApiVersions)
+

Review comment:
nit: unneeded newline

##########
File path: core/src/main/scala/kafka/server/ForwardingManager.scala
##########
@@ -77,7 +79,7 @@ class ForwardingManagerImpl(
   override def forwardRequest(
     request: RequestChannel.Request,
-    responseCallback: AbstractResponse => Unit
+    responseCallback: Either[AbstractResponse, Errors] => Unit

Review comment:
Since we are not using the `Error` from the result, maybe `Option[AbstractResponse]` would be a better type.

##########
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##########
@@ -1756,18 +1758,36 @@ class KafkaApis(val requestChannel: RequestChannel,
       else {
         val supportedFeatures = brokerFeatures.supportedFeatures
         val finalizedFeaturesOpt = finalizedFeatureCache.get
-        finalizedFeaturesOpt match {
-          case Some(finalizedFeatures) => ApiVersion.apiVersionsResponse(
-            requestThrottleMs,
-            config.interBrokerProtocolVersion.recordVersion.value,
-            supportedFeatures,
-            finalizedFeatures.features,
-            finalizedFeatures.epoch)
-          case None => ApiVersion.apiVersionsResponse(
-            requestThrottleMs,
-            config.interBrokerProtocolVersion.recordVersion.value,
-            supportedFeatures)
+        val controllerApiVersions = if (isForwardingEnabled(request)) {
+          forwardingManager.controllerApiVersions()
+        } else

Review comment:
nit: add braces

##########
File path: clients/src/main/java/org/apache/kafka/common/protocol/ApiVersion.java
##########
@@ -14,10 +14,10 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.kafka.clients;
+package org.apache.kafka.common.protocol;

Review comment:
Ok. I don't have a strong argument to keep it where it is today I guess.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org