junrao commented on code in PR #16421:
URL: https://github.com/apache/kafka/pull/16421#discussion_r1671054328


##########
clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsResponse.java:
##########
@@ -49,6 +50,71 @@ public class ApiVersionsResponse extends AbstractResponse {
 
     private final ApiVersionsResponseData data;
 
+    public static class Builder {
+        private Errors error = Errors.NONE;
+        private int throttleTimeMs = 0;
+        private ApiVersionCollection apiVersions = null;
+        private Features<SupportedVersionRange> supportedFeatures = null;
+        private Map<String, Short> finalizedFeatures = null;
+        private long finalizedFeaturesEpoch = 0;
+        private boolean zkMigrationEnabled = false;
+        private boolean suppressFeatureLevel0 = false;

Review Comment:
   Should `suppressFeatureLevel0` be renamed to `alterFeatureLevel0`?



##########
core/src/main/scala/kafka/server/ApiVersionManager.scala:
##########
@@ -102,16 +102,20 @@ class SimpleApiVersionManager(
 
   private val apiVersions = 
ApiVersionsResponse.collectApis(enabledApis.asJava, enableUnstableLastVersion)
 
-  override def apiVersionResponse(throttleTimeMs: Int): ApiVersionsResponse = {
+  override def apiVersionResponse(
+    throttleTimeMs: Int,
+    suppressFeatureLevel0: Boolean

Review Comment:
   Should `suppressFeatureLevel0` be renamed to `alterFeatureLevel0`?



##########
core/src/main/scala/kafka/server/ApiVersionManager.scala:
##########
@@ -33,7 +33,7 @@ trait ApiVersionManager {
   def listenerType: ListenerType
   def enabledApis: collection.Set[ApiKeys]
 
-  def apiVersionResponse(throttleTimeMs: Int): ApiVersionsResponse
+  def apiVersionResponse(throttleTimeMs: Int, suppressFeatureLevel0: Boolean): 
ApiVersionsResponse

Review Comment:
   Should `suppressFeatureLevel0` be renamed to `alterFeatureLevel0`?



##########
core/src/main/scala/kafka/server/BrokerFeatures.scala:
##########
@@ -93,6 +93,9 @@ object BrokerFeatures extends Logging {
               feature.latestProduction
             }))
     }
+    if (unstableFeatureVersionsEnabled) {
+      features.put("kraft.version", new SupportedVersionRange(1))

Review Comment:
   Should we add `kraft.version` to `PRODUCTION_FEATURES`, which is supposed to be
the one place that tracks all features, rather than adding it separately here?
   



##########
clients/src/test/java/org/apache/kafka/common/requests/ApiVersionsResponseTest.java:
##########
@@ -276,6 +270,38 @@ public void testIntersect() {
         assertEquals(expected, ApiVersionsResponse.intersect(other, 
thisVersion).get());
     }
 
+    @ParameterizedTest
+    @ValueSource(booleans = {false, true})
+    public void testSuppressV0Features(boolean suppressV0Features) {

Review Comment:
   Should `testSuppressV0Features` be renamed to `testAlterV0Features`, and its
   parameter `suppressV0Features` to `alterV0Features`?



##########
clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsResponse.java:
##########
@@ -249,35 +282,24 @@ public static ApiVersionCollection 
intersectForwardableApis(
         return apiKeys;
     }
 
-    private static ApiVersionsResponseData createApiVersionsResponseData(
-        final int throttleTimeMs,
-        final Errors error,
-        final ApiVersionCollection apiKeys,
-        final Features<SupportedVersionRange> latestSupportedFeatures,
-        final Map<String, Short> finalizedFeatures,
-        final long finalizedFeaturesEpoch,
-        final boolean zkMigrationEnabled
+    private static SupportedFeatureKeyCollection 
maybeFilterSupportedFeatureKeys(
+        Features<SupportedVersionRange> latestSupportedFeatures,
+        boolean suppressV0

Review Comment:
   Should `suppressV0` be renamed to `alterV0`?



##########
clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsResponse.java:
##########
@@ -49,6 +50,71 @@ public class ApiVersionsResponse extends AbstractResponse {
 
     private final ApiVersionsResponseData data;
 
+    public static class Builder {
+        private Errors error = Errors.NONE;
+        private int throttleTimeMs = 0;
+        private ApiVersionCollection apiVersions = null;
+        private Features<SupportedVersionRange> supportedFeatures = null;
+        private Map<String, Short> finalizedFeatures = null;
+        private long finalizedFeaturesEpoch = 0;
+        private boolean zkMigrationEnabled = false;
+        private boolean suppressFeatureLevel0 = false;
+
+        public Builder setError(Errors error) {
+            this.error = error;
+            return this;
+        }
+
+        public Builder setThrottleTimeMs(int throttleTimeMs) {
+            this.throttleTimeMs = throttleTimeMs;
+            return this;
+        }
+
+        public Builder setApiVersions(ApiVersionCollection apiVersions) {
+            this.apiVersions = apiVersions;
+            return this;
+        }
+
+        public Builder setSupportedFeatures(Features<SupportedVersionRange> 
supportedFeatures) {
+            this.supportedFeatures = supportedFeatures;
+            return this;
+        }
+
+        public Builder setFinalizedFeatures(Map<String, Short> 
finalizedFeatures) {
+            this.finalizedFeatures = finalizedFeatures;
+            return this;
+        }
+
+        public Builder setFinalizedFeaturesEpoch(long finalizedFeaturesEpoch) {
+            this.finalizedFeaturesEpoch = finalizedFeaturesEpoch;
+            return this;
+        }
+
+        public Builder setZkMigrationEnabled(boolean zkMigrationEnabled) {
+            this.zkMigrationEnabled = zkMigrationEnabled;
+            return this;
+        }
+
+        public Builder setSuppressFeatureLevel0(boolean suppressFeatureLevel0) 
{

Review Comment:
   Should `suppressFeatureLevel0` be renamed to `alterFeatureLevel0`?



##########
core/src/main/scala/kafka/server/ApiVersionManager.scala:
##########
@@ -142,27 +146,39 @@ class DefaultApiVersionManager(
 
   val enabledApis: mutable.Set[ApiKeys] = 
ApiKeys.apisForListener(listenerType).asScala
 
-  override def apiVersionResponse(throttleTimeMs: Int): ApiVersionsResponse = {
-    val supportedFeatures = brokerFeatures.supportedFeatures
+  override def apiVersionResponse(
+    throttleTimeMs: Int,
+    suppressFeatureLevel0: Boolean

Review Comment:
   Should `suppressFeatureLevel0` be renamed to `alterFeatureLevel0`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org

Reply via email to