Re: [PR] KAFKA-16463 Delete metadata log on ZK broker startup [kafka]
cmccabe merged PR #15648: URL: https://github.com/apache/kafka/pull/15648 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16463 Delete metadata log on ZK broker startup [kafka]
mumrah commented on PR #15648: URL: https://github.com/apache/kafka/pull/15648#issuecomment-2050028734 Thanks for taking a look @chia7712. I've updated the PR with your suggestions and added a few more test cases. cc @soarez and @jsancio in case you'd like to take another pass
Re: [PR] KAFKA-16463 Delete metadata log on ZK broker startup [kafka]
chia7712 commented on code in PR #15648: URL: https://github.com/apache/kafka/pull/15648#discussion_r1549840889

## core/src/test/scala/unit/kafka/raft/RaftManagerTest.scala: ##

@@ -177,6 +201,78 @@ class RaftManagerTest {
     assertFalse(fileLocked(lockPath))
   }
+  @Test
+  def testMigratingZkBrokerDeletesMetadataLog(): Unit = {
+    val logDir = Some(TestUtils.tempDir().toPath)
+    val metadataDir = Some(TestUtils.tempDir().toPath)
+    val nodeId = 1
+    val config = createZkBrokerConfig(migrationEnabled = true, nodeId, logDir, metadataDir)
+    val raftManager = createRaftManager(
+      new TopicPartition("__cluster_metadata", 0),
+      config
+    )
+    raftManager.shutdown()
+
+    try {
+      KafkaRaftManager.maybeDeleteMetadataLogDir(config)
+      assertFalse(Files.exists(metadataDir.get.resolve("__cluster_metadata-0")))
+    } catch {
+      case err: Throwable => fail("Failed to delete metadata log", err)
+    }
+    assertTrue(Files.exists(metadataDir.get))
+  }
+
+  @Test
+  def testNonMigratingZkBrokerDeletesMetadataLog(): Unit = {
+    val logDir = Some(TestUtils.tempDir().toPath)
+    val metadataDir = Some(TestUtils.tempDir().toPath)
+    val nodeId = 1
+    // Use this config to create the directory
+    val config1 = createZkBrokerConfig(migrationEnabled = true, nodeId, logDir, metadataDir)
+    val raftManager = createRaftManager(
+      new TopicPartition("__cluster_metadata", 0),
+      config1
+    )
+    raftManager.shutdown()
+
+    val config2 = createZkBrokerConfig(migrationEnabled = false, nodeId, logDir, metadataDir)
+    try {
+      KafkaRaftManager.maybeDeleteMetadataLogDir(config2)
+      fail("Should have not deleted the metadata log")
+    } catch {
+      case err: Throwable =>
+        assertEquals("Not deleting metadata log dir since migrations are not enabled.", err.getMessage)
+        assertTrue(Files.exists(metadataDir.get.resolve("__cluster_metadata-0")))
+    }
+    assertTrue(Files.exists(metadataDir.get))
+  }
+
+  @Test
+  def testKRaftBrokerDoesNotDeleteMetadataLog(): Unit = {
+    val logDir = Some(TestUtils.tempDir().toPath)
+    val metadataDir = Some(TestUtils.tempDir().toPath)
+    val nodeId = 1
+    val config = createConfig(
+      Set(ProcessRole.BrokerRole),
+      nodeId,
+      logDir,
+      metadataDir
+    )
+    val raftManager = createRaftManager(
+      new TopicPartition("__cluster_metadata", 0),
+      config
+    )
+    raftManager.shutdown()
+
+    try {

Review Comment:
We can use `assertThrows` to simplify the code. For example:
```scala
assertThrows(classOf[RuntimeException], () => KafkaRaftManager.maybeDeleteMetadataLogDir(config))
assertTrue(Files.exists(metadataDir.get.resolve("__cluster_metadata-0")))
assertTrue(Files.exists(metadataDir.get))
```

## core/src/main/scala/kafka/raft/RaftManager.scala: ##

@@ -69,6 +70,51 @@ object KafkaRaftManager {
     lock
   }
+
+  /**
+   * Test if the configured metadata log dir is one of the data log dirs.
+   */
+  def hasDifferentLogDir(config: KafkaConfig): Boolean = {
+    !config
+      .logDirs
+      .map(Paths.get(_).toAbsolutePath)
+      .contains(Paths.get(config.metadataLogDir).toAbsolutePath)
+  }
+
+  /**
+   * Obtain the file lock and delete the metadata log directory completely.
+   *
+   * This is only used by ZK brokers that are in pre-migration or hybrid mode of the ZK to KRaft migration.
+   * The rationale for deleting the metadata log in these cases is that it is safe to do on brokers and it
+   * it makes recovery from a failed migration much easier. See KAFKA-16463.
+   *
+   * @param config  The broker config
+   */
+  def maybeDeleteMetadataLogDir(config: KafkaConfig): Unit = {
+    // These constraints are enforced in KafkaServer, but repeating them here to guard against future callers
+    if (config.processRoles.nonEmpty) {
+      throw new RuntimeException("Not deleting metadata log dir since this node is in KRaft mode.")
+    } else if (!config.migrationEnabled) {
+      throw new RuntimeException("Not deleting metadata log dir since migrations are not enabled.")
+    } else {
+      val metadataDir = new File(config.metadataLogDir)
+      val logDirName = UnifiedLog.logDirName(Topic.CLUSTER_METADATA_TOPIC_PARTITION)
+      val metadataPartitionDir = KafkaRaftManager.createLogDirectory(new File(config.metadataLogDir), logDirName)

Review Comment:
`new File(config.metadataLogDir)` can be replaced by `metadataDir`

## core/src/test/scala/integration/kafka/zk/ZkMigrationIntegrationTest.scala: ##

@@ -480,6 +480,81 @@ class ZkMigrationIntegrationTest {
     }
   }
+  @ClusterTest(clusterType = Type.ZK, brokers = 3, metadataVersion = MetadataVersion.IBP_3_8_IV0, serverProperties = Array(
+    new ClusterConfigProperty(key = "inter.broker.listener.name", value = "EXTERNAL"),
+    new ClusterConfigProperty(key = "listeners", value = "PLAINTEXT://localhos
Re: [PR] KAFKA-16463 Delete metadata log on ZK broker startup [kafka]
mumrah commented on PR #15648: URL: https://github.com/apache/kafka/pull/15648#issuecomment-2034709602 I'm going to work on a ducktape test as well. Hopefully I can get that done today
Re: [PR] KAFKA-16463 Delete metadata log on ZK broker startup [kafka]
mumrah commented on code in PR #15648: URL: https://github.com/apache/kafka/pull/15648#discussion_r1549818474

## core/src/main/scala/kafka/server/KafkaServer.scala: ##

@@ -420,6 +420,12 @@ class KafkaServer(
           isZkBroker = true,
           logManager.directoryIdsSet)
+        // For ZK brokers in migration mode, always delete the metadata partition on startup.
+        KafkaRaftManager.maybeDeleteMetadataLogDir(config) match {
+          case Some(err) => logger.error("Could not delete local metadata log dir. This is non-fatal, so continuing with startup.", err)

Review Comment:
I updated this to let maybeDeleteMetadataLogDir throw and fail startup
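The difference between the two error-handling shapes discussed for this call site can be shown in a minimal sketch. Everything here is hypothetical and independent of Kafka: `DeletionErrorHandling`, `failingDelete`, `deleteReturningError`, and `deleteOrFail` are illustrative names, and the simulated `IOException` stands in for whatever I/O problem the real deletion might hit.

```scala
import java.io.IOException
import scala.util.{Failure, Success, Try}

object DeletionErrorHandling {
  // Hypothetical deletion step that always fails, standing in for an I/O problem.
  def failingDelete(): Unit = throw new IOException("simulated disk error")

  // Earlier shape: the error is returned as a value, so the caller can log it and keep going.
  def deleteReturningError(delete: () => Unit): Option[Throwable] =
    Try(delete()) match {
      case Success(_)   => None
      case Failure(err) => Some(err)
    }

  // Updated shape: nothing is caught here, so a failure propagates and aborts the caller.
  def deleteOrFail(delete: () => Unit): Unit = delete()
}
```

With the first shape the caller decides whether a failure is fatal; with the second, an I/O failure during deletion surfaces as an exception in the startup path unless some caller catches it.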
Re: [PR] KAFKA-16463 Delete metadata log on ZK broker startup [kafka]
mumrah commented on code in PR #15648: URL: https://github.com/apache/kafka/pull/15648#discussion_r1549817561

## core/src/main/scala/kafka/raft/RaftManager.scala: ##

@@ -69,6 +69,36 @@ object KafkaRaftManager {
     lock
   }
+
+  /**
+   * Obtain the file lock and delete the metadata log directory completely.
+   *
+   * This is only used by ZK brokers that are in pre-migration or hybrid mode of the ZK to KRaft migration.
+   * The rationale for deleting the metadata log in these cases is that it is safe to do on brokers and it
+   * it makes recovery from a failed migration much easier. See KAFKA-16463.
+   *
+   * @param config  The broker config
+   * @return        An error wrapped as an Option, if an error occurred. None otherwise
+   */
+  def maybeDeleteMetadataLogDir(config: KafkaConfig): Option[Throwable] = {
+    // These constraints are enforced in KafkaServer, but repeating them here to guard against future callers
+    if (config.processRoles.nonEmpty) {
+      Some(new RuntimeException("Not deleting metadata log dir since this node is in KRaft mode."))
+    } else if (!config.migrationEnabled) {
+      Some(new RuntimeException("Not deleting metadata log dir since migrations are not enabled."))
+    } else {
+      val metadataDir = new File(config.metadataLogDir)
+      val deletionLock = KafkaRaftManager.lockDataDir(metadataDir)
+      try {
+        Utils.delete(metadataDir)

Review Comment:
Ok, code has been updated to just delete the `__cluster_metadata-0` directory. I got confused by our naming 😅 metadataLogDir is actually the directory in which the metadata log (which is a directory) exists :)
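The naming distinction described above (the metadata log dir is the parent; the metadata log itself is the `__cluster_metadata-0` directory inside it) can be illustrated with a small standalone sketch. This is not the code from the PR: it uses only `java.nio.file`, and `MetadataPartitionCleanup`, `deleteRecursively`, and `deleteMetadataPartition` are hypothetical names. It also omits the file lock that the PR's doc comment mentions.

```scala
import java.io.IOException
import java.nio.file.{FileVisitResult, Files, Path, SimpleFileVisitor}
import java.nio.file.attribute.BasicFileAttributes

object MetadataPartitionCleanup {
  // Directory name of the metadata partition, as quoted in the review thread.
  val MetadataPartitionDirName = "__cluster_metadata-0"

  // Recursively delete `root` if it exists; do nothing otherwise.
  private def deleteRecursively(root: Path): Unit = {
    if (Files.exists(root)) {
      Files.walkFileTree(root, new SimpleFileVisitor[Path] {
        override def visitFile(file: Path, attrs: BasicFileAttributes): FileVisitResult = {
          Files.delete(file)
          FileVisitResult.CONTINUE
        }
        override def postVisitDirectory(dir: Path, exc: IOException): FileVisitResult = {
          if (exc != null) throw exc
          Files.delete(dir) // children are already gone at this point
          FileVisitResult.CONTINUE
        }
      })
    }
  }

  // Delete only the metadata partition directory; the parent and its other children stay.
  def deleteMetadataPartition(metadataLogDir: Path): Unit =
    deleteRecursively(metadataLogDir.resolve(MetadataPartitionDirName))
}
```

Resolving the partition name against the parent before deleting is what keeps sibling directories, such as other topic partitions in a shared log dir, out of the blast radius.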
Re: [PR] KAFKA-16463 Delete metadata log on ZK broker startup [kafka]
mumrah commented on code in PR #15648: URL: https://github.com/apache/kafka/pull/15648#discussion_r1549813667

## core/src/main/scala/kafka/server/NodeToControllerChannelManager.scala: ##

@@ -389,7 +389,12 @@ class NodeToControllerRequestThread(
       debug("Controller isn't cached, looking for local metadata changes")
       controllerInformation.node match {
         case Some(controllerNode) =>
-          info(s"Recorded new controller, from now on will use node $controllerNode")
+          val controllerType = if (controllerInformation.isZkController) {
+            "ZK"
+          } else {
+            "KRaft"
+          }
+          info(s"Recorded new $controllerType controller, from now on will use node $controllerNode")

Review Comment:
Unrelated change, but helped when debugging the integration test.
Re: [PR] KAFKA-16463 Delete metadata log on ZK broker startup [kafka]
mumrah commented on code in PR #15648: URL: https://github.com/apache/kafka/pull/15648#discussion_r1549786977

## core/src/main/scala/kafka/server/KafkaServer.scala: ##

@@ -420,6 +420,12 @@ class KafkaServer(
           isZkBroker = true,
           logManager.directoryIdsSet)
+        // For ZK brokers in migration mode, always delete the metadata partition on startup.
+        KafkaRaftManager.maybeDeleteMetadataLogDir(config) match {
+          case Some(err) => logger.error("Could not delete local metadata log dir. This is non-fatal, so continuing with startup.", err)

Review Comment:
My rationale here was that the deletion is not strictly required, but rather an optimization for the revert-to-ZK case. I assume RaftManager would also fail if there was some underlying I/O problem, but failing here is probably okay.
Re: [PR] KAFKA-16463 Delete metadata log on ZK broker startup [kafka]
soarez commented on code in PR #15648: URL: https://github.com/apache/kafka/pull/15648#discussion_r1549319336

## core/src/main/scala/kafka/server/KafkaServer.scala: ##

@@ -420,6 +420,12 @@ class KafkaServer(
           isZkBroker = true,
           logManager.directoryIdsSet)
+        // For ZK brokers in migration mode, always delete the metadata partition on startup.
+        KafkaRaftManager.maybeDeleteMetadataLogDir(config) match {
+          case Some(err) => logger.error("Could not delete local metadata log dir. This is non-fatal, so continuing with startup.", err)

Review Comment:
Should this really be non-fatal? What's the thinking behind this decision? If there is an IO failure on the metadata log dir the broker should not continue.
Re: [PR] KAFKA-16463 Delete metadata log on ZK broker startup [kafka]
cmccabe commented on code in PR #15648: URL: https://github.com/apache/kafka/pull/15648#discussion_r1548528194

## core/src/main/scala/kafka/raft/RaftManager.scala: ##

@@ -69,6 +69,36 @@ object KafkaRaftManager {
     lock
   }
+
+  /**
+   * Obtain the file lock and delete the metadata log directory completely.
+   *
+   * This is only used by ZK brokers that are in pre-migration or hybrid mode of the ZK to KRaft migration.
+   * The rationale for deleting the metadata log in these cases is that it is safe to do on brokers and it
+   * it makes recovery from a failed migration much easier. See KAFKA-16463.
+   *
+   * @param config  The broker config
+   * @return        An error wrapped as an Option, if an error occurred. None otherwise
+   */
+  def maybeDeleteMetadataLogDir(config: KafkaConfig): Option[Throwable] = {
+    // These constraints are enforced in KafkaServer, but repeating them here to guard against future callers
+    if (config.processRoles.nonEmpty) {
+      Some(new RuntimeException("Not deleting metadata log dir since this node is in KRaft mode."))
+    } else if (!config.migrationEnabled) {
+      Some(new RuntimeException("Not deleting metadata log dir since migrations are not enabled."))
+    } else {
+      val metadataDir = new File(config.metadataLogDir)
+      val deletionLock = KafkaRaftManager.lockDataDir(metadataDir)
+      try {
+        Utils.delete(metadataDir)

Review Comment:
yes, we need to make sure not to delete the whole metadata directory 😓
Re: [PR] KAFKA-16463 Delete metadata log on ZK broker startup [kafka]
mumrah commented on code in PR #15648: URL: https://github.com/apache/kafka/pull/15648#discussion_r1548499611

## core/src/main/scala/kafka/raft/RaftManager.scala: ##

@@ -69,6 +69,36 @@ object KafkaRaftManager {
     lock
   }
+
+  /**
+   * Obtain the file lock and delete the metadata log directory completely.
+   *
+   * This is only used by ZK brokers that are in pre-migration or hybrid mode of the ZK to KRaft migration.
+   * The rationale for deleting the metadata log in these cases is that it is safe to do on brokers and it
+   * it makes recovery from a failed migration much easier. See KAFKA-16463.
+   *
+   * @param config  The broker config
+   * @return        An error wrapped as an Option, if an error occurred. None otherwise
+   */
+  def maybeDeleteMetadataLogDir(config: KafkaConfig): Option[Throwable] = {
+    // These constraints are enforced in KafkaServer, but repeating them here to guard against future callers
+    if (config.processRoles.nonEmpty) {
+      Some(new RuntimeException("Not deleting metadata log dir since this node is in KRaft mode."))
+    } else if (!config.migrationEnabled) {
+      Some(new RuntimeException("Not deleting metadata log dir since migrations are not enabled."))
+    } else {
+      val metadataDir = new File(config.metadataLogDir)
+      val deletionLock = KafkaRaftManager.lockDataDir(metadataDir)
+      try {
+        Utils.delete(metadataDir)

Review Comment:
Thanks @jsancio, I'll fix this and add some additional test cases
Re: [PR] KAFKA-16463 Delete metadata log on ZK broker startup [kafka]
jsancio commented on code in PR #15648: URL: https://github.com/apache/kafka/pull/15648#discussion_r1548472966

## core/src/main/scala/kafka/raft/RaftManager.scala: ##

@@ -69,6 +69,36 @@ object KafkaRaftManager {
     lock
   }
+
+  /**
+   * Obtain the file lock and delete the metadata log directory completely.
+   *
+   * This is only used by ZK brokers that are in pre-migration or hybrid mode of the ZK to KRaft migration.
+   * The rationale for deleting the metadata log in these cases is that it is safe to do on brokers and it
+   * it makes recovery from a failed migration much easier. See KAFKA-16463.
+   *
+   * @param config  The broker config
+   * @return        An error wrapped as an Option, if an error occurred. None otherwise
+   */
+  def maybeDeleteMetadataLogDir(config: KafkaConfig): Option[Throwable] = {
+    // These constraints are enforced in KafkaServer, but repeating them here to guard against future callers
+    if (config.processRoles.nonEmpty) {
+      Some(new RuntimeException("Not deleting metadata log dir since this node is in KRaft mode."))
+    } else if (!config.migrationEnabled) {
+      Some(new RuntimeException("Not deleting metadata log dir since migrations are not enabled."))
+    } else {
+      val metadataDir = new File(config.metadataLogDir)
+      val deletionLock = KafkaRaftManager.lockDataDir(metadataDir)
+      try {
+        Utils.delete(metadataDir)

Review Comment:
This deletes the entire metadata log directory and not the `__cluster_metadata-0` topic partition in the metadata log dir. In some configurations the `metadata.log.dir` equals the `log.dir(s)`. In those configurations this will delete all of the topic partitions in the log directory. If the tests pass, this means that we are missing a test that checks this doesn't happen.
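The shared-directory case raised in this comment can be checked with a small path comparison. The sketch below is a standalone illustration, not Kafka code: `LogDirCheck` and `metadataDirIsSharedWithData` are hypothetical names, and the directories are passed as plain strings rather than read from a broker config.

```scala
import java.nio.file.Paths

object LogDirCheck {
  // True when the metadata log dir is also one of the data log dirs, in which case
  // deleting the whole metadata log dir would also remove regular topic partitions.
  def metadataDirIsSharedWithData(logDirs: Seq[String], metadataLogDir: String): Boolean =
    logDirs
      .map(Paths.get(_).toAbsolutePath)
      .contains(Paths.get(metadataLogDir).toAbsolutePath)
}
```

For example, `LogDirCheck.metadataDirIsSharedWithData(Seq("/tmp/a", "/tmp/b"), "/tmp/a")` reports a shared directory, which is the situation a regression test for this change would need to cover.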
[PR] KAFKA-16463 Delete metadata log on ZK broker startup [kafka]
mumrah opened a new pull request, #15648: URL: https://github.com/apache/kafka/pull/15648

This patch changes the behavior of the migrating ZK broker to always delete the local metadata log during startup. This deletion is done immediately before creating the RaftManager which will re-create the log directory and let the broker re-replicate the log from the active controller.

The rationale for this change is to make it easier for operators to re-attempt a ZK to KRaft migration after having reverted back to ZK mode. If an operator has reverted back to ZK mode, there will be an invalid metadata log on the disk of each broker. In order to re-attempt the migration in the future, this log needs to be deleted. This can be pretty burdensome to the operator for large clusters, especially since the log deletion must be done while the broker is offline.