hachikuji commented on a change in pull request #9600:
URL: https://github.com/apache/kafka/pull/9600#discussion_r557783391



##########
File path: clients/src/main/java/org/apache/kafka/clients/NodeApiVersions.java
##########
@@ -123,21 +125,25 @@ public short latestUsableVersion(ApiKeys apiKey) {
      * Get the latest version supported by the broker within an allowed range of versions
      */
     public short latestUsableVersion(ApiKeys apiKey, short oldestAllowedVersion, short latestAllowedVersion) {
-        ApiVersion usableVersion = supportedVersions.get(apiKey);
-        if (usableVersion == null)
-            throw new UnsupportedVersionException("The broker does not support " + apiKey);
-        return latestUsableVersion(apiKey, usableVersion, oldestAllowedVersion, latestAllowedVersion);
+        return latestUsableVersion(apiKey, supportedVersions.get(apiKey), oldestAllowedVersion, latestAllowedVersion);
     }
 
-    private short latestUsableVersion(ApiKeys apiKey, ApiVersion supportedVersions,
-                                      short minAllowedVersion, short maxAllowedVersion) {
-        short minVersion = (short) Math.max(minAllowedVersion, supportedVersions.minVersion);
-        short maxVersion = (short) Math.min(maxAllowedVersion, supportedVersions.maxVersion);
-        if (minVersion > maxVersion)
+    private short latestUsableVersion(ApiKeys apiKey,
+                                      ApiVersion supportedVersions,
+                                      short minAllowedVersion,
+                                      short maxAllowedVersion) {
+        if (supportedVersions == null)

Review comment:
      nit: since we moved the null check here, why don't we remove the parameter as well and call `supportedVersions.get(apiKey)` here?
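A minimal sketch of that suggestion, with the lookup and null check folded into one place. `Range` and the `String`-keyed map below are hypothetical stand-ins for Kafka's `ApiVersion` and `ApiKeys` types, not the actual classes:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-ins for Kafka's ApiVersion range and the supportedVersions map.
class NodeVersionsSketch {
    record Range(short min, short max) {}
    static final Map<String, Range> supportedVersions = new HashMap<>();

    // The lookup and the null check live together, so callers no longer
    // need to pass the resolved version range in as a parameter.
    static short latestUsableVersion(String apiKey, short minAllowed, short maxAllowed) {
        Range range = supportedVersions.get(apiKey);
        if (range == null)
            throw new UnsupportedOperationException("The broker does not support " + apiKey);
        short min = (short) Math.max(minAllowed, range.min());
        short max = (short) Math.min(maxAllowed, range.max());
        if (min > max)
            throw new UnsupportedOperationException("No usable version for " + apiKey);
        return max;
    }
}
```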

##########
File path: clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsResponse.java
##########
@@ -129,6 +131,40 @@ public static ApiVersionsResponseKeyCollection defaultApiKeys(final byte minMagi
         return apiKeys;
     }
 
+    public static ApiVersionsResponseKeyCollection commonApiVersionsWithActiveController(final byte minMagic,

Review comment:
       Maybe `intersectControllerApiVersions`?

##########
File path: clients/src/main/java/org/apache/kafka/clients/NodeApiVersions.java
##########
@@ -123,21 +125,25 @@ public short latestUsableVersion(ApiKeys apiKey) {
      * Get the latest version supported by the broker within an allowed range of versions
      */
     public short latestUsableVersion(ApiKeys apiKey, short oldestAllowedVersion, short latestAllowedVersion) {

Review comment:
      We could probably simplify this so that it takes a single `ApiVersion` parameter?
  
  By the way, the implementation of `latestUsableVersion(ApiKeys apiKey)` above looks redundant, since it basically does an intersection of the latest supported version with itself. A little helper (say `latestSupportedOrThrow`) might simplify this.

##########
File path: core/src/main/scala/kafka/server/BrokerToControllerChannelManager.scala
##########
@@ -140,6 +143,16 @@ class BrokerToControllerChannelManager(
       callback
     ))
   }
+
+  def controllerApiVersions(): Option[NodeApiVersions] =
+    requestThread.activeControllerAddress() match {
+      case Some(activeController) =>
+        if (activeController.id() == config.brokerId)
+          Some(currentNodeApiVersions)
+        else
+          Option(apiVersions.get(activeController.idString()))
+      case None => None

Review comment:
       Seems like we can replace the `match` with a `flatMap`?
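For reference, the collapse that `flatMap` buys here, transposed to `java.util.Optional` (the PR code uses `scala.Option`; `Node`, `NodeApiVersions`, and the id-keyed map below are stand-in records, not the Kafka classes):

```java
import java.util.Map;
import java.util.Optional;

// Stand-in types illustrating the suggested flatMap rewrite.
class ControllerVersionsSketch {
    record Node(int id) {}
    record NodeApiVersions(String label) {}

    static Optional<NodeApiVersions> controllerApiVersions(Optional<Node> activeController,
                                                           int brokerId,
                                                           NodeApiVersions current,
                                                           Map<String, NodeApiVersions> byNodeId) {
        // flatMap replaces the nested match/if-else: None propagates through,
        // and each branch produces its own Optional.
        return activeController.flatMap(node ->
            node.id() == brokerId
                ? Optional.of(current)
                : Optional.ofNullable(byNodeId.get(Integer.toString(node.id()))));
    }
}
```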

##########
File path: core/src/main/scala/kafka/server/BrokerToControllerChannelManager.scala
##########
@@ -205,20 +226,20 @@ class BrokerToControllerRequestThread(
 
   private[server] def handleResponse(request: BrokerToControllerQueueItem)(response: ClientResponse): Unit = {
     if (response.wasDisconnected()) {
-      activeController = None
+      updateControllerAddress(None)
       requestQueue.putFirst(request)
     } else if (response.responseBody().errorCounts().containsKey(Errors.NOT_CONTROLLER)) {
       // just close the controller connection and wait for metadata cache update in doWork
-      networkClient.disconnect(activeController.get.idString)
-      activeController = None
+      networkClient.disconnect(activeControllerAddress().get.idString)

Review comment:
      This is another slippery looking case. Can we just rewrite this as a `foreach` so that we don't need to worry about it?
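A sketch of what the `foreach` shape buys: Scala's `Option.foreach` corresponds to `Optional.ifPresent` in Java, so the body runs only when an address is actually known and the unchecked `.get` disappears. The static field below is a stand-in for the `networkClient.disconnect` call:

```java
import java.util.Optional;

// Stand-in illustrating the suggested foreach/ifPresent rewrite.
class DisconnectSketch {
    static String lastDisconnectedId = null;

    static void onNotController(Optional<String> activeControllerId) {
        // activeControllerAddress().foreach(id => networkClient.disconnect(id)) in Scala
        activeControllerId.ifPresent(id -> lastDisconnectedId = id);
    }
}
```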

##########
File path: core/src/main/scala/kafka/server/BrokerToControllerChannelManager.scala
##########
@@ -190,11 +211,11 @@ class BrokerToControllerRequestThread(
       if (currentTimeMs - request.createdTimeMs >= retryTimeoutMs) {
         requestIter.remove()
         request.callback.onTimeout()
-      } else if (activeController.isDefined) {
+      } else if (activeControllerAddress().isDefined) {
         requestIter.remove()
         return Some(RequestAndCompletionHandler(
           time.milliseconds(),
-          activeController.get,
+          activeControllerAddress().get,

Review comment:
      The usage is a tad suspicious because the atomic reference suggests that the value could change. I guess we are ok because the value will only be overwritten in the same thread that is calling `generateRequests`, but it might be worth rewriting this part anyway. For example:
   ```scala
   } else {
     val controllerAddress = activeControllerAddress()
     if (controllerAddress.isDefined) {
   ...
   ```

##########
File path: core/src/main/scala/kafka/server/ForwardingManager.scala
##########
@@ -98,30 +100,40 @@ class ForwardingManagerImpl(
         val envelopeError = envelopeResponse.error()
         val requestBody = request.body[AbstractRequest]
 
-        val response = if (envelopeError != Errors.NONE) {
-          // An envelope error indicates broker misconfiguration (e.g. the principal serde
-          // might not be defined on the receiving broker). In this case, we do not return
-          // the error directly to the client since it would not be expected. Instead we
-          // return `UNKNOWN_SERVER_ERROR` so that the user knows that there is a problem
-          // on the broker.
-          debug(s"Forwarded request $request failed with an error in the envelope response $envelopeError")
-          requestBody.getErrorResponse(Errors.UNKNOWN_SERVER_ERROR.exception)
+        // Unsupported version indicates an incompatibility between controller and client API versions. The

Review comment:
      It may be helpful to mention that this can happen because the controller changed after a connection was established.

##########
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##########
@@ -139,8 +138,11 @@ class KafkaApis(val requestChannel: RequestChannel,
     request: RequestChannel.Request,
     handler: RequestChannel.Request => Unit
   ): Unit = {
-    def responseCallback(response: AbstractResponse): Unit = {
-      sendForwardedResponse(request, response)
+    def responseCallback(responseEither: Either[AbstractResponse, Errors]): Unit = {
+      responseEither match {
+        case Left(response) => sendForwardedResponse(request, response)
+        case Right(error) => closeConnection(request, Collections.singletonMap(error, 1))

Review comment:
      A debug message would be helpful here so that we know why the connection was closed. I think we might actually prefer to use `emptyMap()` here since the unsupported version error would be misleading.

##########
File path: core/src/main/scala/kafka/api/ApiVersion.scala
##########
@@ -148,42 +149,49 @@ object ApiVersion {
 
   def apiVersionsResponse(throttleTimeMs: Int,
                           maxMagic: Byte,
-                          latestSupportedFeatures: Features[SupportedVersionRange]): ApiVersionsResponse = {
+                          latestSupportedFeatures: Features[SupportedVersionRange],
+                          controllerApiVersions: Option[NodeApiVersions]): ApiVersionsResponse = {
     apiVersionsResponse(
       throttleTimeMs,
       maxMagic,
       latestSupportedFeatures,
       Features.emptyFinalizedFeatures,
-      ApiVersionsResponse.UNKNOWN_FINALIZED_FEATURES_EPOCH
+      ApiVersionsResponse.UNKNOWN_FINALIZED_FEATURES_EPOCH,
+      controllerApiVersions
     )
   }
 
   def apiVersionsResponse(throttleTimeMs: Int,
                           maxMagic: Byte,
                           latestSupportedFeatures: Features[SupportedVersionRange],
                           finalizedFeatures: Features[FinalizedVersionRange],
-                          finalizedFeaturesEpoch: Long): ApiVersionsResponse = {
-    val apiKeys = ApiVersionsResponse.defaultApiKeys(maxMagic)
+                          finalizedFeaturesEpoch: Long,
+                          controllerApiVersions: Option[NodeApiVersions]): ApiVersionsResponse = {
+    val apiKeys = controllerApiVersions match {
+      case None => ApiVersionsResponse.defaultApiKeys(maxMagic)
+      case Some(controllerApiVersion) => ApiVersionsResponse.commonApiVersionsWithActiveController(
+        maxMagic, controllerApiVersion.fullApiVersions())
+    }
+
     if (maxMagic == RecordBatch.CURRENT_MAGIC_VALUE &&
       throttleTimeMs == AbstractResponse.DEFAULT_THROTTLE_TIME)
-      return new ApiVersionsResponse(
+      new ApiVersionsResponse(
         ApiVersionsResponse.createApiVersionsResponseData(
           DEFAULT_API_VERSIONS_RESPONSE.throttleTimeMs,
           Errors.forCode(DEFAULT_API_VERSIONS_RESPONSE.data.errorCode),
           apiKeys,
           latestSupportedFeatures,
           finalizedFeatures,
+          finalizedFeaturesEpoch))
+    else

Review comment:
       nit: since this is a big block, could we add the braces?

##########
File path: core/src/main/scala/kafka/server/BrokerToControllerChannelManager.scala
##########
@@ -169,11 +182,19 @@ class BrokerToControllerRequestThread(
 ) extends InterBrokerSendThread(threadName, networkClient, config.controllerSocketTimeoutMs, time, isInterruptible = false) {
 
   private val requestQueue = new LinkedBlockingDeque[BrokerToControllerQueueItem]()
-  private var activeController: Option[Node] = None
+  private val activeController = new AtomicReference[Option[Node]](None)

Review comment:
      nit: an atomic reference of Option is a little strange. Could we just use null and change the code to use `Option(activeController.get())`?
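The equivalent pattern sketched with `java.util.Optional` (the PR uses `scala.Option`; the `String` address and class name here are stand-ins): store a plain nullable value in the `AtomicReference` and wrap at the read site.

```java
import java.util.Optional;
import java.util.concurrent.atomic.AtomicReference;

// Stand-in for the controller-address holder: the reference stores a nullable
// value, and readers wrap it into an Optional (Option(...) in Scala).
class ControllerAddressSketch {
    private final AtomicReference<String> activeController = new AtomicReference<>(null);

    Optional<String> activeControllerAddress() {
        return Optional.ofNullable(activeController.get());
    }

    // Passing null clears the address, matching updateControllerAddress(None).
    void updateControllerAddress(String node) {
        activeController.set(node);
    }
}
```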

##########
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##########
@@ -1756,18 +1758,36 @@ class KafkaApis(val requestChannel: RequestChannel,
       else {
         val supportedFeatures = brokerFeatures.supportedFeatures
         val finalizedFeaturesOpt = finalizedFeatureCache.get
-        finalizedFeaturesOpt match {
-          case Some(finalizedFeatures) => ApiVersion.apiVersionsResponse(
-            requestThrottleMs,
-            config.interBrokerProtocolVersion.recordVersion.value,
-            supportedFeatures,
-            finalizedFeatures.features,
-            finalizedFeatures.epoch)
-          case None => ApiVersion.apiVersionsResponse(
-            requestThrottleMs,
-            config.interBrokerProtocolVersion.recordVersion.value,
-            supportedFeatures)
+        val controllerApiVersions = if (isForwardingEnabled(request)) {
+          forwardingManager.controllerApiVersions()
+        } else
+          None
+
+        val apiVersionsResponse =
+          finalizedFeaturesOpt match {
+            case Some(finalizedFeatures) => ApiVersion.apiVersionsResponse(
+              requestThrottleMs,
+              config.interBrokerProtocolVersion.recordVersion.value,
+              supportedFeatures,
+              finalizedFeatures.features,
+              finalizedFeatures.epoch,
+              controllerApiVersions)
+            case None => ApiVersion.apiVersionsResponse(
+              requestThrottleMs,
+              config.interBrokerProtocolVersion.recordVersion.value,
+              supportedFeatures,
+              controllerApiVersions)
+

Review comment:
       nit: unneeded newline

##########
File path: core/src/main/scala/kafka/server/ForwardingManager.scala
##########
@@ -77,7 +79,7 @@ class ForwardingManagerImpl(
 
   override def forwardRequest(
     request: RequestChannel.Request,
-    responseCallback: AbstractResponse => Unit
+    responseCallback: Either[AbstractResponse, Errors] => Unit

Review comment:
      Since we are not using the `Errors` from the result, maybe `Option[AbstractResponse]` would be a better type.
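A sketch of that type change with `java.util.Optional` standing in for `scala.Option` (the request/response `String`s and the class name are hypothetical): an empty value signals "close the connection" without carrying an error code the caller never inspects.

```java
import java.util.Optional;
import java.util.function.Consumer;

// Stand-in illustrating Option[AbstractResponse] instead of Either[AbstractResponse, Errors].
class ForwardingSketch {
    static void forward(String request, Consumer<Optional<String>> responseCallback) {
        boolean versionMismatch = request.isEmpty(); // stand-in for the real mismatch check
        if (versionMismatch)
            responseCallback.accept(Optional.empty()); // caller closes the connection
        else
            responseCallback.accept(Optional.of("response:" + request));
    }
}
```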

##########
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##########
@@ -1756,18 +1758,36 @@ class KafkaApis(val requestChannel: RequestChannel,
       else {
         val supportedFeatures = brokerFeatures.supportedFeatures
         val finalizedFeaturesOpt = finalizedFeatureCache.get
-        finalizedFeaturesOpt match {
-          case Some(finalizedFeatures) => ApiVersion.apiVersionsResponse(
-            requestThrottleMs,
-            config.interBrokerProtocolVersion.recordVersion.value,
-            supportedFeatures,
-            finalizedFeatures.features,
-            finalizedFeatures.epoch)
-          case None => ApiVersion.apiVersionsResponse(
-            requestThrottleMs,
-            config.interBrokerProtocolVersion.recordVersion.value,
-            supportedFeatures)
+        val controllerApiVersions = if (isForwardingEnabled(request)) {
+          forwardingManager.controllerApiVersions()
+        } else

Review comment:
       nit: add braces

##########
File path: clients/src/main/java/org/apache/kafka/common/protocol/ApiVersion.java
##########
@@ -14,10 +14,10 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.kafka.clients;
+package org.apache.kafka.common.protocol;

Review comment:
       Ok. I don't have a strong argument to keep it where it is today I guess.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

