mumrah commented on a change in pull request #11550: URL: https://github.com/apache/kafka/pull/11550#discussion_r758840254
########## File path: core/src/main/scala/kafka/raft/RaftManager.scala ########## @@ -108,7 +108,8 @@ class KafkaRaftManager[T]( time: Time, metrics: Metrics, threadNamePrefixOpt: Option[String], - val controllerQuorumVotersFuture: CompletableFuture[util.Map[Integer, AddressSpec]] + val controllerQuorumVotersFuture: CompletableFuture[util.Map[Integer, AddressSpec]], + apiVersions: ApiVersions Review comment: We pass this same ApiVersions to ControllerServer (and QuorumController) so that we can inspect the supported features of other quorum peers. ########## File path: core/src/main/scala/kafka/server/ReplicaManager.scala ########## @@ -2107,7 +2107,7 @@ class ReplicaManager(val config: KafkaConfig, * @param delta The delta to apply. * @param newImage The new metadata image. */ - def applyDelta(delta: TopicsDelta, newImage: MetadataImage): Unit = { + def applyDelta(delta: TopicsDelta, newImage: MetadataImage, metadataVersion: MetadataVersionDelta): Unit = { Review comment: This is here to illustrate one way we can inform broker components of a metadata.version change. With this approach, version changes can be handled in-band with other metadata changes. Another way is to register a listener with MetadataVersionManager which would be out-of-band from other metadata updates. ########## File path: metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java ########## @@ -239,7 +244,7 @@ public void deactivate() { List<ApiMessageAndVersion> records = new ArrayList<>(); records.add(new ApiMessageAndVersion(record, - REGISTER_BROKER_RECORD.highestSupportedVersion())); + metadataVersionProvider.activeVersion().recordVersion(REGISTER_BROKER_RECORD))); Review comment: Rather than always picking the highest version, we now consult `MetadataVersion#recordVersion`. We'll need this anytime we construct a record. 
########## File path: clients/src/main/java/org/apache/kafka/clients/NodeApiVersions.java ########## @@ -233,4 +256,8 @@ public ApiVersion apiVersion(ApiKeys apiKey) { public Map<ApiKeys, ApiVersion> allSupportedApiVersions() { return supportedVersions; } + + public Map<String, SupportedVersionRange> supportedFeatures() { Review comment: We already get a broker's supported features via ApiVersionsResponse, but we were not storing it in NodeApiVersions. This adds that. ########## File path: metadata/src/main/java/org/apache/kafka/controller/FeatureControlManager.java ########## @@ -45,71 +49,183 @@ /** * An immutable map containing the features supported by this controller's software. */ - private final Map<String, VersionRange> supportedFeatures; + private final QuorumFeatures quorumFeatures; + + private final MetadataVersionProvider metadataVersionProvider; /** * Maps feature names to finalized version ranges. */ private final TimelineHashMap<String, VersionRange> finalizedVersions; - FeatureControlManager(Map<String, VersionRange> supportedFeatures, - SnapshotRegistry snapshotRegistry) { - this.supportedFeatures = supportedFeatures; + /** + * Collection of listeners for when features change + */ + private final Map<String, FeatureLevelListener> listeners; Review comment: Let other controller components register a listener for changes to features. ########## File path: core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala ########## @@ -140,7 +145,7 @@ class BrokerMetadataPublisher(conf: KafkaConfig, // Apply topic deltas. Option(delta.topicsDelta()).foreach { topicsDelta => // Notify the replica manager about changes to topics. - replicaManager.applyDelta(topicsDelta, newImage) + replicaManager.applyDelta(topicsDelta, newImage, versionDelta) Review comment: If we like this approach, we would need to pass the version delta to the other components that get updated here. 
########## File path: metadata/src/main/java/org/apache/kafka/controller/FeatureControlManager.java ########## @@ -45,71 +49,183 @@ /** * An immutable map containing the features supported by this controller's software. */ - private final Map<String, VersionRange> supportedFeatures; + private final QuorumFeatures quorumFeatures; + + private final MetadataVersionProvider metadataVersionProvider; /** * Maps feature names to finalized version ranges. */ private final TimelineHashMap<String, VersionRange> finalizedVersions; - FeatureControlManager(Map<String, VersionRange> supportedFeatures, - SnapshotRegistry snapshotRegistry) { - this.supportedFeatures = supportedFeatures; + /** + * Collection of listeners for when features change + */ + private final Map<String, FeatureLevelListener> listeners; + + FeatureControlManager(QuorumFeatures quorumFeatures, + SnapshotRegistry snapshotRegistry, + MetadataVersionProvider metadataVersionProvider) { + this.quorumFeatures = quorumFeatures; + this.metadataVersionProvider = metadataVersionProvider; this.finalizedVersions = new TimelineHashMap<>(snapshotRegistry, 0); + this.listeners = new HashMap<>(); } ControllerResult<Map<String, ApiError>> updateFeatures( - Map<String, VersionRange> updates, Set<String> downgradeables, + Map<String, Short> updates, Set<String> downgradeables, Map<Integer, Map<String, VersionRange>> brokerFeatures) { TreeMap<String, ApiError> results = new TreeMap<>(); List<ApiMessageAndVersion> records = new ArrayList<>(); - for (Entry<String, VersionRange> entry : updates.entrySet()) { + for (Entry<String, Short> entry : updates.entrySet()) { results.put(entry.getKey(), updateFeature(entry.getKey(), entry.getValue(), downgradeables.contains(entry.getKey()), brokerFeatures, records)); } return ControllerResult.atomicOf(records, results); } + ControllerResult<Map<String, ApiError>> initializeMetadataVersion(short initVersion) { + if (finalizedVersions.containsKey(MetadataVersions.FEATURE_NAME)) { + return 
ControllerResult.atomicOf( + Collections.emptyList(), + Collections.singletonMap( + MetadataVersions.FEATURE_NAME, + new ApiError(Errors.INVALID_UPDATE_VERSION, + "Cannot initialize metadata.version since it has already been initialized.") + )); + } + List<ApiMessageAndVersion> records = new ArrayList<>(); + ApiError result = updateMetadataVersion(initVersion, initVersion, records::add); + return ControllerResult.atomicOf(records, Collections.singletonMap(MetadataVersions.FEATURE_NAME, result)); + } + + void register(String listenerName, FeatureLevelListener listener) { + this.listeners.putIfAbsent(listenerName, listener); + } + + boolean canSupportVersion(String featureName, VersionRange versionRange) { + return quorumFeatures.localSupportedFeature(featureName) + .filter(localRange -> localRange.contains(versionRange)) + .isPresent(); + } + private ApiError updateFeature(String featureName, - VersionRange newRange, + short newVersion, boolean downgradeable, - Map<Integer, Map<String, VersionRange>> brokerFeatures, + Map<Integer, Map<String, VersionRange>> brokersAndFeatures, List<ApiMessageAndVersion> records) { - if (newRange.min() <= 0) { + final VersionRange currentRange = finalizedVersions.get(featureName); + final VersionRange newRange; + if (currentRange == null) { + // Never seen this feature before. Initialize its min version to the max of supported broker mins + Optional<Short> minFinalizedVersion = brokersAndFeatures.values().stream().map(brokerFeatures -> { + VersionRange brokerFeatureVersion = brokerFeatures.get(featureName); + if (brokerFeatureVersion != null) { + return brokerFeatureVersion.min(); + } else { + return (short) -1; + } + }).max(Short::compareTo); + + if (minFinalizedVersion.isPresent()) { + newRange = VersionRange.of(minFinalizedVersion.get(), newVersion); + } else { + // No brokers know about this feature! 
+ return new ApiError(Errors.INVALID_UPDATE_VERSION, + "Cannot finalized this feature flag since no alive brokers support it."); + } + } else { + newRange = VersionRange.of(currentRange.min(), newVersion); + } + + if (newRange.max() < newRange.min()) { return new ApiError(Errors.INVALID_UPDATE_VERSION, - "The lower value for the new range cannot be less than 1."); + "The upper value for the new range cannot be less than the lower value."); } + if (newRange.max() <= 0) { return new ApiError(Errors.INVALID_UPDATE_VERSION, "The upper value for the new range cannot be less than 1."); } - VersionRange localRange = supportedFeatures.get(featureName); - if (localRange == null || !localRange.contains(newRange)) { + + if (!canSupportVersion(featureName, newRange)) { return new ApiError(Errors.INVALID_UPDATE_VERSION, "The controller does not support the given feature range."); } - for (Entry<Integer, Map<String, VersionRange>> brokerEntry : - brokerFeatures.entrySet()) { + + for (Entry<Integer, Map<String, VersionRange>> brokerEntry : brokersAndFeatures.entrySet()) { VersionRange brokerRange = brokerEntry.getValue().get(featureName); if (brokerRange == null || !brokerRange.contains(newRange)) { return new ApiError(Errors.INVALID_UPDATE_VERSION, "Broker " + brokerEntry.getKey() + " does not support the given " + "feature range."); } } - VersionRange currentRange = finalizedVersions.get(featureName); + if (currentRange != null && currentRange.max() > newRange.max()) { if (!downgradeable) { return new ApiError(Errors.INVALID_UPDATE_VERSION, "Can't downgrade the maximum version of this feature without " + "setting downgradable to true."); } } - records.add(new ApiMessageAndVersion( - new FeatureLevelRecord().setName(featureName). 
+ + if (featureName.equals(MetadataVersions.FEATURE_NAME)) { + // Perform additional checks if we're updating metadata.version + return updateMetadataVersion(newRange.max(), metadataVersionProvider.activeVersion().version(), records::add); + } else { + records.add(new ApiMessageAndVersion( + new FeatureLevelRecord().setName(featureName). + setMinFeatureLevel(newRange.min()).setMaxFeatureLevel(newRange.max()), + metadataVersionProvider.activeVersion().recordVersion(FEATURE_LEVEL_RECORD))); + return ApiError.NONE; + } + } + + /** + * Perform some additional validation for metadata.version updates. + */ + private ApiError updateMetadataVersion(short newVersion, + short recordVersion, + Consumer<ApiMessageAndVersion> recordConsumer) { Review comment: Notice here we need the desired metadata.version as well as the FeatureLevelRecord version to write. This is to handle the initialization case. ########## File path: metadata/src/main/java/org/apache/kafka/controller/QuorumController.java ########## @@ -784,15 +815,32 @@ public void handleLeaderChange(LeaderAndEpoch newLeader) { curEpoch); } log.info( - "Becoming the active controller at epoch {}, committed offset {} and committed epoch {}.", - newEpoch, lastCommittedOffset, lastCommittedEpoch + "Becoming the active controller at epoch {}, committed offset {}, committed epoch {}, and metadata.version {}", + newEpoch, lastCommittedOffset, lastCommittedEpoch, activeMetadataVersion ); curClaimEpoch = newEpoch; controllerMetrics.setActive(true); writeOffset = lastCommittedOffset; clusterControl.activate(); + // Check if we need to bootstrap a metadata.version into the log. 
This must happen before we can + // write any records to the log since we need the metadata.version to determine the correct + // record version + if (activeMetadataVersion == MetadataVersions.UNINITIALIZED.version()) { + if (initialMetadataVersion == MetadataVersions.UNINITIALIZED) { + prependWriteEvent("initializeMetadataVersion", () -> { + log.info("Upgrading from KRaft preview. Initializing metadata.version to 1"); + return featureControl.initializeMetadataVersion(MetadataVersions.V1.version()); Review comment: Always initialize to version 1 when upgrading from an existing (preview) KRaft cluster. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org