This is an automated email from the ASF dual-hosted git repository. rndgstn pushed a commit to branch 3.7 in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/3.7 by this push: new 4ce3bcd767f KAFKA-15922: Bump MetadataVersion to support JBOD with KRaft (#14984) 4ce3bcd767f is described below commit 4ce3bcd767fe5ce26ef6d9464d48c6f0a1509191 Author: Proven Provenzano <93720617+pprovenz...@users.noreply.github.com> AuthorDate: Thu Dec 14 10:08:54 2023 -0500 KAFKA-15922: Bump MetadataVersion to support JBOD with KRaft (#14984) Moves ELR from MetadataVersion IBP_3_7_IV3 into the new IBP_3_8_IV0 because the ELR feature was not completed before 3.7 reached feature freeze. Leaves IBP_3_7_IV3 empty -- it is a no-op and is not reused for anything. Adds the new MetadataVersion IBP_3_7_IV4 for the FETCH request changes from KIP-951, which were mistakenly never associated with a MetadataVersion. Updates the LATEST_PRODUCTION MetadataVersion to IBP_3_7_IV4 to declare both KRaft JBOD and the KIP-951 changes ready f [...] Reviewers: Omnia G H Ibrahim <o.g.h.ibra...@gmail.com>, Ron Dagostino <rdagost...@confluent.io>, Ismael Juma <ism...@juma.me.uk>, José Armando García Sancio <jsan...@apache.org>, Justine Olshan <jols...@confluent.io> --- .../test/java/kafka/test/ClusterTestExtensionsTest.java | 2 +- core/src/test/java/kafka/test/annotation/ClusterTest.java | 2 +- .../integration/kafka/zk/ZkMigrationIntegrationTest.scala | 3 ++- .../kafka/controller/PartitionChangeBuilderTest.java | 2 +- .../org/apache/kafka/controller/QuorumControllerTest.java | 6 +++--- .../apache/kafka/metadata/PartitionRegistrationTest.java | 4 ++-- .../org/apache/kafka/server/common/MetadataVersion.java | 15 +++++++++++---- .../apache/kafka/server/common/MetadataVersionTest.java | 14 +++++++++++--- .../java/org/apache/kafka/tools/FeatureCommandTest.java | 8 ++++---- 9 files changed, 36 insertions(+), 20 deletions(-) diff --git a/core/src/test/java/kafka/test/ClusterTestExtensionsTest.java b/core/src/test/java/kafka/test/ClusterTestExtensionsTest.java index 0857e4ded30..0a0e3e29a55 100644 --- a/core/src/test/java/kafka/test/ClusterTestExtensionsTest.java +++ b/core/src/test/java/kafka/test/ClusterTestExtensionsTest.java @@ -117,6 +117,6 @@ public class ClusterTestExtensionsTest { @ClusterTest public void testDefaults(ClusterConfig config) { - Assertions.assertEquals(MetadataVersion.IBP_3_7_IV3, config.metadataVersion()); + Assertions.assertEquals(MetadataVersion.IBP_3_8_IV0, config.metadataVersion()); } } diff --git a/core/src/test/java/kafka/test/annotation/ClusterTest.java b/core/src/test/java/kafka/test/annotation/ClusterTest.java index 1511e28a3d4..8880966dea8 100644 --- a/core/src/test/java/kafka/test/annotation/ClusterTest.java +++ b/core/src/test/java/kafka/test/annotation/ClusterTest.java @@ -41,6 +41,6 @@ public @interface ClusterTest { String name() default ""; SecurityProtocol securityProtocol() default SecurityProtocol.PLAINTEXT; String listener() default ""; - MetadataVersion metadataVersion() default MetadataVersion.IBP_3_7_IV3; + MetadataVersion metadataVersion() default MetadataVersion.IBP_3_8_IV0; ClusterConfigProperty[] serverProperties() default {}; } diff --git a/core/src/test/scala/integration/kafka/zk/ZkMigrationIntegrationTest.scala b/core/src/test/scala/integration/kafka/zk/ZkMigrationIntegrationTest.scala index 6be7f6b422d..5bd1988201d 100644 --- a/core/src/test/scala/integration/kafka/zk/ZkMigrationIntegrationTest.scala +++ b/core/src/test/scala/integration/kafka/zk/ZkMigrationIntegrationTest.scala @@ -74,7 +74,8 @@ object ZkMigrationIntegrationTest { MetadataVersion.IBP_3_7_IV0, MetadataVersion.IBP_3_7_IV1, MetadataVersion.IBP_3_7_IV2, - MetadataVersion.IBP_3_7_IV3 + MetadataVersion.IBP_3_7_IV4, + MetadataVersion.IBP_3_8_IV0 ).foreach { mv => val clusterConfig = ClusterConfig.defaultClusterBuilder() .metadataVersion(mv) diff --git a/metadata/src/test/java/org/apache/kafka/controller/PartitionChangeBuilderTest.java b/metadata/src/test/java/org/apache/kafka/controller/PartitionChangeBuilderTest.java index f81523d46b8..6c8ef5268bc 100644 --- a/metadata/src/test/java/org/apache/kafka/controller/PartitionChangeBuilderTest.java +++ b/metadata/src/test/java/org/apache/kafka/controller/PartitionChangeBuilderTest.java @@ -123,7 +123,7 @@ public class PartitionChangeBuilderTest { case (short) 1: return MetadataVersion.IBP_3_7_IV2; case (short) 2: - return MetadataVersion.IBP_3_7_IV3; + return MetadataVersion.IBP_3_8_IV0; default: throw new RuntimeException("Unknown PartitionChangeRecord version " + version); } diff --git a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java index cac74c6a2d3..bcdb3ed7919 100644 --- a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java +++ b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java @@ -192,7 +192,7 @@ public class QuorumControllerTest { ) { controlEnv.activeController().registerBroker(ANONYMOUS_CONTEXT, new BrokerRegistrationRequestData(). - setFeatures(brokerFeatures(MetadataVersion.IBP_3_0_IV1, MetadataVersion.IBP_3_7_IV3)). + setFeatures(brokerFeatures(MetadataVersion.IBP_3_0_IV1, MetadataVersion.IBP_3_8_IV0)). setBrokerId(0). setLogDirs(Collections.singletonList(Uuid.fromString("iiaQjkRPQcuMULNII0MUeA"))). setClusterId(logEnv.clusterId())).get(); @@ -236,7 +236,7 @@ public class QuorumControllerTest { ) { controlEnv.activeController().registerBroker(ANONYMOUS_CONTEXT, new BrokerRegistrationRequestData(). - setFeatures(brokerFeatures(MetadataVersion.IBP_3_0_IV1, MetadataVersion.IBP_3_7_IV3)). + setFeatures(brokerFeatures(MetadataVersion.IBP_3_0_IV1, MetadataVersion.IBP_3_8_IV0)). setBrokerId(0). setLogDirs(Collections.singletonList(Uuid.fromString("sTbzRAMnTpahIyIPNjiLhw"))). setClusterId(logEnv.clusterId())).get(); @@ -579,7 +579,7 @@ public class QuorumControllerTest { setBrokerId(0). setClusterId(active.clusterId()). setIncarnationId(Uuid.fromString("kxAT73dKQsitIedpiPtwBA")). - setFeatures(brokerFeatures(MetadataVersion.IBP_3_0_IV1, MetadataVersion.IBP_3_7_IV3)). + setFeatures(brokerFeatures(MetadataVersion.IBP_3_0_IV1, MetadataVersion.IBP_3_8_IV0)). setLogDirs(Collections.singletonList(Uuid.fromString("vBpaRsZVSaGsQT53wtYGtg"))). setListeners(listeners)); assertEquals(5L, reply.get().epoch()); diff --git a/metadata/src/test/java/org/apache/kafka/metadata/PartitionRegistrationTest.java b/metadata/src/test/java/org/apache/kafka/metadata/PartitionRegistrationTest.java index 3c8a402aa05..37c3cd16f80 100644 --- a/metadata/src/test/java/org/apache/kafka/metadata/PartitionRegistrationTest.java +++ b/metadata/src/test/java/org/apache/kafka/metadata/PartitionRegistrationTest.java @@ -283,7 +283,7 @@ public class PartitionRegistrationTest { return Arrays.asList( MetadataVersion.IBP_3_7_IV1, MetadataVersion.IBP_3_7_IV2, - MetadataVersion.IBP_3_7_IV3 + MetadataVersion.IBP_3_8_IV0 ).stream().map(mv -> Arguments.of(mv)); } @@ -369,7 +369,7 @@ public class PartitionRegistrationTest { setPartitionEpoch(0); List<UnwritableMetadataException> exceptions = new ArrayList<>(); ImageWriterOptions options = new ImageWriterOptions.Builder(). - setMetadataVersion(MetadataVersion.IBP_3_7_IV3). + setMetadataVersion(MetadataVersion.IBP_3_8_IV0). setLossHandler(exceptions::add). build(); assertEquals(new ApiMessageAndVersion(expectRecord, (short) 2), partitionRegistration.toRecord(topicID, 0, options)); diff --git a/server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java b/server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java index 1359af218ef..9eecc9f345a 100644 --- a/server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java +++ b/server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java @@ -194,8 +194,15 @@ public enum MetadataVersion { // Add JBOD support for KRaft. IBP_3_7_IV2(17, "3.7", "IV2", true), + // IBP_3_7_IV3 was reserved for ELR support (KIP-966) but has been moved forward to + // a later release requiring a new MetadataVersion. MVs are not reused. + IBP_3_7_IV3(18, "3.7", "IV3", false), + + // Add new fetch request version for KIP-951 + IBP_3_7_IV4(19, "3.7", "IV4", false), + // Add ELR related supports (KIP-966). - IBP_3_7_IV3(18, "3.7", "IV3", true); + IBP_3_8_IV0(20, "3.8", "IV0", true); // NOTES when adding a new version: // Update the default version in @ClusterTest annotation to point to the latest version @@ -221,7 +228,7 @@ public enum MetadataVersion { * <strong>Think carefully before you update this value. ONCE A METADATA VERSION IS PRODUCTION, * IT CANNOT BE CHANGED.</strong> */ - public static final MetadataVersion LATEST_PRODUCTION = IBP_3_7_IV0; + public static final MetadataVersion LATEST_PRODUCTION = IBP_3_7_IV4; /** * An array containing all of the MetadataVersion entries. @@ -316,7 +323,7 @@ public enum MetadataVersion { } public boolean isElrSupported() { - return this.isAtLeast(IBP_3_7_IV3); + return this.isAtLeast(IBP_3_8_IV0); } public boolean isKRaftSupported() { @@ -393,7 +400,7 @@ public enum MetadataVersion { } public short fetchRequestVersion() { - if (this.isAtLeast(IBP_3_7_IV0)) { + if (this.isAtLeast(IBP_3_7_IV4)) { return 16; } else if (this.isAtLeast(IBP_3_5_IV1)) { return 15; diff --git a/server-common/src/test/java/org/apache/kafka/server/common/MetadataVersionTest.java b/server-common/src/test/java/org/apache/kafka/server/common/MetadataVersionTest.java index 7a7ed1b3a99..6acd0b23e2d 100644 --- a/server-common/src/test/java/org/apache/kafka/server/common/MetadataVersionTest.java +++ b/server-common/src/test/java/org/apache/kafka/server/common/MetadataVersionTest.java @@ -165,13 +165,16 @@ class MetadataVersionTest { assertEquals(IBP_3_6_IV1, MetadataVersion.fromVersionString("3.6-IV1")); assertEquals(IBP_3_6_IV2, MetadataVersion.fromVersionString("3.6-IV2")); - // 3.7-IV0 is the latest production version in the 3.7 line - assertEquals(IBP_3_7_IV0, MetadataVersion.fromVersionString("3.7")); + // 3.7-IV4 is the latest production version in the 3.7 line + assertEquals(IBP_3_7_IV4, MetadataVersion.fromVersionString("3.7")); assertEquals(IBP_3_7_IV0, MetadataVersion.fromVersionString("3.7-IV0")); assertEquals(IBP_3_7_IV1, MetadataVersion.fromVersionString("3.7-IV1")); assertEquals(IBP_3_7_IV2, MetadataVersion.fromVersionString("3.7-IV2")); assertEquals(IBP_3_7_IV3, MetadataVersion.fromVersionString("3.7-IV3")); + assertEquals(IBP_3_7_IV4, MetadataVersion.fromVersionString("3.7-IV4")); + + assertEquals(IBP_3_8_IV0, MetadataVersion.fromVersionString("3.8-IV0")); } @Test @@ -228,6 +231,9 @@ class MetadataVersionTest { assertEquals("3.6", IBP_3_6_IV2.shortVersion()); assertEquals("3.7", IBP_3_7_IV0.shortVersion()); assertEquals("3.7", IBP_3_7_IV1.shortVersion()); + assertEquals("3.7", IBP_3_7_IV2.shortVersion()); + assertEquals("3.7", IBP_3_7_IV3.shortVersion()); + assertEquals("3.7", IBP_3_7_IV4.shortVersion()); } @Test @@ -275,6 +281,8 @@ class MetadataVersionTest { assertEquals("3.7-IV1", IBP_3_7_IV1.version()); assertEquals("3.7-IV2", IBP_3_7_IV2.version()); assertEquals("3.7-IV3", IBP_3_7_IV3.version()); + assertEquals("3.7-IV4", IBP_3_7_IV4.version()); + assertEquals("3.8-IV0", IBP_3_8_IV0.version()); } @Test @@ -342,7 +350,7 @@ class MetadataVersionTest { @ParameterizedTest @EnumSource(value = MetadataVersion.class) public void testIsElrSupported(MetadataVersion metadataVersion) { - assertEquals(metadataVersion.isAtLeast(IBP_3_7_IV3), metadataVersion.isElrSupported()); + assertEquals(metadataVersion.isAtLeast(IBP_3_8_IV0), metadataVersion.isElrSupported()); } @ParameterizedTest diff --git a/tools/src/test/java/org/apache/kafka/tools/FeatureCommandTest.java b/tools/src/test/java/org/apache/kafka/tools/FeatureCommandTest.java index d416ba6f034..867afc3e955 100644 --- a/tools/src/test/java/org/apache/kafka/tools/FeatureCommandTest.java +++ b/tools/src/test/java/org/apache/kafka/tools/FeatureCommandTest.java @@ -68,17 +68,17 @@ public class FeatureCommandTest { ); // Change expected message to reflect latest MetadataVersion (SupportedMaxVersion increases when adding a new version) assertEquals("Feature: metadata.version\tSupportedMinVersion: 3.0-IV1\t" + - "SupportedMaxVersion: 3.7-IV3\tFinalizedVersionLevel: 3.3-IV1\t", outputWithoutEpoch(commandOutput)); + "SupportedMaxVersion: 3.8-IV0\tFinalizedVersionLevel: 3.3-IV1\t", outputWithoutEpoch(commandOutput)); } - @ClusterTest(clusterType = Type.KRAFT, metadataVersion = MetadataVersion.IBP_3_7_IV0) + @ClusterTest(clusterType = Type.KRAFT, metadataVersion = MetadataVersion.IBP_3_7_IV4) public void testDescribeWithKRaftAndBootstrapControllers() { String commandOutput = ToolsTestUtils.captureStandardOut(() -> assertEquals(0, FeatureCommand.mainNoExit("--bootstrap-controller", cluster.bootstrapControllers(), "describe")) ); // Change expected message to reflect latest MetadataVersion (SupportedMaxVersion increases when adding a new version) assertEquals("Feature: metadata.version\tSupportedMinVersion: 3.0-IV1\t" + - "SupportedMaxVersion: 3.7-IV3\tFinalizedVersionLevel: 3.7-IV0\t", outputWithoutEpoch(commandOutput)); + "SupportedMaxVersion: 3.8-IV0\tFinalizedVersionLevel: 3.7-IV4\t", outputWithoutEpoch(commandOutput)); } @ClusterTest(clusterType = Type.ZK, metadataVersion = MetadataVersion.IBP_3_3_IV1) @@ -137,7 +137,7 @@ public class FeatureCommandTest { ); // Change expected message to reflect possible MetadataVersion range 1-N (N increases when adding a new version) assertEquals("Could not disable metadata.version. Invalid update version 0 for feature " + - "metadata.version. Local controller 3000 only supports versions 1-18", commandOutput); + "metadata.version. Local controller 3000 only supports versions 1-20", commandOutput); commandOutput = ToolsTestUtils.captureStandardOut(() -> assertEquals(1, FeatureCommand.mainNoExit("--bootstrap-server", cluster.bootstrapServers(),