jsancio commented on code in PR #16669: URL: https://github.com/apache/kafka/pull/16669#discussion_r1700822335
########## metadata/src/test/java/org/apache/kafka/metadata/storage/FormatterTest.java: ########## @@ -0,0 +1,502 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.metadata.storage; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.metadata.FeatureLevelRecord; +import org.apache.kafka.common.metadata.UserScramCredentialRecord; +import org.apache.kafka.common.security.scram.internals.ScramFormatter; +import org.apache.kafka.common.security.scram.internals.ScramMechanism; +import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.metadata.bootstrap.BootstrapDirectory; +import org.apache.kafka.metadata.bootstrap.BootstrapMetadata; +import org.apache.kafka.metadata.properties.MetaProperties; +import org.apache.kafka.metadata.properties.MetaPropertiesEnsemble; +import org.apache.kafka.raft.DynamicVoters; +import org.apache.kafka.server.common.ApiMessageAndVersion; +import org.apache.kafka.server.common.Features; +import org.apache.kafka.server.common.MetadataVersion; +import org.apache.kafka.server.common.TestFeatureVersion; +import org.apache.kafka.test.TestUtils; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.Timeout; +import 
org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.ByteArrayOutputStream; +import java.io.File; +import java.io.IOException; +import java.io.PrintStream; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashSet; +import java.util.List; +import java.util.Optional; +import java.util.OptionalInt; +import java.util.stream.Collectors; + +import static org.apache.kafka.metadata.storage.ScramParserTest.TEST_SALT; +import static org.apache.kafka.metadata.storage.ScramParserTest.TEST_SALTED_PASSWORD; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNotEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; + +@Timeout(value = 40) +public class FormatterTest { + private static final Logger LOG = LoggerFactory.getLogger(FormatterTest.class); + + private static final int DEFAULT_NODE_ID = 1; + + private static final Uuid DEFAULT_CLUSTER_ID = Uuid.fromString("b3dGE68sQQKzfk80C_aLZw"); + + static class TestEnv implements AutoCloseable { + final List<String> directories; + + TestEnv(int numDirs) { + this.directories = new ArrayList<>(numDirs); + for (int i = 0; i < numDirs; i++) { + this.directories.add(TestUtils.tempDirectory().getAbsolutePath()); + } + } + + FormatterContext newFormatter() { + Formatter formatter = new Formatter(). + setNodeId(DEFAULT_NODE_ID). 
+ setClusterId(DEFAULT_CLUSTER_ID.toString()); + directories.forEach(d -> formatter.addDirectory(d)); + formatter.setMetadataLogDirectory(directories.get(0)); + return new FormatterContext(formatter); + } + + String directory(int i) { + return this.directories.get(i); + } + + void deleteDirectory(int i) throws IOException { + Utils.delete(new File(directories.get(i))); + } + + @Override + public void close() throws Exception { + for (int i = 0; i < directories.size(); i++) { + try { + deleteDirectory(i); + } catch (Exception e) { + LOG.error("Error deleting directory " + directories.get(i), e); + } + } + } + } + + static class FormatterContext { + final Formatter formatter; + final ByteArrayOutputStream stream; + + FormatterContext(Formatter formatter) { + this.formatter = formatter; + this.stream = new ByteArrayOutputStream(); + this.formatter.setPrintStream(new PrintStream(stream)); + this.formatter.setControllerListenerName("CONTROLLER"); + } + + String output() { + return stream.toString(); + } + + List<String> outputLines() { + return Arrays.asList(stream.toString().trim().split("\\r*\\n")); + } + } + + @ParameterizedTest + @ValueSource(ints = {1, 2, 3}) + public void testDirectories(int numDirs) throws Exception { + try (TestEnv testEnv = new TestEnv(numDirs)) { + testEnv.newFormatter().formatter.run(); + MetaPropertiesEnsemble ensemble = new MetaPropertiesEnsemble.Loader(). + addLogDirs(testEnv.directories). 
+ load(); + assertEquals(OptionalInt.of(DEFAULT_NODE_ID), ensemble.nodeId()); + assertEquals(Optional.of(DEFAULT_CLUSTER_ID.toString()), ensemble.clusterId()); + assertEquals(new HashSet<>(testEnv.directories), ensemble.logDirProps().keySet()); + BootstrapMetadata bootstrapMetadata = + new BootstrapDirectory(testEnv.directory(0), Optional.empty()).read(); + assertEquals(MetadataVersion.latestProduction(), bootstrapMetadata.metadataVersion()); + } + } + + @Test + public void testFormatterFailsOnAlreadyFormatted() throws Exception { + try (TestEnv testEnv = new TestEnv(1)) { + testEnv.newFormatter().formatter.run(); + assertEquals("Log directory " + testEnv.directory(0) + " is already formatted. " + + "Use --ignore-formatted to ignore this directory and format the others.", + assertThrows(FormatterException.class, + () -> testEnv.newFormatter().formatter.run()).getMessage()); + } + } + + @Test + public void testFormatterFailsOnUnwritableDirectory() throws Exception { + try (TestEnv testEnv = new TestEnv(1)) { + new File(testEnv.directory(0)).setReadOnly(); + FormatterContext formatter1 = testEnv.newFormatter(); + String expectedPrefix = "Error while writing meta.properties file"; + assertEquals(expectedPrefix, + assertThrows(FormatterException.class, + () -> formatter1.formatter.run()). 
+ getMessage().substring(0, expectedPrefix.length())); + } + } + + @Test + public void testIgnoreFormatted() throws Exception { + try (TestEnv testEnv = new TestEnv(1)) { + FormatterContext formatter1 = testEnv.newFormatter(); + formatter1.formatter.run(); + assertEquals("Formatting metadata directory " + testEnv.directory(0) + + " with metadata.version " + MetadataVersion.latestProduction() + ".", + formatter1.output().trim()); + + FormatterContext formatter2 = testEnv.newFormatter(); + formatter2.formatter.setIgnoreFormatted(true); + formatter2.formatter.run(); + assertEquals("All of the log directories are already formatted.", + formatter2.output().trim()); + } + } + + @Test + public void testOneDirectoryFormattedAndOthersNotFormatted() throws Exception { + try (TestEnv testEnv = new TestEnv(2)) { + testEnv.newFormatter().formatter.setDirectories(Arrays.asList(testEnv.directory(0))).run(); + assertEquals("Log directory " + testEnv.directory(0) + " is already formatted. " + + "Use --ignore-formatted to ignore this directory and format the others.", + assertThrows(FormatterException.class, + () -> testEnv.newFormatter().formatter.run()).getMessage()); + } + } + + @Test + public void testOneDirectoryFormattedAndOthersNotFormattedWithIgnoreFormatted() throws Exception { + try (TestEnv testEnv = new TestEnv(2)) { + testEnv.newFormatter().formatter.setDirectories(Arrays.asList(testEnv.directory(0))).run(); + + FormatterContext formatter2 = testEnv.newFormatter(); + formatter2.formatter.setIgnoreFormatted(true); + formatter2.formatter.run(); + assertEquals("Formatting data directory " + testEnv.directory(1) + " with metadata.version " + + MetadataVersion.latestProduction() + ".", + formatter2.output().trim()); + } + } + + @Test + public void testFormatWithOlderReleaseVersion() throws Exception { + try (TestEnv testEnv = new TestEnv(1)) { + FormatterContext formatter1 = testEnv.newFormatter(); + formatter1.formatter.setReleaseVersion(MetadataVersion.IBP_3_5_IV0); + 
formatter1.formatter.run(); + assertEquals("Formatting metadata directory " + testEnv.directory(0) + + " with metadata.version " + MetadataVersion.IBP_3_5_IV0 + ".", + formatter1.output().trim()); + BootstrapMetadata bootstrapMetadata = + new BootstrapDirectory(testEnv.directory(0), Optional.empty()).read(); + assertEquals(MetadataVersion.IBP_3_5_IV0, bootstrapMetadata.metadataVersion()); + assertEquals(1, bootstrapMetadata.records().size()); + } + } + + @Test + public void testFormatWithUnstableReleaseVersionFailsWithoutEnableUnstable() throws Exception { + try (TestEnv testEnv = new TestEnv(1)) { + FormatterContext formatter1 = testEnv.newFormatter(); + formatter1.formatter.setReleaseVersion(MetadataVersion.latestTesting()); + assertEquals("metadata.version " + MetadataVersion.latestTesting() + " is not yet stable.", + assertThrows(FormatterException.class, () -> formatter1.formatter.run()).getMessage()); + } + } + + @Test + public void testFormatWithUnstableReleaseVersion() throws Exception { + try (TestEnv testEnv = new TestEnv(1)) { + FormatterContext formatter1 = testEnv.newFormatter(); + formatter1.formatter.setReleaseVersion(MetadataVersion.latestTesting()); + formatter1.formatter.setUnstableFeatureVersionsEnabled(true); + formatter1.formatter.run(); + assertEquals("Formatting metadata directory " + testEnv.directory(0) + + " with metadata.version " + MetadataVersion.latestTesting() + ".", + formatter1.output().trim()); + BootstrapMetadata bootstrapMetadata = + new BootstrapDirectory(testEnv.directory(0), Optional.empty()).read(); + assertEquals(MetadataVersion.latestTesting(), bootstrapMetadata.metadataVersion()); + } + } + + @Test + public void testFormattingCreatesLogDirId() throws Exception { + try (TestEnv testEnv = new TestEnv(1)) { + FormatterContext formatter1 = testEnv.newFormatter(); + formatter1.formatter.run(); + MetaPropertiesEnsemble ensemble = new MetaPropertiesEnsemble.Loader(). + addLogDirs(testEnv.directories). 
+ load(); + MetaProperties logDirProps = ensemble.logDirProps().get(testEnv.directory(0)); + assertNotNull(logDirProps); + assertTrue(logDirProps.directoryId().isPresent()); + } + } + + @Test + public void testFormatWithScramFailsOnUnsupportedReleaseVersions() throws Exception { + try (TestEnv testEnv = new TestEnv(1)) { + FormatterContext formatter1 = testEnv.newFormatter(); + formatter1.formatter.setReleaseVersion(MetadataVersion.IBP_3_4_IV0); + formatter1.formatter.setScramArguments(Arrays.asList( + "SCRAM-SHA-256=[name=alice,salt=\"MWx2NHBkbnc0ZndxN25vdGN4bTB5eTFrN3E=\"," + + "saltedpassword=\"mT0yyUUxnlJaC99HXgRTSYlbuqa4FSGtJCJfTMvjYCE=\"]", + "SCRAM-SHA-512=[name=alice,salt=\"MWx2NHBkbnc0ZndxN25vdGN4bTB5eTFrN3E=\"," + + "saltedpassword=\"mT0yyUUxnlJaC99HXgRTSYlbuqa4FSGtJCJfTMvjYCE=\"]")); + assertEquals("SCRAM is only supported in metadata.version 3.5-IV2 or later.", + assertThrows(FormatterException.class, + () -> formatter1.formatter.run()).getMessage()); + } + } + + @Test + public void testFormatWithScram() throws Exception { + try (TestEnv testEnv = new TestEnv(1)) { + FormatterContext formatter1 = testEnv.newFormatter(); + formatter1.formatter.setReleaseVersion(MetadataVersion.IBP_3_8_IV0); + formatter1.formatter.setScramArguments(Arrays.asList( + "SCRAM-SHA-256=[name=alice,salt=\"MWx2NHBkbnc0ZndxN25vdGN4bTB5eTFrN3E=\"," + + "saltedpassword=\"mT0yyUUxnlJaC99HXgRTSYlbuqa4FSGtJCJfTMvjYCE=\"]", + "SCRAM-SHA-512=[name=alice,salt=\"MWx2NHBkbnc0ZndxN25vdGN4bTB5eTFrN3E=\"," + + "saltedpassword=\"mT0yyUUxnlJaC99HXgRTSYlbuqa4FSGtJCJfTMvjYCE=\"]")); + formatter1.formatter.run(); + assertEquals("Formatting metadata directory " + testEnv.directory(0) + + " with metadata.version " + MetadataVersion.IBP_3_8_IV0 + ".", + formatter1.output().trim()); + BootstrapMetadata bootstrapMetadata = + new BootstrapDirectory(testEnv.directory(0), Optional.empty()).read(); + assertEquals(MetadataVersion.IBP_3_8_IV0, bootstrapMetadata.metadataVersion()); + List<ApiMessageAndVersion> 
scramRecords = bootstrapMetadata.records().stream(). + filter(r -> r.message() instanceof UserScramCredentialRecord). + collect(Collectors.toList()); + ScramFormatter scram256 = new ScramFormatter(ScramMechanism.SCRAM_SHA_256); + ScramFormatter scram512 = new ScramFormatter(ScramMechanism.SCRAM_SHA_512); + assertEquals(Arrays.asList( + new ApiMessageAndVersion(new UserScramCredentialRecord(). + setName("alice"). + setMechanism(ScramMechanism.SCRAM_SHA_256.type()). + setSalt(TEST_SALT). + setStoredKey(scram256.storedKey(scram256.clientKey(TEST_SALTED_PASSWORD))). + setServerKey(scram256.serverKey(TEST_SALTED_PASSWORD)). + setIterations(4096), (short) 0), + new ApiMessageAndVersion(new UserScramCredentialRecord(). + setName("alice"). + setMechanism(ScramMechanism.SCRAM_SHA_512.type()). + setSalt(TEST_SALT). + setStoredKey(scram512.storedKey(scram512.clientKey(TEST_SALTED_PASSWORD))). + setServerKey(scram512.serverKey(TEST_SALTED_PASSWORD)). + setIterations(4096), (short) 0)), + scramRecords); + } + } + + @ParameterizedTest + @ValueSource(shorts = {0, 1}) + public void testFeatureFlag(short version) throws Exception { + try (TestEnv testEnv = new TestEnv(1)) { + FormatterContext formatter1 = testEnv.newFormatter(); + formatter1.formatter.setSupportedFeatures(Arrays.asList(Features.values())); + formatter1.formatter.setFeatureLevel(TestFeatureVersion.FEATURE_NAME, version); + formatter1.formatter.run(); + BootstrapMetadata bootstrapMetadata = + new BootstrapDirectory(testEnv.directory(0), Optional.empty()).read(); + List<ApiMessageAndVersion> expected = new ArrayList<>(); + expected.add(new ApiMessageAndVersion(new FeatureLevelRecord(). + setName(MetadataVersion.FEATURE_NAME). + setFeatureLevel(MetadataVersion.latestProduction().featureLevel()), + (short) 0)); + if (version > 0) { + expected.add(new ApiMessageAndVersion(new FeatureLevelRecord(). + setName(TestFeatureVersion.FEATURE_NAME). 
+ setFeatureLevel(version), (short) 0)); + } + assertEquals(expected, bootstrapMetadata.records()); + } + } + + @Test + public void testInvalidFeatureFlag() throws Exception { + try (TestEnv testEnv = new TestEnv(2)) { + FormatterContext formatter1 = testEnv.newFormatter(); + formatter1.formatter.setSupportedFeatures(Arrays.asList(Features.values())); + formatter1.formatter.setFeatureLevel("nonexistent.feature", (short) 1); + assertEquals("Unsupported feature: nonexistent.feature. Supported features " + + "are: kraft.version, test.feature.version, transaction.version", + assertThrows(FormatterException.class, + () -> formatter1.formatter.run()). + getMessage()); + } + } + + @ParameterizedTest + @ValueSource(booleans = {false, true}) + public void testFormatWithInitialVoters(boolean specifyKRaftVersion) throws Exception { + try (TestEnv testEnv = new TestEnv(2)) { + FormatterContext formatter1 = testEnv.newFormatter(); + if (specifyKRaftVersion) { + formatter1.formatter.setFeatureLevel("kraft.version", (short) 1); + } + formatter1.formatter.setUnstableFeatureVersionsEnabled(true); + formatter1.formatter.setInitialVoters(DynamicVoters. + parse("1@localhost:8020:4znU-ou9Taa06bmEJxsjnw")); + formatter1.formatter.run(); + assertEquals(Arrays.asList( + String.format("Formatting data directory %s with %s %s.", + testEnv.directory(1), + MetadataVersion.FEATURE_NAME, + MetadataVersion.latestTesting()), + String.format("Formatting dynamic metadata voter directory %s with %s %s.", + testEnv.directory(0), + MetadataVersion.FEATURE_NAME, + MetadataVersion.latestTesting())), + formatter1.outputLines().stream().sorted().collect(Collectors.toList())); + MetaPropertiesEnsemble ensemble = new MetaPropertiesEnsemble.Loader(). + addLogDirs(testEnv.directories). 
+ load(); + MetaProperties logDirProps0 = ensemble.logDirProps().get(testEnv.directory(0)); + assertNotNull(logDirProps0); + assertEquals(Uuid.fromString("4znU-ou9Taa06bmEJxsjnw"), logDirProps0.directoryId().get()); + MetaProperties logDirProps1 = ensemble.logDirProps().get(testEnv.directory(1)); + assertNotNull(logDirProps1); + assertNotEquals(Uuid.fromString("4znU-ou9Taa06bmEJxsjnw"), logDirProps1.directoryId().get()); + } + } + + @Test + public void testFormatWithInitialVotersFailsWithOlderKraftVersion() throws Exception { + try (TestEnv testEnv = new TestEnv(2)) { + FormatterContext formatter1 = testEnv.newFormatter(); + formatter1.formatter.setFeatureLevel("kraft.version", (short) 0); + formatter1.formatter.setUnstableFeatureVersionsEnabled(true); + formatter1.formatter.setInitialVoters(DynamicVoters. + parse("1@localhost:8020:4znU-ou9Taa06bmEJxsjnw")); + assertTrue(formatter1.formatter.hasDynamicQuorum()); + assertEquals("Cannot set kraft.version to 0 if KIP-853 configuration is present. " + + "Try removing the --feature flag for kraft.version.", + assertThrows(FormatterException.class, + () -> formatter1.formatter.run()).getMessage()); + } + } + + @Test + public void testFormatWithoutInitialVotersFailsWithNewerKraftVersion() throws Exception { + try (TestEnv testEnv = new TestEnv(2)) { + FormatterContext formatter1 = testEnv.newFormatter(); + formatter1.formatter.setFeatureLevel("kraft.version", (short) 1); + formatter1.formatter.setUnstableFeatureVersionsEnabled(true); + assertFalse(formatter1.formatter.hasDynamicQuorum()); + assertEquals("Cannot set kraft.version to 1 unless KIP-853 configuration is present. " + + "Try removing the --feature flag for kraft.version.", + assertThrows(FormatterException.class, + () -> formatter1.formatter.run()).getMessage()); + } + } + + // -> test that KIP-853 fails on older MV Review Comment: Are you planning to implement this? 
########## metadata/src/main/java/org/apache/kafka/metadata/storage/Formatter.java: ########## @@ -0,0 +1,506 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.metadata.storage; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.metadata.MetadataRecordSerde; +import org.apache.kafka.metadata.bootstrap.BootstrapDirectory; +import org.apache.kafka.metadata.bootstrap.BootstrapMetadata; +import org.apache.kafka.metadata.properties.MetaProperties; +import org.apache.kafka.metadata.properties.MetaPropertiesEnsemble; +import org.apache.kafka.metadata.properties.MetaPropertiesVersion; +import org.apache.kafka.raft.DynamicVoters; +import org.apache.kafka.raft.KafkaRaftClient; +import org.apache.kafka.raft.internals.VoterSet; +import org.apache.kafka.server.common.ApiMessageAndVersion; +import org.apache.kafka.server.common.FeatureVersion; +import org.apache.kafka.server.common.Features; +import org.apache.kafka.server.common.KRaftVersion; +import org.apache.kafka.server.common.MetadataVersion; +import org.apache.kafka.snapshot.FileRawSnapshotWriter; +import org.apache.kafka.snapshot.RecordsSnapshotWriter; +import 
org.apache.kafka.snapshot.Snapshots; + +import java.io.File; +import java.io.PrintStream; +import java.nio.file.Files; +import java.nio.file.Paths; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.EnumSet; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.OptionalInt; +import java.util.TreeMap; +import java.util.TreeSet; +import java.util.stream.Collectors; + +import static org.apache.kafka.common.internals.Topic.CLUSTER_METADATA_TOPIC_PARTITION; +import static org.apache.kafka.server.common.KRaftVersion.KRAFT_VERSION_0; +import static org.apache.kafka.server.common.KRaftVersion.KRAFT_VERSION_1; + +/** + * Formats storage directories. + */ +public class Formatter { + /** + * The stream to log to while formatting. + */ + private PrintStream printStream = System.out; + + /** + * The features that are supported. + */ + private List<Features> supportedFeatures = Features.PRODUCTION_FEATURES; + + /** + * The current node id. + */ + private int nodeId = -1; + + /** + * The cluster ID to use. + */ + private String clusterId = null; + + /** + * The directories to format. + */ + private final TreeSet<String> directories = new TreeSet<>(); + + /** + * The metadata version to use. + */ + private MetadataVersion releaseVersion = null; + + /** + * Maps feature names to the level they will start off with. + */ + private Map<String, Short> featureLevels = new TreeMap<>(); + + /** + * The bootstrap metadata used to format the cluster. + */ + private BootstrapMetadata bootstrapMetadata; + + /** + * True if we should enable unstable feature versions. + */ + private boolean unstableFeatureVersionsEnabled = false; + + /** + * True if we should ignore already formatted directories. 
+ */ + private boolean ignoreFormatted = false; + + /** + * The arguments passed to --add-scram + */ + private List<String> scramArguments = Collections.emptyList(); + + /** + * The name of the initial controller listener. + */ + private String controllerListenerName = null; + + /** + * The metadata log directory. + */ + private String metadataLogDirectory = null; + + /** + * The initial KIP-853 voters. + */ + private Optional<DynamicVoters> initialControllers = Optional.empty(); + + public Formatter setPrintStream(PrintStream printStream) { + this.printStream = printStream; + return this; + } + + public Formatter setSupportedFeatures(List<Features> supportedFeatures) { + this.supportedFeatures = supportedFeatures; + return this; + } + + public Formatter setNodeId(int nodeId) { + this.nodeId = nodeId; + return this; + } + + public Formatter setClusterId(String clusterId) { + this.clusterId = clusterId; + return this; + } + + public String clusterId() { + return clusterId; + } + + public Formatter setDirectories(Collection<String> directories) { + this.directories.clear(); + this.directories.addAll(directories); + return this; + } + + public Formatter addDirectory(String directory) { + this.directories.add(directory); + return this; + } + + public Formatter setReleaseVersion(MetadataVersion releaseVersion) { + this.releaseVersion = releaseVersion; + return this; + } + + public Formatter setFeatureLevel(String featureName, Short level) { + this.featureLevels.put(featureName, level); + return this; + } + + public Formatter setUnstableFeatureVersionsEnabled(boolean unstableFeatureVersionsEnabled) { + this.unstableFeatureVersionsEnabled = unstableFeatureVersionsEnabled; + return this; + } + + public Formatter setIgnoreFormatted(boolean ignoreFormatted) { + this.ignoreFormatted = ignoreFormatted; + return this; + } + + public Formatter setScramArguments(List<String> scramArguments) { + this.scramArguments = scramArguments; + return this; + } + + public Formatter 
setControllerListenerName(String controllerListenerName) { + this.controllerListenerName = controllerListenerName; + return this; + } + + public Formatter setMetadataLogDirectory(String metadataLogDirectory) { + this.metadataLogDirectory = metadataLogDirectory; + return this; + } + + public Formatter setInitialVoters(DynamicVoters initialControllers) { + this.initialControllers = Optional.of(initialControllers); + return this; + } + + boolean hasDynamicQuorum() { + if (initialControllers.isPresent()) { + return true; + } + return false; + } + + public BootstrapMetadata bootstrapMetadata() { + return bootstrapMetadata; + } + + public void run() throws Exception { + if (nodeId < 0) { + throw new RuntimeException("You must specify a valid non-negative node ID."); + } + if (clusterId == null) { + throw new FormatterException("You must specify the cluster id."); + } + if (directories.isEmpty()) { + throw new FormatterException("You must specify at least one directory to format"); + } + if (controllerListenerName == null) { + throw new FormatterException("You must specify the name of the initial controller listener."); + } + if (metadataLogDirectory == null) { + throw new FormatterException("You must specify the metadata log directory."); + } + if (!directories.contains(metadataLogDirectory)) { + throw new FormatterException("The specified metadata log directory, " + metadataLogDirectory + + " was not one of the given directories: " + directories); + } + releaseVersion = calculateEffectiveReleaseVersion(); + featureLevels = calculateEffectiveFeatureLevels(); + this.bootstrapMetadata = calculateBootstrapMetadata(); + doFormat(bootstrapMetadata); + } + + /** + * Calculate the effective value of release version. This will be used to set defaults + * for the other features. We also throw an exception if something inconsistent was requested. + * + * @return The effective value of release version. 
+ */ + MetadataVersion calculateEffectiveReleaseVersion() { + if (featureLevels.containsKey(MetadataVersion.FEATURE_NAME)) { + if (releaseVersion != null) { + throw new FormatterException("Use --release-version instead of " + + "--feature " + MetadataVersion.FEATURE_NAME + "=X to avoid ambiguity."); + } + return verifyReleaseVersion(MetadataVersion.fromFeatureLevel( + featureLevels.get(MetadataVersion.FEATURE_NAME))); + } else if (releaseVersion != null) { + return verifyReleaseVersion(releaseVersion); + } else if (unstableFeatureVersionsEnabled) { + return MetadataVersion.latestTesting(); + } else { + return MetadataVersion.latestProduction(); + } + } + + MetadataVersion verifyReleaseVersion(MetadataVersion metadataVersion) { + if (!metadataVersion.isKRaftSupported()) { + throw new FormatterException(MetadataVersion.FEATURE_NAME + " " + metadataVersion + + " is too old to be supported."); + } + if (!unstableFeatureVersionsEnabled) { + if (!metadataVersion.isProduction()) { + throw new FormatterException(MetadataVersion.FEATURE_NAME + " " + metadataVersion + + " is not yet stable."); + } + } + return metadataVersion; + } + + Map<String, Short> calculateEffectiveFeatureLevels() { + Map<String, Features> nameToSupportedFeature = new TreeMap<>(); + supportedFeatures.forEach(feature -> nameToSupportedFeature.put(feature.featureName(), feature)); + Map<String, Short> newFeatureLevels = new TreeMap<>(); + // Verify that all specified features are known to us. + for (Map.Entry<String, Short> entry : featureLevels.entrySet()) { + String featureName = entry.getKey(); + short level = entry.getValue(); + if (!featureName.equals(MetadataVersion.FEATURE_NAME)) { + if (!nameToSupportedFeature.containsKey(featureName)) { + throw new FormatterException("Unsupported feature: " + featureName + + ". Supported features are: " + nameToSupportedFeature.keySet().stream(). 
+ collect(Collectors.joining(", "))); + } + } + newFeatureLevels.put(featureName, level); + } + newFeatureLevels.put(MetadataVersion.FEATURE_NAME, releaseVersion.featureLevel()); + // Add default values for features that were not specified. + supportedFeatures.forEach(supportedFeature -> { + if (supportedFeature.featureName().equals(KRaftVersion.FEATURE_NAME)) { + newFeatureLevels.put(KRaftVersion.FEATURE_NAME, effectiveKRaftFeatureLevel( + Optional.ofNullable(newFeatureLevels.get(KRaftVersion.FEATURE_NAME)))); + } else if (!newFeatureLevels.containsKey(supportedFeature.featureName())) { + newFeatureLevels.put(supportedFeature.featureName(), + supportedFeature.defaultValue(releaseVersion)); + } + }); + // Verify that the specified features support the given levels. This requires the full + // features map since there may be cross-feature dependencies. + for (Map.Entry<String, Short> entry : newFeatureLevels.entrySet()) { + String featureName = entry.getKey(); + if (!featureName.equals(MetadataVersion.FEATURE_NAME)) { + short level = entry.getValue(); + Features supportedFeature = nameToSupportedFeature.get(featureName); + FeatureVersion featureVersion = + supportedFeature.fromFeatureLevel(level, unstableFeatureVersionsEnabled); + Features.validateVersion(featureVersion, newFeatureLevels); + } + } + return newFeatureLevels; + } + + /** + * Calculate the effective feature level for kraft.version. In order to keep existing + * command-line invocations of StorageTool working, we default this to 0 if no dynamic + * voter quorum arguments were provided. As a convenience, if dynamic voter quorum arguments + * were passed, we set the latest kraft.version. (Currently there is only 1 non-zero version). + * + * @param configuredKRaftVersionLevel The configured level for kraft.version + * @return The effective feature level. 
+ */ + private short effectiveKRaftFeatureLevel(Optional<Short> configuredKRaftVersionLevel) { + if (configuredKRaftVersionLevel.isPresent()) { + if (configuredKRaftVersionLevel.get() == 0) { + if (hasDynamicQuorum()) { + throw new FormatterException("Cannot set kraft.version to " + + configuredKRaftVersionLevel.get() + " if KIP-853 configuration is present. " + + "Try removing the --feature flag for kraft.version."); + } + } else { + if (!hasDynamicQuorum()) { + throw new FormatterException("Cannot set kraft.version to " + + configuredKRaftVersionLevel.get() + " unless KIP-853 configuration is present. " + + "Try removing the --feature flag for kraft.version."); + } + } + return configuredKRaftVersionLevel.get(); + } else if (hasDynamicQuorum()) { + return KRAFT_VERSION_1.featureLevel(); + } else { + return KRAFT_VERSION_0.featureLevel(); + } + } + + BootstrapMetadata calculateBootstrapMetadata() throws Exception { + BootstrapMetadata bootstrapMetadata = BootstrapMetadata. + fromVersions(releaseVersion, featureLevels, "format command"); + List<ApiMessageAndVersion> bootstrapRecords = new ArrayList<>(bootstrapMetadata.records()); + if (!scramArguments.isEmpty()) { + if (!releaseVersion.isScramSupported()) { + throw new FormatterException("SCRAM is only supported in " + MetadataVersion.FEATURE_NAME + + " " + MetadataVersion.IBP_3_5_IV2 + " or later."); + } + bootstrapRecords.addAll(ScramParser.parse(scramArguments)); + } + return BootstrapMetadata.fromRecords(bootstrapRecords, "format command"); + } + + void doFormat(BootstrapMetadata bootstrapMetadata) throws Exception { + MetaProperties metaProperties = new MetaProperties.Builder(). + setVersion(MetaPropertiesVersion.V1). + setClusterId(clusterId). + setNodeId(nodeId). 
+ build(); + MetaPropertiesEnsemble.Loader loader = new MetaPropertiesEnsemble.Loader(); + loader.addLogDirs(directories); + MetaPropertiesEnsemble ensemble = loader.load(); + ensemble.verify(Optional.of(clusterId), + OptionalInt.of(nodeId), + EnumSet.noneOf(MetaPropertiesEnsemble.VerificationFlag.class)); + MetaPropertiesEnsemble.Copier copier = new MetaPropertiesEnsemble.Copier(ensemble); + if (!(ignoreFormatted || copier.logDirProps().isEmpty())) { + String firstLogDir = copier.logDirProps().keySet().iterator().next(); + throw new FormatterException("Log directory " + firstLogDir + " is already formatted. " + + "Use --ignore-formatted to ignore this directory and format the others."); + } + if (!copier.errorLogDirs().isEmpty()) { + copier.errorLogDirs().forEach(errorLogDir -> + printStream.println("I/O error trying to read log directory " + errorLogDir + ". Ignoring...")); + if (ensemble.emptyLogDirs().isEmpty() && copier.logDirProps().isEmpty()) { + throw new FormatterException("No available log directories to format."); + } + } + if (ensemble.emptyLogDirs().isEmpty()) { + printStream.println("All of the log directories are already formatted."); + } else { + Map<String, DirectoryType> directoryTypes = new HashMap<>(); + for (String emptyLogDir : ensemble.emptyLogDirs()) { + DirectoryType directoryType = DirectoryType.calculate(emptyLogDir, + metadataLogDirectory, + nodeId, + initialControllers); + directoryTypes.put(emptyLogDir, directoryType); + Uuid directoryId; + if (directoryType == DirectoryType.DYNAMIC_METADATA_VOTER_DIRECTORY) { + directoryId = initialControllers.get().voters().get(nodeId).directoryId(); + } else { + directoryId = copier.generateValidDirectoryId(); + } + copier.setLogDirProps(emptyLogDir, new MetaProperties.Builder(metaProperties). + setDirectoryId(directoryId). 
+ build()); + } + copier.setPreWriteHandler((writeLogDir, __, ____) -> { + printStream.printf("Formatting %s %s with %s %s.%n", + directoryTypes.get(writeLogDir).description(), writeLogDir, + MetadataVersion.FEATURE_NAME, releaseVersion); + Files.createDirectories(Paths.get(writeLogDir)); + BootstrapDirectory bootstrapDirectory = new BootstrapDirectory(writeLogDir, Optional.empty()); + bootstrapDirectory.writeBinaryFile(bootstrapMetadata); + if (directoryTypes.get(writeLogDir).isDynamicMetadataDirectory()) { + writeDynamicQuorumSnapshot(writeLogDir, + initialControllers.get(), + featureLevels.get(KRaftVersion.FEATURE_NAME), + controllerListenerName); Review Comment: Okay. So the kraft.version = 0 if and only if there are no `DynamicVoter`/`VoterSet` and the tool doesn't write a `0-0.checkpoint` file. The kraft.version = 1 if and only if there are `DynamicVoter`/`VoterSet` and the tool writes a `0-0.checkpoint` file. Is that correct? ########## core/src/test/scala/integration/kafka/server/QuorumTestHarness.scala: ########## @@ -368,11 +338,26 @@ abstract class QuorumTestHarness extends Logging { // Setting the configuration to the same value set on the brokers via TestUtils to keep KRaft based and Zk based controller configs are consistent. props.setProperty(ServerLogConfigs.LOG_DELETE_DELAY_MS_CONFIG, "1000") val config = new KafkaConfig(props) + + val formatter = new Formatter(). + setClusterId(Uuid.randomUuid().toString). + setNodeId(nodeId) + formatter.addDirectory(metadataDir.getAbsolutePath) + formatter.setReleaseVersion(metadataVersion) + formatter.setUnstableFeatureVersionsEnabled(true) + formatter.setControllerListenerName(config.controllerListenerNames.head) + formatter.setMetadataLogDirectory(config.metadataLogDir) + addFormatterSettings(formatter) + //formatter.setFeatureLevel(Features.TRANSACTION_VERSION.featureName, + // Features.TRANSACTION_VERSION.defaultValue(metadataVersion)) Review Comment: Commented-out code? Let's remove this if it is not needed.
########## raft/src/main/java/org/apache/kafka/raft/DynamicVoter.java: ########## @@ -0,0 +1,188 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.raft; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.feature.SupportedVersionRange; +import org.apache.kafka.common.network.ListenerName; +import org.apache.kafka.raft.internals.ReplicaKey; +import org.apache.kafka.raft.internals.VoterSet; + +import java.net.InetSocketAddress; +import java.util.Collections; +import java.util.Objects; + +/** + * The textual representation of a KIP-853 voter. + * + * Since this is used in command-line tools, format changes to the parsing logic require a KIP, + * and should be backwards compatible. + */ +public final class DynamicVoter { + private final Uuid directoryId; + private final int nodeId; + private final String host; + private final int port; + + /** + * Create a DynamicVoter object by parsing an input string. + * + * @param input The input string. + * + * @return The DynamicVoter object. + * + * @throws IllegalArgumentException If parsing fails. 
+ */ + public static DynamicVoter parse(String input) { + input = input.trim(); + int atIndex = input.indexOf("@"); + if (atIndex < 0) { + throw new IllegalArgumentException("No @ found in dynamic voter string."); + } + if (atIndex == 0) { + throw new IllegalArgumentException("Invalid @ at beginning of dynamic voter string."); + } + String idString = input.substring(0, atIndex); + int nodeId; + try { + nodeId = Integer.parseInt(idString); + } catch (NumberFormatException e) { + throw new IllegalArgumentException("Failed to parse node id in dynamic voter string.", e); + } + if (nodeId < 0) { + throw new IllegalArgumentException("Invalid negative node id " + nodeId + + " in dynamic voter string."); + } + input = input.substring(atIndex + 1); + if (input.isEmpty()) { + throw new IllegalArgumentException("No hostname found after node id."); + } + String host; + if (input.startsWith("[")) { + int endBracketIndex = input.indexOf("]"); + if (endBracketIndex < 0) { + throw new IllegalArgumentException("Hostname began with left bracket, but no right " + + "bracket was found."); + } + host = input.substring(1, endBracketIndex); + input = input.substring(endBracketIndex + 1); + } else { + int endColonIndex = input.indexOf(":"); + if (endColonIndex < 0) { + throw new IllegalArgumentException("No colon following hostname could be found."); + } + host = input.substring(0, endColonIndex); + input = input.substring(endColonIndex); + } + if (!input.startsWith(":")) { + throw new IllegalArgumentException("Port section must start with a colon."); + } + input = input.substring(1); + int endColonIndex = input.indexOf(":"); + if (endColonIndex < 0) { + throw new IllegalArgumentException("No colon following port could be found."); + } + String portString = input.substring(0, endColonIndex); + int port; + try { + port = Integer.parseInt(portString); + } catch (NumberFormatException e) { + throw new IllegalArgumentException("Failed to parse port in dynamic voter string.", e); + } + if 
(port < 0 || port > 65535) { + throw new IllegalArgumentException("Invalid port " + port + " in dynamic voter string."); + } + String directoryIdString = input.substring(endColonIndex + 1); + Uuid directoryId; + try { + directoryId = Uuid.fromString(directoryIdString); + } catch (IllegalArgumentException e) { + throw new IllegalArgumentException("Failed to parse directory ID in dynamic voter string.", e); + } + return new DynamicVoter(directoryId, nodeId, host, port); + } + + /** + * Create a new KIP-853 voter. + * + * @param directoryId The directory ID. + * @param nodeId The voter ID. + * @param host The voter hostname or IP address. + * @param port The voter port. + */ + public DynamicVoter( + Uuid directoryId, + int nodeId, + String host, + int port + ) { + this.directoryId = directoryId; + this.nodeId = nodeId; + this.host = host; + this.port = port; + } + + public Uuid directoryId() { + return directoryId; + } + + public int nodeId() { + return nodeId; + } + + public String host() { + return host; + } + + public int port() { + return port; + } + + public VoterSet.VoterNode toVoterNode(String controllerListenerName) { + ReplicaKey voterKey = ReplicaKey.of(nodeId, directoryId); + Endpoints listeners = Endpoints.fromInetSocketAddresses(Collections.singletonMap( + new ListenerName(controllerListenerName), Review Comment: It is safer to use `ListenerName.normalized`. ########## raft/src/main/java/org/apache/kafka/raft/DynamicVoter.java: ########## @@ -0,0 +1,188 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.raft; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.feature.SupportedVersionRange; +import org.apache.kafka.common.network.ListenerName; +import org.apache.kafka.raft.internals.ReplicaKey; +import org.apache.kafka.raft.internals.VoterSet; + +import java.net.InetSocketAddress; +import java.util.Collections; +import java.util.Objects; + +/** + * The textual representation of a KIP-853 voter. + * + * Since this is used in command-line tools, format changes to the parsing logic require a KIP, + * and should be backwards compatible. + */ +public final class DynamicVoter { + private final Uuid directoryId; + private final int nodeId; + private final String host; + private final int port; + + /** + * Create a DynamicVoter object by parsing an input string. + * + * @param input The input string. + * + * @return The DynamicVoter object. + * + * @throws IllegalArgumentException If parsing fails. 
+ */ + public static DynamicVoter parse(String input) { + input = input.trim(); + int atIndex = input.indexOf("@"); + if (atIndex < 0) { + throw new IllegalArgumentException("No @ found in dynamic voter string."); + } + if (atIndex == 0) { + throw new IllegalArgumentException("Invalid @ at beginning of dynamic voter string."); + } + String idString = input.substring(0, atIndex); + int nodeId; + try { + nodeId = Integer.parseInt(idString); + } catch (NumberFormatException e) { + throw new IllegalArgumentException("Failed to parse node id in dynamic voter string.", e); + } + if (nodeId < 0) { + throw new IllegalArgumentException("Invalid negative node id " + nodeId + + " in dynamic voter string."); + } + input = input.substring(atIndex + 1); + if (input.isEmpty()) { + throw new IllegalArgumentException("No hostname found after node id."); + } + String host; + if (input.startsWith("[")) { + int endBracketIndex = input.indexOf("]"); + if (endBracketIndex < 0) { + throw new IllegalArgumentException("Hostname began with left bracket, but no right " + + "bracket was found."); + } + host = input.substring(1, endBracketIndex); + input = input.substring(endBracketIndex + 1); + } else { + int endColonIndex = input.indexOf(":"); + if (endColonIndex < 0) { + throw new IllegalArgumentException("No colon following hostname could be found."); + } + host = input.substring(0, endColonIndex); + input = input.substring(endColonIndex); + } + if (!input.startsWith(":")) { + throw new IllegalArgumentException("Port section must start with a colon."); + } + input = input.substring(1); + int endColonIndex = input.indexOf(":"); + if (endColonIndex < 0) { + throw new IllegalArgumentException("No colon following port could be found."); + } + String portString = input.substring(0, endColonIndex); + int port; + try { + port = Integer.parseInt(portString); + } catch (NumberFormatException e) { + throw new IllegalArgumentException("Failed to parse port in dynamic voter string.", e); + } + if 
(port < 0 || port > 65535) { + throw new IllegalArgumentException("Invalid port " + port + " in dynamic voter string."); + } + String directoryIdString = input.substring(endColonIndex + 1); + Uuid directoryId; + try { + directoryId = Uuid.fromString(directoryIdString); + } catch (IllegalArgumentException e) { + throw new IllegalArgumentException("Failed to parse directory ID in dynamic voter string.", e); + } + return new DynamicVoter(directoryId, nodeId, host, port); + } + + /** + * Create a new KIP-853 voter. + * + * @param directoryId The directory ID. + * @param nodeId The voter ID. + * @param host The voter hostname or IP address. + * @param port The voter port. + */ + public DynamicVoter( + Uuid directoryId, + int nodeId, + String host, + int port + ) { + this.directoryId = directoryId; + this.nodeId = nodeId; + this.host = host; + this.port = port; + } + + public Uuid directoryId() { + return directoryId; + } + + public int nodeId() { + return nodeId; + } + + public String host() { + return host; + } + + public int port() { + return port; + } + + public VoterSet.VoterNode toVoterNode(String controllerListenerName) { + ReplicaKey voterKey = ReplicaKey.of(nodeId, directoryId); + Endpoints listeners = Endpoints.fromInetSocketAddresses(Collections.singletonMap( + new ListenerName(controllerListenerName), + new InetSocketAddress(host, port))); + SupportedVersionRange supportedKRaftVersion = + new SupportedVersionRange((short) 0, (short) 1); Review Comment: Okay. I am adding `SupportedVersionRange supportedVersionRange()` to `o.a.k.s.c.Features` in https://github.com/apache/kafka/pull/16735 ########## metadata/src/test/java/org/apache/kafka/metadata/storage/FormatterTest.java: ########## @@ -0,0 +1,502 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.metadata.storage; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.metadata.FeatureLevelRecord; +import org.apache.kafka.common.metadata.UserScramCredentialRecord; +import org.apache.kafka.common.security.scram.internals.ScramFormatter; +import org.apache.kafka.common.security.scram.internals.ScramMechanism; +import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.metadata.bootstrap.BootstrapDirectory; +import org.apache.kafka.metadata.bootstrap.BootstrapMetadata; +import org.apache.kafka.metadata.properties.MetaProperties; +import org.apache.kafka.metadata.properties.MetaPropertiesEnsemble; +import org.apache.kafka.raft.DynamicVoters; +import org.apache.kafka.server.common.ApiMessageAndVersion; +import org.apache.kafka.server.common.Features; +import org.apache.kafka.server.common.MetadataVersion; +import org.apache.kafka.server.common.TestFeatureVersion; +import org.apache.kafka.test.TestUtils; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.Timeout; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.ByteArrayOutputStream; +import java.io.File; +import java.io.IOException; +import java.io.PrintStream; +import java.util.ArrayList; +import java.util.Arrays; +import 
java.util.HashSet; +import java.util.List; +import java.util.Optional; +import java.util.OptionalInt; +import java.util.stream.Collectors; + +import static org.apache.kafka.metadata.storage.ScramParserTest.TEST_SALT; +import static org.apache.kafka.metadata.storage.ScramParserTest.TEST_SALTED_PASSWORD; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNotEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; + +@Timeout(value = 40) +public class FormatterTest { + private static final Logger LOG = LoggerFactory.getLogger(FormatterTest.class); + + private static final int DEFAULT_NODE_ID = 1; + + private static final Uuid DEFAULT_CLUSTER_ID = Uuid.fromString("b3dGE68sQQKzfk80C_aLZw"); + + static class TestEnv implements AutoCloseable { + final List<String> directories; + + TestEnv(int numDirs) { + this.directories = new ArrayList<>(numDirs); + for (int i = 0; i < numDirs; i++) { + this.directories.add(TestUtils.tempDirectory().getAbsolutePath()); + } + } + + FormatterContext newFormatter() { + Formatter formatter = new Formatter(). + setNodeId(DEFAULT_NODE_ID). 
+ setClusterId(DEFAULT_CLUSTER_ID.toString()); + directories.forEach(d -> formatter.addDirectory(d)); + formatter.setMetadataLogDirectory(directories.get(0)); + return new FormatterContext(formatter); + } + + String directory(int i) { + return this.directories.get(i); + } + + void deleteDirectory(int i) throws IOException { + Utils.delete(new File(directories.get(i))); + } + + @Override + public void close() throws Exception { + for (int i = 0; i < directories.size(); i++) { + try { + deleteDirectory(i); + } catch (Exception e) { + LOG.error("Error deleting directory " + directories.get(i), e); + } + } + } + } + + static class FormatterContext { + final Formatter formatter; + final ByteArrayOutputStream stream; + + FormatterContext(Formatter formatter) { + this.formatter = formatter; + this.stream = new ByteArrayOutputStream(); + this.formatter.setPrintStream(new PrintStream(stream)); + this.formatter.setControllerListenerName("CONTROLLER"); + } + + String output() { + return stream.toString(); + } + + List<String> outputLines() { + return Arrays.asList(stream.toString().trim().split("\\r*\\n")); + } + } + + @ParameterizedTest + @ValueSource(ints = {1, 2, 3}) + public void testDirectories(int numDirs) throws Exception { + try (TestEnv testEnv = new TestEnv(numDirs)) { + testEnv.newFormatter().formatter.run(); + MetaPropertiesEnsemble ensemble = new MetaPropertiesEnsemble.Loader(). + addLogDirs(testEnv.directories). 
+ load(); + assertEquals(OptionalInt.of(DEFAULT_NODE_ID), ensemble.nodeId()); + assertEquals(Optional.of(DEFAULT_CLUSTER_ID.toString()), ensemble.clusterId()); + assertEquals(new HashSet<>(testEnv.directories), ensemble.logDirProps().keySet()); + BootstrapMetadata bootstrapMetadata = + new BootstrapDirectory(testEnv.directory(0), Optional.empty()).read(); + assertEquals(MetadataVersion.latestProduction(), bootstrapMetadata.metadataVersion()); + } + } + + @Test + public void testFormatterFailsOnAlreadyFormatted() throws Exception { + try (TestEnv testEnv = new TestEnv(1)) { + testEnv.newFormatter().formatter.run(); + assertEquals("Log directory " + testEnv.directory(0) + " is already formatted. " + + "Use --ignore-formatted to ignore this directory and format the others.", + assertThrows(FormatterException.class, + () -> testEnv.newFormatter().formatter.run()).getMessage()); + } + } + + @Test + public void testFormatterFailsOnUnwritableDirectory() throws Exception { + try (TestEnv testEnv = new TestEnv(1)) { + new File(testEnv.directory(0)).setReadOnly(); + FormatterContext formatter1 = testEnv.newFormatter(); + String expectedPrefix = "Error while writing meta.properties file"; + assertEquals(expectedPrefix, + assertThrows(FormatterException.class, + () -> formatter1.formatter.run()). 
+ getMessage().substring(0, expectedPrefix.length())); + } + } + + @Test + public void testIgnoreFormatted() throws Exception { + try (TestEnv testEnv = new TestEnv(1)) { + FormatterContext formatter1 = testEnv.newFormatter(); + formatter1.formatter.run(); + assertEquals("Formatting metadata directory " + testEnv.directory(0) + + " with metadata.version " + MetadataVersion.latestProduction() + ".", + formatter1.output().trim()); + + FormatterContext formatter2 = testEnv.newFormatter(); + formatter2.formatter.setIgnoreFormatted(true); + formatter2.formatter.run(); + assertEquals("All of the log directories are already formatted.", + formatter2.output().trim()); + } + } + + @Test + public void testOneDirectoryFormattedAndOthersNotFormatted() throws Exception { + try (TestEnv testEnv = new TestEnv(2)) { + testEnv.newFormatter().formatter.setDirectories(Arrays.asList(testEnv.directory(0))).run(); + assertEquals("Log directory " + testEnv.directory(0) + " is already formatted. " + + "Use --ignore-formatted to ignore this directory and format the others.", + assertThrows(FormatterException.class, + () -> testEnv.newFormatter().formatter.run()).getMessage()); + } + } + + @Test + public void testOneDirectoryFormattedAndOthersNotFormattedWithIgnoreFormatted() throws Exception { + try (TestEnv testEnv = new TestEnv(2)) { + testEnv.newFormatter().formatter.setDirectories(Arrays.asList(testEnv.directory(0))).run(); + + FormatterContext formatter2 = testEnv.newFormatter(); + formatter2.formatter.setIgnoreFormatted(true); + formatter2.formatter.run(); + assertEquals("Formatting data directory " + testEnv.directory(1) + " with metadata.version " + + MetadataVersion.latestProduction() + ".", + formatter2.output().trim()); + } + } + + @Test + public void testFormatWithOlderReleaseVersion() throws Exception { + try (TestEnv testEnv = new TestEnv(1)) { + FormatterContext formatter1 = testEnv.newFormatter(); + formatter1.formatter.setReleaseVersion(MetadataVersion.IBP_3_5_IV0); + 
formatter1.formatter.run(); + assertEquals("Formatting metadata directory " + testEnv.directory(0) + + " with metadata.version " + MetadataVersion.IBP_3_5_IV0 + ".", + formatter1.output().trim()); + BootstrapMetadata bootstrapMetadata = + new BootstrapDirectory(testEnv.directory(0), Optional.empty()).read(); + assertEquals(MetadataVersion.IBP_3_5_IV0, bootstrapMetadata.metadataVersion()); + assertEquals(1, bootstrapMetadata.records().size()); + } + } + + @Test + public void testFormatWithUnstableReleaseVersionFailsWithoutEnableUnstable() throws Exception { + try (TestEnv testEnv = new TestEnv(1)) { + FormatterContext formatter1 = testEnv.newFormatter(); + formatter1.formatter.setReleaseVersion(MetadataVersion.latestTesting()); + assertEquals("metadata.version " + MetadataVersion.latestTesting() + " is not yet stable.", + assertThrows(FormatterException.class, () -> formatter1.formatter.run()).getMessage()); + } + } + + @Test + public void testFormatWithUnstableReleaseVersion() throws Exception { + try (TestEnv testEnv = new TestEnv(1)) { + FormatterContext formatter1 = testEnv.newFormatter(); + formatter1.formatter.setReleaseVersion(MetadataVersion.latestTesting()); + formatter1.formatter.setUnstableFeatureVersionsEnabled(true); + formatter1.formatter.run(); + assertEquals("Formatting metadata directory " + testEnv.directory(0) + + " with metadata.version " + MetadataVersion.latestTesting() + ".", + formatter1.output().trim()); + BootstrapMetadata bootstrapMetadata = + new BootstrapDirectory(testEnv.directory(0), Optional.empty()).read(); + assertEquals(MetadataVersion.latestTesting(), bootstrapMetadata.metadataVersion()); + } + } + + @Test + public void testFormattingCreatesLogDirId() throws Exception { + try (TestEnv testEnv = new TestEnv(1)) { + FormatterContext formatter1 = testEnv.newFormatter(); + formatter1.formatter.run(); + MetaPropertiesEnsemble ensemble = new MetaPropertiesEnsemble.Loader(). + addLogDirs(testEnv.directories). 
+ load(); + MetaProperties logDirProps = ensemble.logDirProps().get(testEnv.directory(0)); + assertNotNull(logDirProps); + assertTrue(logDirProps.directoryId().isPresent()); + } + } + + @Test + public void testFormatWithScramFailsOnUnsupportedReleaseVersions() throws Exception { + try (TestEnv testEnv = new TestEnv(1)) { + FormatterContext formatter1 = testEnv.newFormatter(); + formatter1.formatter.setReleaseVersion(MetadataVersion.IBP_3_4_IV0); + formatter1.formatter.setScramArguments(Arrays.asList( + "SCRAM-SHA-256=[name=alice,salt=\"MWx2NHBkbnc0ZndxN25vdGN4bTB5eTFrN3E=\"," + + "saltedpassword=\"mT0yyUUxnlJaC99HXgRTSYlbuqa4FSGtJCJfTMvjYCE=\"]", + "SCRAM-SHA-512=[name=alice,salt=\"MWx2NHBkbnc0ZndxN25vdGN4bTB5eTFrN3E=\"," + + "saltedpassword=\"mT0yyUUxnlJaC99HXgRTSYlbuqa4FSGtJCJfTMvjYCE=\"]")); + assertEquals("SCRAM is only supported in metadata.version 3.5-IV2 or later.", + assertThrows(FormatterException.class, + () -> formatter1.formatter.run()).getMessage()); + } + } + + @Test + public void testFormatWithScram() throws Exception { + try (TestEnv testEnv = new TestEnv(1)) { + FormatterContext formatter1 = testEnv.newFormatter(); + formatter1.formatter.setReleaseVersion(MetadataVersion.IBP_3_8_IV0); + formatter1.formatter.setScramArguments(Arrays.asList( + "SCRAM-SHA-256=[name=alice,salt=\"MWx2NHBkbnc0ZndxN25vdGN4bTB5eTFrN3E=\"," + + "saltedpassword=\"mT0yyUUxnlJaC99HXgRTSYlbuqa4FSGtJCJfTMvjYCE=\"]", + "SCRAM-SHA-512=[name=alice,salt=\"MWx2NHBkbnc0ZndxN25vdGN4bTB5eTFrN3E=\"," + + "saltedpassword=\"mT0yyUUxnlJaC99HXgRTSYlbuqa4FSGtJCJfTMvjYCE=\"]")); + formatter1.formatter.run(); + assertEquals("Formatting metadata directory " + testEnv.directory(0) + + " with metadata.version " + MetadataVersion.IBP_3_8_IV0 + ".", + formatter1.output().trim()); + BootstrapMetadata bootstrapMetadata = + new BootstrapDirectory(testEnv.directory(0), Optional.empty()).read(); + assertEquals(MetadataVersion.IBP_3_8_IV0, bootstrapMetadata.metadataVersion()); + List<ApiMessageAndVersion> 
scramRecords = bootstrapMetadata.records().stream(). + filter(r -> r.message() instanceof UserScramCredentialRecord). + collect(Collectors.toList()); + ScramFormatter scram256 = new ScramFormatter(ScramMechanism.SCRAM_SHA_256); + ScramFormatter scram512 = new ScramFormatter(ScramMechanism.SCRAM_SHA_512); + assertEquals(Arrays.asList( + new ApiMessageAndVersion(new UserScramCredentialRecord(). + setName("alice"). + setMechanism(ScramMechanism.SCRAM_SHA_256.type()). + setSalt(TEST_SALT). + setStoredKey(scram256.storedKey(scram256.clientKey(TEST_SALTED_PASSWORD))). + setServerKey(scram256.serverKey(TEST_SALTED_PASSWORD)). + setIterations(4096), (short) 0), + new ApiMessageAndVersion(new UserScramCredentialRecord(). + setName("alice"). + setMechanism(ScramMechanism.SCRAM_SHA_512.type()). + setSalt(TEST_SALT). + setStoredKey(scram512.storedKey(scram512.clientKey(TEST_SALTED_PASSWORD))). + setServerKey(scram512.serverKey(TEST_SALTED_PASSWORD)). + setIterations(4096), (short) 0)), + scramRecords); + } + } + + @ParameterizedTest + @ValueSource(shorts = {0, 1}) + public void testFeatureFlag(short version) throws Exception { + try (TestEnv testEnv = new TestEnv(1)) { + FormatterContext formatter1 = testEnv.newFormatter(); + formatter1.formatter.setSupportedFeatures(Arrays.asList(Features.values())); + formatter1.formatter.setFeatureLevel(TestFeatureVersion.FEATURE_NAME, version); + formatter1.formatter.run(); + BootstrapMetadata bootstrapMetadata = + new BootstrapDirectory(testEnv.directory(0), Optional.empty()).read(); + List<ApiMessageAndVersion> expected = new ArrayList<>(); + expected.add(new ApiMessageAndVersion(new FeatureLevelRecord(). + setName(MetadataVersion.FEATURE_NAME). + setFeatureLevel(MetadataVersion.latestProduction().featureLevel()), + (short) 0)); + if (version > 0) { + expected.add(new ApiMessageAndVersion(new FeatureLevelRecord(). + setName(TestFeatureVersion.FEATURE_NAME). 
+ setFeatureLevel(version), (short) 0)); + } + assertEquals(expected, bootstrapMetadata.records()); + } + } + + @Test + public void testInvalidFeatureFlag() throws Exception { + try (TestEnv testEnv = new TestEnv(2)) { + FormatterContext formatter1 = testEnv.newFormatter(); + formatter1.formatter.setSupportedFeatures(Arrays.asList(Features.values())); + formatter1.formatter.setFeatureLevel("nonexistent.feature", (short) 1); + assertEquals("Unsupported feature: nonexistent.feature. Supported features " + + "are: kraft.version, test.feature.version, transaction.version", + assertThrows(FormatterException.class, + () -> formatter1.formatter.run()). + getMessage()); + } + } + + @ParameterizedTest + @ValueSource(booleans = {false, true}) + public void testFormatWithInitialVoters(boolean specifyKRaftVersion) throws Exception { + try (TestEnv testEnv = new TestEnv(2)) { + FormatterContext formatter1 = testEnv.newFormatter(); + if (specifyKRaftVersion) { + formatter1.formatter.setFeatureLevel("kraft.version", (short) 1); + } + formatter1.formatter.setUnstableFeatureVersionsEnabled(true); + formatter1.formatter.setInitialVoters(DynamicVoters. + parse("1@localhost:8020:4znU-ou9Taa06bmEJxsjnw")); + formatter1.formatter.run(); + assertEquals(Arrays.asList( + String.format("Formatting data directory %s with %s %s.", + testEnv.directory(1), + MetadataVersion.FEATURE_NAME, + MetadataVersion.latestTesting()), + String.format("Formatting dynamic metadata voter directory %s with %s %s.", + testEnv.directory(0), + MetadataVersion.FEATURE_NAME, + MetadataVersion.latestTesting())), + formatter1.outputLines().stream().sorted().collect(Collectors.toList())); + MetaPropertiesEnsemble ensemble = new MetaPropertiesEnsemble.Loader(). + addLogDirs(testEnv.directories). 
+ load(); + MetaProperties logDirProps0 = ensemble.logDirProps().get(testEnv.directory(0)); + assertNotNull(logDirProps0); + assertEquals(Uuid.fromString("4znU-ou9Taa06bmEJxsjnw"), logDirProps0.directoryId().get()); + MetaProperties logDirProps1 = ensemble.logDirProps().get(testEnv.directory(1)); + assertNotNull(logDirProps1); + assertNotEquals(Uuid.fromString("4znU-ou9Taa06bmEJxsjnw"), logDirProps1.directoryId().get()); + } + } + + @Test + public void testFormatWithInitialVotersFailsWithOlderKraftVersion() throws Exception { + try (TestEnv testEnv = new TestEnv(2)) { + FormatterContext formatter1 = testEnv.newFormatter(); + formatter1.formatter.setFeatureLevel("kraft.version", (short) 0); + formatter1.formatter.setUnstableFeatureVersionsEnabled(true); + formatter1.formatter.setInitialVoters(DynamicVoters. + parse("1@localhost:8020:4znU-ou9Taa06bmEJxsjnw")); + assertTrue(formatter1.formatter.hasDynamicQuorum()); + assertEquals("Cannot set kraft.version to 0 if KIP-853 configuration is present. " + + "Try removing the --feature flag for kraft.version.", + assertThrows(FormatterException.class, + () -> formatter1.formatter.run()).getMessage()); + } + } + + @Test + public void testFormatWithoutInitialVotersFailsWithNewerKraftVersion() throws Exception { + try (TestEnv testEnv = new TestEnv(2)) { + FormatterContext formatter1 = testEnv.newFormatter(); + formatter1.formatter.setFeatureLevel("kraft.version", (short) 1); + formatter1.formatter.setUnstableFeatureVersionsEnabled(true); + assertFalse(formatter1.formatter.hasDynamicQuorum()); + assertEquals("Cannot set kraft.version to 1 unless KIP-853 configuration is present. 
" + + "Try removing the --feature flag for kraft.version.", + assertThrows(FormatterException.class, + () -> formatter1.formatter.run()).getMessage()); + } + } + + // -> test that KIP-853 fails on older MV + +// val checkpointDir = tempDir + "/" + CLUSTER_METADATA_TOPIC_NAME +// assertTrue(stringAfterFirstLine(stream.toString()).contains("Snapshot written to %s".format(checkpointDir))) +// val checkpointFilePath = Snapshots.snapshotPath(Paths.get(checkpointDir), BOOTSTRAP_SNAPSHOT_ID) +// assertTrue(checkpointFilePath.toFile.exists) +// assertTrue(Utils.readFileAsString(checkpointFilePath.toFile.getPath).contains(host)) +// } finally Utils.delete(tempDir) +// } +//@Test +//def testDefaultMetadataVersion(): Unit = { +// val namespace = StorageTool.parseArguments(Array("format", "-c", "config.props", "-t", "XcZZOzUqS4yHOjhMQB6JLQ")) +// val mv = StorageTool.getMetadataVersion(namespace, Map.empty, defaultVersionString = None) +// assertEquals(MetadataVersion.LATEST_PRODUCTION.featureLevel(), mv.featureLevel(), +// "Expected the default metadata.version to be the latest production version") +// } +// +// @Test +// def testStandaloneModeWithArguments(): Unit = { +// val namespace = StorageTool.parseArguments(Array("format", "-c", "config.props", "-t", "XcZZOzUqS4yPOjhMQB6JAY", +// "-s")) +// val properties: Properties = newSelfManagedProperties() +// val logDir1 = "/tmp/other1" +// val logDir2 = "/tmp/other2" +// properties.setProperty(ServerLogConfigs.LOG_DIRS_CONFIG, logDir1 + "," + logDir2) +// properties.setProperty("listeners", "PLAINTEXT://localhost:9092") +// val config = new KafkaConfig(properties) +// try { +// val exitCode = StorageTool.runFormatCommand(namespace, config) +// val tempDirs = config.logDirs +// tempDirs.foreach(tempDir => { +// val checkpointDir = tempDir + "/" + CLUSTER_METADATA_TOPIC_NAME +// val checkpointFilePath = Snapshots.snapshotPath(Paths.get(checkpointDir), BOOTSTRAP_SNAPSHOT_ID) +// assertTrue(checkpointFilePath.toFile.exists) +// 
assertTrue(Utils.readFileAsString(checkpointFilePath.toFile.getPath).contains("localhost")) +// Utils.delete(new File(tempDir)) +// }) +// assertEquals(0, exitCode) +// } +// finally{ +// Utils.delete(new File(logDir1)) +// Utils.delete(new File(logDir2)) +// } +// } +// +// @Test +// def testControllerQuorumVotersWithArguments(): Unit = { +// val namespace = StorageTool.parseArguments(Array("format", "-c", "config.props", "-t", "XcZZOzUqS2yQQjhMQB7JAT", +// "--controller-quorum-voters", "2@localhost:9092")) +// val properties: Properties = newSelfManagedProperties() +// val logDir1 = "/tmp/other1" +// val logDir2 = "/tmp/other2" +// properties.setProperty(ServerLogConfigs.LOG_DIRS_CONFIG, logDir1 + "," + logDir2) +// properties.setProperty("listeners", "PLAINTEXT://localhost:9092") +// val config = new KafkaConfig(properties) +// try{ +// val exitCode = StorageTool.runFormatCommand(namespace, config) +// val tempDirs = config.logDirs +// tempDirs.foreach(tempDir => { +// val checkpointDir = tempDir + "/" + CLUSTER_METADATA_TOPIC_NAME +// val checkpointFilePath = Snapshots.snapshotPath(Paths.get(checkpointDir), BOOTSTRAP_SNAPSHOT_ID) +// assertTrue(checkpointFilePath.toFile.exists) +// assertTrue(Utils.readFileAsString(checkpointFilePath.toFile.getPath).contains("localhost")) +// Utils.delete(new File(tempDir)) +// }) +// assertEquals(0, exitCode) +// } +// finally { +// Utils.delete(new File(logDir1)) +// Utils.delete(new File(logDir2)) +// } +// } Review Comment: Are you planning to uncomment these tests? If not, let's remove them. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
