mumrah commented on a change in pull request #11550:
URL: https://github.com/apache/kafka/pull/11550#discussion_r758840254



##########
File path: core/src/main/scala/kafka/raft/RaftManager.scala
##########
@@ -108,7 +108,8 @@ class KafkaRaftManager[T](
   time: Time,
   metrics: Metrics,
   threadNamePrefixOpt: Option[String],
-  val controllerQuorumVotersFuture: CompletableFuture[util.Map[Integer, AddressSpec]]
+  val controllerQuorumVotersFuture: CompletableFuture[util.Map[Integer, AddressSpec]],
+  apiVersions: ApiVersions

Review comment:
       We pass this same ApiVersions to ControllerServer (and QuorumController) 
so that we can inspect the supported features of other quorum peers.

##########
File path: core/src/main/scala/kafka/server/ReplicaManager.scala
##########
@@ -2107,7 +2107,7 @@ class ReplicaManager(val config: KafkaConfig,
    * @param delta           The delta to apply.
    * @param newImage        The new metadata image.
    */
-  def applyDelta(delta: TopicsDelta, newImage: MetadataImage): Unit = {
+  def applyDelta(delta: TopicsDelta, newImage: MetadataImage, metadataVersion: MetadataVersionDelta): Unit = {

Review comment:
       This is here to illustrate one way we can inform broker components of a 
metadata.version change. With this approach, version changes can be handled 
in-band with other metadata changes.
   
   Another way is to register a listener with MetadataVersionManager, which 
would be out-of-band from other metadata updates.
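
A rough sketch of the two options, written in plain Java rather than the broker's Scala. Every type here (Delta, VersionManager, Component) is a hypothetical stand-in and not code from this PR; the sketch only illustrates the difference in how a component learns about the change.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical stand-ins; none of these types are taken from the PR.
class VersionChangeSketch {
    /** In-band: the version change travels with the rest of the metadata delta. */
    static final class Delta {
        final List<String> changedTopics;
        final Short newMetadataVersion; // null when the version did not change

        Delta(List<String> changedTopics, Short newMetadataVersion) {
            this.changedTopics = changedTopics;
            this.newMetadataVersion = newMetadataVersion;
        }
    }

    /** Out-of-band: components subscribe separately from metadata publishing. */
    static final class VersionManager {
        private final List<Consumer<Short>> listeners = new ArrayList<>();
        private short current = 0;

        void register(Consumer<Short> listener) {
            listeners.add(listener);
        }

        void update(short version) {
            if (version == current) {
                return; // nothing changed, so nobody is notified
            }
            current = version;
            for (Consumer<Short> listener : listeners) {
                listener.accept(version);
            }
        }
    }

    /** A component that records the order in which it observes events. */
    static final class Component {
        final List<String> events = new ArrayList<>();

        // In-band handling: the version change is seen in the same call as the topic changes.
        void applyDelta(Delta delta) {
            if (delta.newMetadataVersion != null) {
                events.add("version=" + delta.newMetadataVersion);
            }
            for (String topic : delta.changedTopics) {
                events.add("topic=" + topic);
            }
        }

        // Out-of-band handling: invoked by the manager, independent of applyDelta.
        void onVersionChange(short version) {
            events.add("version=" + version);
        }
    }
}
```

With the in-band form, a component sees the version change and the topic changes in a single call, so their relative ordering is explicit. With the listener form, ordering relative to other metadata updates depends on when the manager fires.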

##########
File path: metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java
##########
@@ -239,7 +244,7 @@ public void deactivate() {
 
         List<ApiMessageAndVersion> records = new ArrayList<>();
         records.add(new ApiMessageAndVersion(record,
-            REGISTER_BROKER_RECORD.highestSupportedVersion()));
+            metadataVersionProvider.activeVersion().recordVersion(REGISTER_BROKER_RECORD)));

Review comment:
       Rather than always picking the highest version, we now consult 
`MetadataVersion#recordVersion`. We'll need to do this any time we construct a record.
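
To make the idea concrete, here is a minimal sketch of a per-version record lookup. The enum values, the record versions, and the rule that V1 pins every record to version 0 are all invented for illustration; the real mapping in the PR is not shown in this hunk.

```java
// Hypothetical sketch: the record types, versions, and mapping are invented
// for illustration and do not reflect Kafka's actual record schemas.
class RecordVersionSketch {
    enum RecordType {
        REGISTER_BROKER_RECORD((short) 1),
        FEATURE_LEVEL_RECORD((short) 0);

        /** Highest version this software build knows how to write. */
        final short highestSupportedVersion;

        RecordType(short highestSupportedVersion) {
            this.highestSupportedVersion = highestSupportedVersion;
        }
    }

    enum MetadataVersion {
        V1, V2;

        /** Record version to write while this metadata.version is active. */
        short recordVersion(RecordType type) {
            if (this == V1) {
                // Assumed rule: V1 predates every schema bump, so write version 0.
                return 0;
            }
            // Assumed rule: V2 allows the newest schema the software supports.
            return type.highestSupportedVersion;
        }
    }
}
```

The point is that the software may be able to write a newer record version than the finalized metadata.version permits, so the active version has to be the source of truth rather than `highestSupportedVersion()`.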

##########
File path: clients/src/main/java/org/apache/kafka/clients/NodeApiVersions.java
##########
@@ -233,4 +256,8 @@ public ApiVersion apiVersion(ApiKeys apiKey) {
     public Map<ApiKeys, ApiVersion> allSupportedApiVersions() {
         return supportedVersions;
     }
+
+    public Map<String, SupportedVersionRange> supportedFeatures() {

Review comment:
       We already get a broker's supported features via ApiVersionsResponse, 
but we were not storing them in NodeApiVersions. This adds that.

##########
File path: metadata/src/main/java/org/apache/kafka/controller/FeatureControlManager.java
##########
@@ -45,71 +49,183 @@
     /**
      * An immutable map containing the features supported by this controller's software.
      */
-    private final Map<String, VersionRange> supportedFeatures;
+    private final QuorumFeatures quorumFeatures;
+
+    private final MetadataVersionProvider metadataVersionProvider;
 
     /**
      * Maps feature names to finalized version ranges.
      */
     private final TimelineHashMap<String, VersionRange> finalizedVersions;
 
-    FeatureControlManager(Map<String, VersionRange> supportedFeatures,
-                          SnapshotRegistry snapshotRegistry) {
-        this.supportedFeatures = supportedFeatures;
+    /**
+     * Collection of listeners for when features change
+     */
+    private final Map<String, FeatureLevelListener> listeners;

Review comment:
       This lets other controller components register a listener for changes to 
features.

##########
File path: core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala
##########
@@ -140,7 +145,7 @@ class BrokerMetadataPublisher(conf: KafkaConfig,
       // Apply topic deltas.
       Option(delta.topicsDelta()).foreach { topicsDelta =>
         // Notify the replica manager about changes to topics.
-        replicaManager.applyDelta(topicsDelta, newImage)
+        replicaManager.applyDelta(topicsDelta, newImage, versionDelta)

Review comment:
       If we like this approach, we would need to pass the version delta to the 
other components that get updated here.

##########
File path: metadata/src/main/java/org/apache/kafka/controller/FeatureControlManager.java
##########
@@ -45,71 +49,183 @@
     /**
      * An immutable map containing the features supported by this controller's software.
      */
-    private final Map<String, VersionRange> supportedFeatures;
+    private final QuorumFeatures quorumFeatures;
+
+    private final MetadataVersionProvider metadataVersionProvider;
 
     /**
      * Maps feature names to finalized version ranges.
      */
     private final TimelineHashMap<String, VersionRange> finalizedVersions;
 
-    FeatureControlManager(Map<String, VersionRange> supportedFeatures,
-                          SnapshotRegistry snapshotRegistry) {
-        this.supportedFeatures = supportedFeatures;
+    /**
+     * Collection of listeners for when features change
+     */
+    private final Map<String, FeatureLevelListener> listeners;
+
+    FeatureControlManager(QuorumFeatures quorumFeatures,
+                          SnapshotRegistry snapshotRegistry,
+                          MetadataVersionProvider metadataVersionProvider) {
+        this.quorumFeatures = quorumFeatures;
+        this.metadataVersionProvider = metadataVersionProvider;
         this.finalizedVersions = new TimelineHashMap<>(snapshotRegistry, 0);
+        this.listeners = new HashMap<>();
     }
 
     ControllerResult<Map<String, ApiError>> updateFeatures(
-            Map<String, VersionRange> updates, Set<String> downgradeables,
+            Map<String, Short> updates, Set<String> downgradeables,
             Map<Integer, Map<String, VersionRange>> brokerFeatures) {
         TreeMap<String, ApiError> results = new TreeMap<>();
         List<ApiMessageAndVersion> records = new ArrayList<>();
-        for (Entry<String, VersionRange> entry : updates.entrySet()) {
+        for (Entry<String, Short> entry : updates.entrySet()) {
             results.put(entry.getKey(), updateFeature(entry.getKey(), entry.getValue(),
                 downgradeables.contains(entry.getKey()), brokerFeatures, records));
         }
 
         return ControllerResult.atomicOf(records, results);
     }
 
+    ControllerResult<Map<String, ApiError>> initializeMetadataVersion(short initVersion) {
+        if (finalizedVersions.containsKey(MetadataVersions.FEATURE_NAME)) {
+            return ControllerResult.atomicOf(
+                Collections.emptyList(),
+                Collections.singletonMap(
+                    MetadataVersions.FEATURE_NAME,
+                    new ApiError(Errors.INVALID_UPDATE_VERSION,
+                        "Cannot initialize metadata.version since it has already been initialized.")
+            ));
+        }
+        List<ApiMessageAndVersion> records = new ArrayList<>();
+        ApiError result = updateMetadataVersion(initVersion, initVersion, records::add);
+        return ControllerResult.atomicOf(records, Collections.singletonMap(MetadataVersions.FEATURE_NAME, result));
+    }
+
+    void register(String listenerName, FeatureLevelListener listener) {
+        this.listeners.putIfAbsent(listenerName, listener);
+    }
+
+    boolean canSupportVersion(String featureName, VersionRange versionRange) {
+        return quorumFeatures.localSupportedFeature(featureName)
+            .filter(localRange -> localRange.contains(versionRange))
+            .isPresent();
+    }
+
     private ApiError updateFeature(String featureName,
-                                   VersionRange newRange,
+                                   short newVersion,
                                    boolean downgradeable,
-                                   Map<Integer, Map<String, VersionRange>> brokerFeatures,
+                                   Map<Integer, Map<String, VersionRange>> brokersAndFeatures,
                                    List<ApiMessageAndVersion> records) {
-        if (newRange.min() <= 0) {
+        final VersionRange currentRange = finalizedVersions.get(featureName);
+        final VersionRange newRange;
+        if (currentRange == null) {
+            // Never seen this feature before. Initialize its min version to the max of supported broker mins
+            Optional<Short> minFinalizedVersion = brokersAndFeatures.values().stream().map(brokerFeatures -> {
+                VersionRange brokerFeatureVersion = brokerFeatures.get(featureName);
+                if (brokerFeatureVersion != null) {
+                    return brokerFeatureVersion.min();
+                } else {
+                    return (short) -1;
+                }
+            }).max(Short::compareTo);
+
+            if (minFinalizedVersion.isPresent()) {
+                newRange = VersionRange.of(minFinalizedVersion.get(), newVersion);
+            } else {
+                // No brokers know about this feature!
+                return new ApiError(Errors.INVALID_UPDATE_VERSION,
+                    "Cannot finalized this feature flag since no alive brokers support it.");
+            }
+        } else {
+            newRange = VersionRange.of(currentRange.min(), newVersion);
+        }
+
+        if (newRange.max() < newRange.min()) {
             return new ApiError(Errors.INVALID_UPDATE_VERSION,
-                "The lower value for the new range cannot be less than 1.");
+                "The upper value for the new range cannot be less than the lower value.");
         }
+
         if (newRange.max() <= 0) {
             return new ApiError(Errors.INVALID_UPDATE_VERSION,
                 "The upper value for the new range cannot be less than 1.");
         }
-        VersionRange localRange = supportedFeatures.get(featureName);
-        if (localRange == null || !localRange.contains(newRange)) {
+
+        if (!canSupportVersion(featureName, newRange)) {
             return new ApiError(Errors.INVALID_UPDATE_VERSION,
                 "The controller does not support the given feature range.");
         }
-        for (Entry<Integer, Map<String, VersionRange>> brokerEntry :
-            brokerFeatures.entrySet()) {
+
+        for (Entry<Integer, Map<String, VersionRange>> brokerEntry : brokersAndFeatures.entrySet()) {
             VersionRange brokerRange = brokerEntry.getValue().get(featureName);
             if (brokerRange == null || !brokerRange.contains(newRange)) {
                 return new ApiError(Errors.INVALID_UPDATE_VERSION,
                     "Broker " + brokerEntry.getKey() + " does not support the given " +
                         "feature range.");
             }
         }
-        VersionRange currentRange = finalizedVersions.get(featureName);
+
         if (currentRange != null && currentRange.max() > newRange.max()) {
             if (!downgradeable) {
                 return new ApiError(Errors.INVALID_UPDATE_VERSION,
                     "Can't downgrade the maximum version of this feature without " +
                     "setting downgradable to true.");
             }
         }
-        records.add(new ApiMessageAndVersion(
-            new FeatureLevelRecord().setName(featureName).
+
+        if (featureName.equals(MetadataVersions.FEATURE_NAME)) {
+            // Perform additional checks if we're updating metadata.version
+            return updateMetadataVersion(newRange.max(), metadataVersionProvider.activeVersion().version(), records::add);
+        } else {
+            records.add(new ApiMessageAndVersion(
+                new FeatureLevelRecord().setName(featureName).
+                    setMinFeatureLevel(newRange.min()).setMaxFeatureLevel(newRange.max()),
+                metadataVersionProvider.activeVersion().recordVersion(FEATURE_LEVEL_RECORD)));
+            return ApiError.NONE;
+        }
+    }
+
+    /**
+     * Perform some additional validation for metadata.version updates.
+     */
+    private ApiError updateMetadataVersion(short newVersion,
+                                           short recordVersion,
+                                           Consumer<ApiMessageAndVersion> recordConsumer) {

Review comment:
       Notice here we need the desired metadata.version as well as the 
FeatureLevelRecord version to write. This is to handle the initialization case.
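
A simplified sketch of why two arguments are needed. Nothing here is the PR's code: FeatureRecord, recordVersionFor, and the UNINITIALIZED sentinel value are assumptions, and state is applied at write time for brevity, whereas a real controller would apply it on replay.

```java
import java.util.function.Consumer;

// Hypothetical sketch of the initialization case; all names and the
// version-to-record mapping below are assumptions made for illustration.
class MetadataVersionInitSketch {
    static final short UNINITIALIZED = 0; // assumed sentinel value

    static final class FeatureRecord {
        final short featureLevel;  // the metadata.version being finalized
        final short recordVersion; // the schema version used to encode the record

        FeatureRecord(short featureLevel, short recordVersion) {
            this.featureLevel = featureLevel;
            this.recordVersion = recordVersion;
        }
    }

    private short activeVersion = UNINITIALIZED;

    /** Invented mapping from a metadata.version to a record schema version. */
    static short recordVersionFor(short metadataVersion) {
        return (short) (metadataVersion - 1);
    }

    /** Initialization: no active version exists yet, so encode with the initial one. */
    boolean initialize(short initVersion, Consumer<FeatureRecord> out) {
        if (activeVersion != UNINITIALIZED) {
            return false; // already initialized, nothing is written
        }
        write(initVersion, initVersion, out);
        return true;
    }

    /** Normal update: encode with the version that is active right now. */
    void update(short newVersion, Consumer<FeatureRecord> out) {
        write(newVersion, activeVersion, out);
    }

    private void write(short newVersion, short encodingVersion, Consumer<FeatureRecord> out) {
        out.accept(new FeatureRecord(newVersion, recordVersionFor(encodingVersion)));
        activeVersion = newVersion; // simplification: applied immediately
    }
}
```

In the update path the encoding version can be read from the active metadata.version, but during initialization there is no active version to read, so the caller has to supply it.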

##########
File path: metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
##########
@@ -784,15 +815,32 @@ public void handleLeaderChange(LeaderAndEpoch newLeader) {
                             curEpoch);
                     }
                     log.info(
-                        "Becoming the active controller at epoch {}, committed offset {} and committed epoch {}.",
-                        newEpoch, lastCommittedOffset, lastCommittedEpoch
+                        "Becoming the active controller at epoch {}, committed offset {}, committed epoch {}, and metadata.version {}",
+                        newEpoch, lastCommittedOffset, lastCommittedEpoch, activeMetadataVersion
                     );
 
                     curClaimEpoch = newEpoch;
                     controllerMetrics.setActive(true);
                     writeOffset = lastCommittedOffset;
                     clusterControl.activate();
 
+                    // Check if we need to bootstrap a metadata.version into the log. This must happen before we can
+                    // write any records to the log since we need the metadata.version to determine the correct
+                    // record version
+                    if (activeMetadataVersion == MetadataVersions.UNINITIALIZED.version()) {
+                        if (initialMetadataVersion == MetadataVersions.UNINITIALIZED) {
+                            prependWriteEvent("initializeMetadataVersion", () -> {
+                                log.info("Upgrading from KRaft preview. Initializing metadata.version to 1");
+                                return featureControl.initializeMetadataVersion(MetadataVersions.V1.version());

Review comment:
       Always initialize to version 1 when upgrading from an existing (preview) 
KRaft cluster.



