jsancio commented on code in PR #20551:
URL: https://github.com/apache/kafka/pull/20551#discussion_r2368926488
##########
core/src/main/scala/kafka/tools/StorageTool.scala:
##########
@@ -342,18 +339,21 @@ object StorageTool extends Logging {
val reconfigurableQuorumOptions = formatParser.addMutuallyExclusiveGroup()
reconfigurableQuorumOptions.addArgument("--standalone", "-s")
- .help("Used to initialize a controller as a single-node dynamic quorum.")
+ .help("Used to initialize a controller as a single-node dynamic quorum.
When setting this flag, " +
+ "the controller.quorum.voters config must not be set, and
controller.quorum.bootstrap.servers is set instead.")
.action(storeTrue())
reconfigurableQuorumOptions.addArgument("--no-initial-controllers", "-N")
- .help("Used to initialize a server without a dynamic quorum topology.")
+ .help("Used to initialize a server without a dynamic quorum topology.
When setting this flag, " +
Review Comment:
Let's remove the word "topology" so that it matches the phrase used for --standalone.
##########
core/src/main/scala/kafka/tools/StorageTool.scala:
##########
@@ -342,18 +339,21 @@ object StorageTool extends Logging {
val reconfigurableQuorumOptions = formatParser.addMutuallyExclusiveGroup()
reconfigurableQuorumOptions.addArgument("--standalone", "-s")
- .help("Used to initialize a controller as a single-node dynamic quorum.")
+ .help("Used to initialize a controller as a single-node dynamic quorum.
When setting this flag, " +
+ "the controller.quorum.voters config must not be set, and
controller.quorum.bootstrap.servers is set instead.")
.action(storeTrue())
reconfigurableQuorumOptions.addArgument("--no-initial-controllers", "-N")
- .help("Used to initialize a server without a dynamic quorum topology.")
+ .help("Used to initialize a server without a dynamic quorum topology.
When setting this flag, " +
+ "the controller.quorum.voters config should not be set, and
controller.quorum.bootstrap.servers is set instead.")
.action(storeTrue())
reconfigurableQuorumOptions.addArgument("--initial-controllers", "-I")
.help("Used to initialize a server with a specific dynamic quorum
topology. The argument " +
Review Comment:
Same here. Do you mind removing the word topology?
##########
metadata/src/main/java/org/apache/kafka/metadata/storage/Formatter.java:
##########
@@ -217,8 +217,8 @@ public Formatter setInitialControllers(DynamicVoters
initialControllers) {
return this;
}
- public Formatter setNoInitialControllersFlag(boolean
noInitialControllersFlag) {
- this.noInitialControllersFlag = noInitialControllersFlag;
+ public Formatter setHasDynamicQuorum(boolean staticVotersEmpty) {
+ this.hasDynamicQuorum = staticVotersEmpty;
Review Comment:
The parameter name should be something like `hasDynamicQuorum`.
##########
metadata/src/test/java/org/apache/kafka/metadata/storage/FormatterTest.java:
##########
@@ -495,10 +499,26 @@ public void
testFormatWithInitialVotersFailsWithOlderMetadataVersion() throws Ex
formatter1.formatter.setInitialControllers(DynamicVoters.
parse("1@localhost:8020:4znU-ou9Taa06bmEJxsjnw"));
formatter1.formatter.setUnstableFeatureVersionsEnabled(true);
- assertEquals("kraft.version could not be set to 1 because it
depends on " +
- "metadata.version level 21",
- assertThrows(IllegalArgumentException.class,
- formatter1.formatter::run).getMessage());
+ formatter1.formatter.setHasDynamicQuorum(true);
+ formatter1.formatter.run();
+ assertEquals((short) 1,
formatter1.formatter.featureLevels.getOrDefault("kraft.version", (short) 0));
+ }
+ }
+
+ @ParameterizedTest
+ @ValueSource(booleans = {false, true})
+ public void
testFormatWithNoInitialControllersWithOlderMetadataVersion(boolean
emptyStaticVoters) throws Exception {
+ try (TestEnv testEnv = new TestEnv(2)) {
+ FormatterContext formatter1 = testEnv.newFormatter();
+
formatter1.formatter.setReleaseVersion(MetadataVersion.IBP_3_8_IV0);
+ // This MV does not support kraft.version = 1
+ formatter1.formatter.setHasDynamicQuorum(emptyStaticVoters);
+ formatter1.formatter.run();
+ if (emptyStaticVoters) {
+ assertEquals((short) 1,
formatter1.formatter.featureLevels.getOrDefault("kraft.version", (short) 0));
+ } else {
+ assertEquals((short) 0,
formatter1.formatter.featureLevels.getOrDefault("kraft.version", (short) 1));
+ }
Review Comment:
Don't you want to differentiate between missing value and the default in the
test? Why not use `featureLevels.get("kraft.version")`? This comment applies to
a few places in this file.
##########
test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/KafkaClusterTestKit.java:
##########
@@ -182,18 +181,31 @@ private KafkaConfig createNodeConfig(TestKitNode node)
throws IOException {
props.putIfAbsent(INTER_BROKER_LISTENER_NAME_CONFIG,
brokerListenerName);
props.putIfAbsent(KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG,
controllerListenerName);
- StringBuilder quorumVoterStringBuilder = new StringBuilder();
- String prefix = "";
- for (int nodeId : nodes.controllerNodes().keySet()) {
- quorumVoterStringBuilder.append(prefix).
- append(nodeId).
- append("@").
- append("localhost").
- append(":").
-
append(socketFactoryManager.getOrCreatePortForListener(nodeId,
controllerListenerName));
- prefix = ",";
+ if (!standalone && initialVoterSet.isEmpty()) {
+ StringBuilder quorumVoterStringBuilder = new StringBuilder();
+ String prefix = "";
+ for (int nodeId : nodes.controllerNodes().keySet()) {
+ quorumVoterStringBuilder.append(prefix).
+ append(nodeId).
+ append("@").
+ append("localhost").
+ append(":").
+
append(socketFactoryManager.getOrCreatePortForListener(nodeId,
controllerListenerName));
+ prefix = ",";
+ }
+ props.put(QuorumConfig.QUORUM_VOTERS_CONFIG,
quorumVoterStringBuilder.toString());
+ } else {
+ StringBuilder bootstrapServersStringBuilder = new
StringBuilder();
+ String prefix = "";
+ for (int nodeId : nodes.controllerNodes().keySet()) {
+ bootstrapServersStringBuilder.append(prefix).
+ append("localhost").
+ append(":").
+
append(socketFactoryManager.getOrCreatePortForListener(nodeId,
controllerListenerName));
+ prefix = ",";
+ }
+ props.put(QuorumConfig.QUORUM_BOOTSTRAP_SERVERS_CONFIG,
bootstrapServersStringBuilder.toString());
Review Comment:
Thanks for fixing this.
##########
metadata/src/main/java/org/apache/kafka/metadata/storage/Formatter.java:
##########
@@ -348,20 +352,20 @@ private short effectiveKRaftFeatureLevel(Optional<Short>
configuredKRaftVersionL
if (configuredKRaftVersionLevel.get() == 0) {
if (hasDynamicQuorum()) {
throw new FormatterException(
- "Cannot set kraft.version to " +
- configuredKRaftVersionLevel.get() +
- " if one of the flags --standalone,
--initial-controllers, or --no-initial-controllers is used. " +
+ "Cannot set kraft.version to " +
configuredKRaftVersionLevel.get() +
+ " if controller.quorum.voters is empty and one of the
flags --standalone, " +
+ "--initial-controllers, or --no-initial-controllers is
used. " +
Review Comment:
Is this true? Doesn't the tool allow --no-initial-controllers when
controller.quorum.voters is specified?
##########
core/src/main/scala/kafka/tools/StorageTool.scala:
##########
@@ -163,16 +164,12 @@ object StorageTool extends Logging {
if (isStandalone) {
formatter.setInitialControllers(createStandaloneDynamicVoters(config))
}
- if (namespace.getBoolean("no_initial_controllers")) {
- formatter.setNoInitialControllersFlag(true)
- } else {
- if (config.processRoles.contains(ProcessRole.ControllerRole)) {
- if (config.quorumConfig.voters().isEmpty &&
formatter.initialVoters().isEmpty) {
+ if (!namespace.getBoolean("no_initial_controllers") &&
+ config.processRoles.contains(ProcessRole.ControllerRole) &&
+ staticVotersEmpty && formatter.initialVoters().isEmpty) {
Review Comment:
I would add a newline after `&&` and before
`formatter.initialVoters().isEmpty`.
##########
metadata/src/test/java/org/apache/kafka/metadata/storage/FormatterTest.java:
##########
@@ -495,10 +499,26 @@ public void
testFormatWithInitialVotersFailsWithOlderMetadataVersion() throws Ex
formatter1.formatter.setInitialControllers(DynamicVoters.
parse("1@localhost:8020:4znU-ou9Taa06bmEJxsjnw"));
formatter1.formatter.setUnstableFeatureVersionsEnabled(true);
- assertEquals("kraft.version could not be set to 1 because it
depends on " +
- "metadata.version level 21",
- assertThrows(IllegalArgumentException.class,
- formatter1.formatter::run).getMessage());
+ formatter1.formatter.setHasDynamicQuorum(true);
+ formatter1.formatter.run();
+ assertEquals((short) 1,
formatter1.formatter.featureLevels.getOrDefault("kraft.version", (short) 0));
+ }
+ }
+
+ @ParameterizedTest
+ @ValueSource(booleans = {false, true})
+ public void
testFormatWithNoInitialControllersWithOlderMetadataVersion(boolean
emptyStaticVoters) throws Exception {
+ try (TestEnv testEnv = new TestEnv(2)) {
+ FormatterContext formatter1 = testEnv.newFormatter();
+
formatter1.formatter.setReleaseVersion(MetadataVersion.IBP_3_8_IV0);
+ // This MV does not support kraft.version = 1
+ formatter1.formatter.setHasDynamicQuorum(emptyStaticVoters);
Review Comment:
This is a bit odd. Why not name the test parameter `hasDynamicQuorum`?
##########
test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/KafkaClusterTestKit.java:
##########
@@ -477,53 +487,41 @@ private void formatNode(
} else {
formatter.setMetadataLogDirectory(Optional.empty());
}
- if
(nodes.bootstrapMetadata().featureLevel(KRaftVersion.FEATURE_NAME) > 0) {
- StringBuilder dynamicVotersBuilder = new StringBuilder();
- String prefix = "";
- if (standalone) {
- if (nodeId == TestKitDefaults.CONTROLLER_ID_OFFSET) {
- final var controllerNode =
nodes.controllerNodes().get(nodeId);
- dynamicVotersBuilder.append(
- String.format(
- "%d@localhost:%d:%s",
- controllerNode.id(),
- socketFactoryManager.
-
getOrCreatePortForListener(controllerNode.id(), controllerListenerName),
- controllerNode.metadataDirectoryId()
- )
- );
-
formatter.setInitialControllers(DynamicVoters.parse(dynamicVotersBuilder.toString()));
- } else {
- formatter.setNoInitialControllersFlag(true);
- }
- } else if (initialVoterSet.isPresent()) {
- for (final var controllerNode :
initialVoterSet.get().entrySet()) {
- final var voterId = controllerNode.getKey();
- final var voterDirectoryId = controllerNode.getValue();
- dynamicVotersBuilder.append(prefix);
- prefix = ",";
- dynamicVotersBuilder.append(
- String.format(
- "%d@localhost:%d:%s",
- voterId,
- socketFactoryManager.
- getOrCreatePortForListener(voterId,
controllerListenerName),
- voterDirectoryId
- )
- );
- }
-
formatter.setInitialControllers(DynamicVoters.parse(dynamicVotersBuilder.toString()));
- } else {
- for (TestKitNode controllerNode :
nodes.controllerNodes().values()) {
- int port = socketFactoryManager.
- getOrCreatePortForListener(controllerNode.id(),
controllerListenerName);
- dynamicVotersBuilder.append(prefix);
- prefix = ",";
-
dynamicVotersBuilder.append(String.format("%d@localhost:%d:%s",
- controllerNode.id(), port,
controllerNode.metadataDirectoryId()));
- }
+ StringBuilder dynamicVotersBuilder = new StringBuilder();
+ String prefix = "";
+ if (standalone) {
+ if (nodeId == TestKitDefaults.CONTROLLER_ID_OFFSET) {
+ final var controllerNode =
nodes.controllerNodes().get(nodeId);
+ dynamicVotersBuilder.append(
+ String.format(
+ "%d@localhost:%d:%s",
+ controllerNode.id(),
+ socketFactoryManager.
+
getOrCreatePortForListener(controllerNode.id(), controllerListenerName),
+ controllerNode.metadataDirectoryId()
+ )
+ );
formatter.setInitialControllers(DynamicVoters.parse(dynamicVotersBuilder.toString()));
}
+ formatter.setHasDynamicQuorum(true);
Review Comment:
I don't understand the added comment. This executes when standalone, irrespective
of the node id. Doesn't that mean --standalone and --no-initial-controllers?
##########
metadata/src/main/java/org/apache/kafka/metadata/storage/Formatter.java:
##########
@@ -348,20 +352,20 @@ private short effectiveKRaftFeatureLevel(Optional<Short>
configuredKRaftVersionL
if (configuredKRaftVersionLevel.get() == 0) {
if (hasDynamicQuorum()) {
throw new FormatterException(
- "Cannot set kraft.version to " +
- configuredKRaftVersionLevel.get() +
- " if one of the flags --standalone,
--initial-controllers, or --no-initial-controllers is used. " +
+ "Cannot set kraft.version to " +
configuredKRaftVersionLevel.get() +
+ " if controller.quorum.voters is empty and one of the
flags --standalone, " +
+ "--initial-controllers, or --no-initial-controllers is
used. " +
"For dynamic controllers support, try removing the
--feature flag for kraft.version."
);
}
} else {
if (!hasDynamicQuorum()) {
throw new FormatterException(
- "Cannot set kraft.version to " +
- configuredKRaftVersionLevel.get() +
- " unless one of the flags --standalone,
--initial-controllers, or --no-initial-controllers is used. " +
- "For dynamic controllers support, try using one of
--standalone, --initial-controllers, or " +
- "--no-initial-controllers."
+ "Cannot set kraft.version to " +
configuredKRaftVersionLevel.get() +
+ " unless controller.quorum.voters is empty and one of
the flags --standalone, " +
+ "--initial-controllers, or --no-initial-controllers
is used. " +
Review Comment:
Extra space after --no-initial-controllers
##########
test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/TestKitNodes.java:
##########
@@ -93,11 +93,6 @@ public Builder setBootstrapMetadata(BootstrapMetadata
bootstrapMetadata) {
return this;
}
- public Builder setFeature(String featureName, short level) {
- this.bootstrapMetadata =
bootstrapMetadata.copyWithFeatureRecord(featureName, level);
- return this;
- }
Review Comment:
Thanks for finally removing this configuration.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]