niket-goel commented on a change in pull request #11053:
URL: https://github.com/apache/kafka/pull/11053#discussion_r670853242



##########
File path: tests/kafkatest/tests/core/snapshot_test.py
##########
@@ -0,0 +1,227 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from ducktape.mark import parametrize, matrix
+from ducktape.mark.resource import cluster
+from ducktape.utils.util import wait_until
+
+from kafkatest.services.kafka import quorum
+from kafkatest.services.console_consumer import ConsoleConsumer
+from kafkatest.services.kafka import KafkaService
+from kafkatest.services.kafka import config_property
+from kafkatest.services.verifiable_producer import VerifiableProducer
+from kafkatest.tests.produce_consume_validate import ProduceConsumeValidateTest
+from kafkatest.utils import is_int
+import random
+
+class TestSnapshots(ProduceConsumeValidateTest):
+
+    METADATA_LOG_RETENTION_BYTES = "4096"
+    METADATA_LOG_BYTES_BETWEEN_SNAPSHOTS = "4096"
+    METADATA_LOG_SEGMENT_BYTES = "9000000"
+    METADATA_LOG_SEGMENT_MS = "10000"
+
+    def __init__(self, test_context):
+        super(TestSnapshots, self).__init__(test_context=test_context)
+        self.topics_created = 0
+        self.topic = "test_topic"
+        self.partitions = 3
+        self.replication_factor = 3
+        self.num_nodes = 3
+
+        # Producer and consumer
+        self.producer_throughput = 1000
+        self.num_producers = 1
+        self.num_consumers = 1
+
+        security_protocol = 'PLAINTEXT'
+        # Setup Custom Config to ensure snapshot will be generated 
deterministically
+        self.kafka = KafkaService(self.test_context, self.num_nodes, zk=None,
+                                  topics={self.topic: {"partitions": 
self.partitions,
+                                                       "replication-factor": 
self.replication_factor,
+                                                       'configs': 
{"min.insync.replicas": 2}}},
+                                  server_prop_overrides=[
+                                      [config_property.METADATA_LOG_DIR, 
KafkaService.METADATA_LOG_DIR],
+                                      
[config_property.METADATA_LOG_SEGMENT_MS, 
TestSnapshots.METADATA_LOG_SEGMENT_MS],
+                                      
[config_property.METADATA_LOG_RETENTION_BYTES, 
TestSnapshots.METADATA_LOG_RETENTION_BYTES],
+                                      
[config_property.METADATA_LOG_SEGMENT_BYTES, 
TestSnapshots.METADATA_LOG_SEGMENT_BYTES],
+                                      
[config_property.METADATA_LOG_BYTES_BETWEEN_SNAPSHOTS, 
TestSnapshots.METADATA_LOG_BYTES_BETWEEN_SNAPSHOTS]
+                                  ])
+
+        self.kafka.interbroker_security_protocol = security_protocol
+        self.kafka.security_protocol = security_protocol
+
+    def setUp(self):
+        # Start the cluster and ensure that a snapshot is generated
+        self.logger.info("Starting the cluster and running until snapshot 
creation")
+
+        assert quorum.for_test(self.test_context) in quorum.all_kraft, \
+                "Snapshot test should be run Kraft Modes only"
+
+        self.kafka.start()
+
+        topic_count = 10
+        self.topics_created += self.create_n_topics(topic_count)
+
+        if self.kafka.remote_controller_quorum:
+            controller_nodes = self.kafka.remote_controller_quorum.nodes
+        else:
+            controller_nodes = 
self.kafka.nodes[:self.kafka.num_nodes_controller_role]
+
+        # Waiting for snapshot creation and first log segment
+        # cleanup on all controller nodes
+        for node in controller_nodes:
+            self.logger.debug("Waiting for snapshot on: %s" % 
self.kafka.who_am_i(node))
+            self.wait_for_log_segment_delete(node)
+            self.wait_for_snapshot(node)
+        self.logger.debug("Verified Snapshots exist on controller nodes")
+
+    def create_n_topics(self, topic_count):
+        for i in range(self.topics_created, topic_count):
+            topic = "test_topic_%d" % i
+            print("Creating topic %s" % topic, flush=True)
+            topic_cfg = {
+                "topic": topic,
+                "partitions": self.partitions,
+                "replication-factor": self.replication_factor,
+                "configs": {"min.insync.replicas": 2}
+            }
+            self.kafka.create_topic(topic_cfg)
+        self.logger.debug("Created %d more topics" % topic_count)
+        return topic_count
+
+    def wait_for_log_segment_delete(self, node):
+        file_path = self.kafka.METADATA_FIRST_LOG
+        # Wait until the first log segment in metadata log is marked for 
deletion
+        wait_until(lambda: self.file_exists(node, file_path) == False,
+                   timeout_sec=100,
+                   backoff_sec=1, err_msg="Not able to verify cleanup of log 
file %s in a reasonable amount of time" % file_path)
+
+    def wait_for_snapshot(self, node):
+        # Wait for a snapshot file to show up
+        file_path = self.kafka.METADATA_SNAPSHOT_SEARCH_STR
+        wait_until(lambda: self.file_exists(node, file_path) == True,
+                   timeout_sec=100,
+                   backoff_sec=1, err_msg="Not able to verify snapshot 
existence in a reasonable amount of time")
+
+    def choose_controller_node(self):
+        if self.kafka.remote_controller_quorum:
+            # We are running in remote kraft
+            controller_node = 
random.choice(self.kafka.remote_controller_quorum.nodes)
+        else:
+            # This is colocated kraft
+            # Picking the first node as that is guaranteed to be a controller
+            controller_node = self.kafka.nodes[0]
+
+        self.logger.debug("Picking the controller node %s" % 
self.kafka.who_am_i(controller_node))
+        return controller_node
+
+    def file_exists(self, node, file_path):
+        # Check if the first log segment is cleaned up
+        self.logger.debug("Checking if file %s exists"%file_path)
+        cmd = "ls %s" % file_path
+        files = node.account.ssh_output(cmd, allow_fail=True, 
combine_stderr=False)
+
+        if len(files) is 0:
+            self.logger.debug("File %s does not exist"%file_path)
+            return False
+        else:
+            self.logger.debug("File %s was found"%file_path)
+            return True
+
+    def validate_success(self):

Review comment:
       These are not the best checks IMO; I just borrowed them from some other 
test. Please let me know if there are better checks that we could perform.
   

##########
File path: tests/kafkatest/services/kafka/templates/log4j.properties
##########
@@ -111,6 +111,8 @@ log4j.logger.kafka.client.ClientUtils={{ 
log_level|default("DEBUG") }}, kafkaInf
 log4j.logger.kafka.perf={{ log_level|default("DEBUG") }}, kafkaInfoAppender, 
kafkaDebugAppender
 log4j.logger.kafka.perf.ProducerPerformance$ProducerThread={{ 
log_level|default("DEBUG") }}, kafkaInfoAppender, kafkaDebugAppender
 log4j.logger.kafka={{ log_level|default("DEBUG") }}, kafkaInfoAppender, 
kafkaDebugAppender
+log4j.logger.org.apache.kafka.raft={{ log_level|default("DEBUG") }}, 
kafkaInfoAppender, kafkaDebugAppender
+log4j.logger.org.apache.kafka.snapshot={{ log_level|default("DEBUG") }}, 
kafkaInfoAppender, kafkaDebugAppender

Review comment:
       Ack. Left in by mistake.

##########
File path: tests/kafkatest/tests/core/snapshot_test.py
##########
@@ -0,0 +1,227 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from ducktape.mark import parametrize, matrix
+from ducktape.mark.resource import cluster
+from ducktape.utils.util import wait_until
+
+from kafkatest.services.kafka import quorum
+from kafkatest.services.console_consumer import ConsoleConsumer
+from kafkatest.services.kafka import KafkaService
+from kafkatest.services.kafka import config_property
+from kafkatest.services.verifiable_producer import VerifiableProducer
+from kafkatest.tests.produce_consume_validate import ProduceConsumeValidateTest
+from kafkatest.utils import is_int
+import random
+
+class TestSnapshots(ProduceConsumeValidateTest):
+
+    METADATA_LOG_RETENTION_BYTES = "4096"
+    METADATA_LOG_BYTES_BETWEEN_SNAPSHOTS = "4096"
+    METADATA_LOG_SEGMENT_BYTES = "9000000"
+    METADATA_LOG_SEGMENT_MS = "10000"
+
+    def __init__(self, test_context):
+        super(TestSnapshots, self).__init__(test_context=test_context)
+        self.topics_created = 0
+        self.topic = "test_topic"
+        self.partitions = 3
+        self.replication_factor = 3
+        self.num_nodes = 3
+
+        # Producer and consumer
+        self.producer_throughput = 1000
+        self.num_producers = 1
+        self.num_consumers = 1
+
+        security_protocol = 'PLAINTEXT'
+        # Setup Custom Config to ensure snapshot will be generated 
deterministically
+        self.kafka = KafkaService(self.test_context, self.num_nodes, zk=None,
+                                  topics={self.topic: {"partitions": 
self.partitions,
+                                                       "replication-factor": 
self.replication_factor,
+                                                       'configs': 
{"min.insync.replicas": 2}}},
+                                  server_prop_overrides=[
+                                      [config_property.METADATA_LOG_DIR, 
KafkaService.METADATA_LOG_DIR],
+                                      
[config_property.METADATA_LOG_SEGMENT_MS, 
TestSnapshots.METADATA_LOG_SEGMENT_MS],
+                                      
[config_property.METADATA_LOG_RETENTION_BYTES, 
TestSnapshots.METADATA_LOG_RETENTION_BYTES],
+                                      
[config_property.METADATA_LOG_SEGMENT_BYTES, 
TestSnapshots.METADATA_LOG_SEGMENT_BYTES],
+                                      
[config_property.METADATA_LOG_BYTES_BETWEEN_SNAPSHOTS, 
TestSnapshots.METADATA_LOG_BYTES_BETWEEN_SNAPSHOTS]
+                                  ])
+
+        self.kafka.interbroker_security_protocol = security_protocol
+        self.kafka.security_protocol = security_protocol
+
+    def setUp(self):
+        # Start the cluster and ensure that a snapshot is generated
+        self.logger.info("Starting the cluster and running until snapshot 
creation")
+
+        assert quorum.for_test(self.test_context) in quorum.all_kraft, \
+                "Snapshot test should be run Kraft Modes only"
+
+        self.kafka.start()
+
+        topic_count = 10
+        self.topics_created += self.create_n_topics(topic_count)
+
+        if self.kafka.remote_controller_quorum:
+            controller_nodes = self.kafka.remote_controller_quorum.nodes
+        else:
+            controller_nodes = 
self.kafka.nodes[:self.kafka.num_nodes_controller_role]
+
+        # Waiting for snapshot creation and first log segment
+        # cleanup on all controller nodes
+        for node in controller_nodes:
+            self.logger.debug("Waiting for snapshot on: %s" % 
self.kafka.who_am_i(node))
+            self.wait_for_log_segment_delete(node)
+            self.wait_for_snapshot(node)
+        self.logger.debug("Verified Snapshots exist on controller nodes")
+
+    def create_n_topics(self, topic_count):
+        for i in range(self.topics_created, topic_count):
+            topic = "test_topic_%d" % i
+            print("Creating topic %s" % topic, flush=True)
+            topic_cfg = {
+                "topic": topic,
+                "partitions": self.partitions,
+                "replication-factor": self.replication_factor,
+                "configs": {"min.insync.replicas": 2}
+            }
+            self.kafka.create_topic(topic_cfg)
+        self.logger.debug("Created %d more topics" % topic_count)
+        return topic_count
+
+    def wait_for_log_segment_delete(self, node):
+        file_path = self.kafka.METADATA_FIRST_LOG
+        # Wait until the first log segment in metadata log is marked for 
deletion
+        wait_until(lambda: self.file_exists(node, file_path) == False,
+                   timeout_sec=100,
+                   backoff_sec=1, err_msg="Not able to verify cleanup of log 
file %s in a reasonable amount of time" % file_path)
+
+    def wait_for_snapshot(self, node):
+        # Wait for a snapshot file to show up
+        file_path = self.kafka.METADATA_SNAPSHOT_SEARCH_STR
+        wait_until(lambda: self.file_exists(node, file_path) == True,
+                   timeout_sec=100,
+                   backoff_sec=1, err_msg="Not able to verify snapshot 
existence in a reasonable amount of time")
+
+    def choose_controller_node(self):
+        if self.kafka.remote_controller_quorum:
+            # We are running in remote kraft
+            controller_node = 
random.choice(self.kafka.remote_controller_quorum.nodes)
+        else:
+            # This is colocated kraft
+            # Picking the first node as that is guaranteed to be a controller
+            controller_node = self.kafka.nodes[0]
+
+        self.logger.debug("Picking the controller node %s" % 
self.kafka.who_am_i(controller_node))
+        return controller_node
+
+    def file_exists(self, node, file_path):
+        # Check if the first log segment is cleaned up
+        self.logger.debug("Checking if file %s exists"%file_path)
+        cmd = "ls %s" % file_path
+        files = node.account.ssh_output(cmd, allow_fail=True, 
combine_stderr=False)
+
+        if len(files) is 0:
+            self.logger.debug("File %s does not exist"%file_path)
+            return False
+        else:
+            self.logger.debug("File %s was found"%file_path)
+            return True
+
+    def validate_success(self):
+        cluster_id = self.kafka.cluster_id()
+        assert cluster_id is not None
+        assert len(cluster_id) == 22
+        assert self.kafka.all_nodes_support_topic_ids()
+        assert self.kafka.check_protocol_errors(self)
+
+
+    @cluster(num_nodes=9)
+    @parametrize(metadata_quorum=quorum.remote_kraft)
+    # TODO: enable after concurrent startup has been merged
+    #@matrix(metadata_quorum=quorum.all_kraft)
+    def test_broker(self, metadata_quorum=quorum.colocated_kraft):
+        # Scenario -- stop a broker node and create metadata changes.
+        # Restart the broker and let it catch up
+        node = random.choice(self.kafka.nodes)
+        self.logger.debug("Scenario 1: kill-create_topics-start on node %s", 
self.kafka.who_am_i(node))
+        self.kafka.stop_node(node)
+        # Now modify the cluster to create more metadata changes
+        self.topics_created += self.create_n_topics(topic_count=10)
+        self.kafka.start_node(node)
+        self.validate_success()
+
+        # Scenario -- Re-init broker after cleaning up all persistent state
+        node = random.choice(self.kafka.nodes)
+        self.logger.debug("Scenario 2: kill-clean-start on node %s", 
self.kafka.who_am_i(node))
+        self.kafka.clean_node(node)
+        self.kafka.start_node(node)
+        self.validate_success()
+
+        # Scenario -- Re-init broker after cleaning up all persistent state
+        # and create a topic where the affected broker must be the leader
+        node = random.choice(self.kafka.nodes)
+        self.logger.debug("Scenario 3: kill-clean-start-verify-produce on node 
%s", self.kafka.who_am_i(node))
+        self.kafka.clean_node(node)
+
+        # Create a topic where the affected broker must be the leader
+        broker_topic = "test_topic_%d" % self.topics_created
+        self.topics_created += 1
+        print("Creating topic %s" % broker_topic, flush=True)
+        topic_cfg = {
+            "topic": broker_topic,
+            "replica-assignment": self.kafka.idx(node),
+            "configs": {"min.insync.replicas": 1}
+        }
+        self.kafka.create_topic(topic_cfg)
+
+        self.kafka.start_node(node)
+
+        # Produce to the newly created topic to ensure broker has caught up
+        self.producer = VerifiableProducer(self.test_context, 
self.num_producers, self.kafka,
+                                           broker_topic, 
throughput=self.producer_throughput,
+                                           message_validator=is_int)
+
+        self.consumer = ConsoleConsumer(self.test_context, self.num_consumers, 
self.kafka,
+                                        broker_topic, 
consumer_timeout_ms=30000,
+                                        message_validator=is_int)
+        self.start_producer_and_consumer()
+        self.stop_producer_and_consumer()
+        self.validate()
+
+        self.validate_success()
+
+    @cluster(num_nodes=8)
+    @parametrize(metadata_quorum=quorum.remote_kraft)
+    # TODO: enable after concurrent startup has been merged
+    #@matrix(metadata_quorum=quorum.all_kraft)
+    def test_controller(self, metadata_quorum=quorum.colocated_kraft):
+        # Scenario -- stop a controller node and create metadata changes.
+        # Restart the controller and let it catch up
+        node = self.choose_controller_node()
+        self.logger.debug("Scenario 1: kill-create_topics-start on node %s", 
self.kafka.who_am_i(node))
+        self.kafka.controller_quorum.stop_node(node)
+        # Now modify the cluster to create more metadata changes
+        self.topics_created += self.create_n_topics(topic_count=10)
+        self.kafka.controller_quorum.start_node(node)
+        self.validate_success()
+
+        # Scenario -- Re-init controller with a clean kafka dir
+        node = self.choose_controller_node()
+        self.logger.debug("Scenario 2: kill-clean-start on node %s", 
self.kafka.who_am_i(node))
+        self.kafka.controller_quorum.clean_node(node)
+        self.kafka.controller_quorum.start_node(node)
+        self.validate_success()

Review comment:
       Bringing down any one controller, cleaning up its Kafka dir and then 
rebooting already ensures that the snapshot is loaded on recovery. I 
think Jose's comment was on the previous version of the test, where we were 
not doing a produce/consume check. In this version of the PR we are creating a 
topic and verifying we can produce to it and consume from it. The rationale is 
that to be able to create a topic and produce to it, the quorum needs to agree 
on the metadata up until this point.
   
   I take your point that maybe when testing the controller, it would be a 
better check to bring all controllers down, restart them and then do a 
produce/consume check as that would ensure that a quorum of nodes was able to 
load from their snapshots and continue operation. Let me make that change. Will 
update the PR.

##########
File path: tests/kafkatest/tests/core/snapshot_test.py
##########
@@ -0,0 +1,227 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from ducktape.mark import parametrize, matrix
+from ducktape.mark.resource import cluster
+from ducktape.utils.util import wait_until
+
+from kafkatest.services.kafka import quorum
+from kafkatest.services.console_consumer import ConsoleConsumer
+from kafkatest.services.kafka import KafkaService
+from kafkatest.services.kafka import config_property
+from kafkatest.services.verifiable_producer import VerifiableProducer
+from kafkatest.tests.produce_consume_validate import ProduceConsumeValidateTest
+from kafkatest.utils import is_int
+import random
+
+class TestSnapshots(ProduceConsumeValidateTest):
+
+    METADATA_LOG_RETENTION_BYTES = "4096"
+    METADATA_LOG_BYTES_BETWEEN_SNAPSHOTS = "4096"
+    METADATA_LOG_SEGMENT_BYTES = "9000000"
+    METADATA_LOG_SEGMENT_MS = "10000"
+
+    def __init__(self, test_context):
+        super(TestSnapshots, self).__init__(test_context=test_context)
+        self.topics_created = 0
+        self.topic = "test_topic"
+        self.partitions = 3
+        self.replication_factor = 3
+        self.num_nodes = 3
+
+        # Producer and consumer
+        self.producer_throughput = 1000
+        self.num_producers = 1
+        self.num_consumers = 1
+
+        security_protocol = 'PLAINTEXT'
+        # Setup Custom Config to ensure snapshot will be generated 
deterministically
+        self.kafka = KafkaService(self.test_context, self.num_nodes, zk=None,
+                                  topics={self.topic: {"partitions": 
self.partitions,
+                                                       "replication-factor": 
self.replication_factor,
+                                                       'configs': 
{"min.insync.replicas": 2}}},
+                                  server_prop_overrides=[
+                                      [config_property.METADATA_LOG_DIR, 
KafkaService.METADATA_LOG_DIR],
+                                      
[config_property.METADATA_LOG_SEGMENT_MS, 
TestSnapshots.METADATA_LOG_SEGMENT_MS],
+                                      
[config_property.METADATA_LOG_RETENTION_BYTES, 
TestSnapshots.METADATA_LOG_RETENTION_BYTES],
+                                      
[config_property.METADATA_LOG_SEGMENT_BYTES, 
TestSnapshots.METADATA_LOG_SEGMENT_BYTES],
+                                      
[config_property.METADATA_LOG_BYTES_BETWEEN_SNAPSHOTS, 
TestSnapshots.METADATA_LOG_BYTES_BETWEEN_SNAPSHOTS]
+                                  ])
+
+        self.kafka.interbroker_security_protocol = security_protocol
+        self.kafka.security_protocol = security_protocol
+
+    def setUp(self):
+        # Start the cluster and ensure that a snapshot is generated
+        self.logger.info("Starting the cluster and running until snapshot 
creation")
+
+        assert quorum.for_test(self.test_context) in quorum.all_kraft, \
+                "Snapshot test should be run Kraft Modes only"
+
+        self.kafka.start()
+
+        topic_count = 10
+        self.topics_created += self.create_n_topics(topic_count)
+
+        if self.kafka.remote_controller_quorum:
+            controller_nodes = self.kafka.remote_controller_quorum.nodes
+        else:
+            controller_nodes = 
self.kafka.nodes[:self.kafka.num_nodes_controller_role]
+
+        # Waiting for snapshot creation and first log segment
+        # cleanup on all controller nodes
+        for node in controller_nodes:
+            self.logger.debug("Waiting for snapshot on: %s" % 
self.kafka.who_am_i(node))
+            self.wait_for_log_segment_delete(node)
+            self.wait_for_snapshot(node)
+        self.logger.debug("Verified Snapshots exist on controller nodes")
+
+    def create_n_topics(self, topic_count):
+        for i in range(self.topics_created, topic_count):
+            topic = "test_topic_%d" % i
+            print("Creating topic %s" % topic, flush=True)
+            topic_cfg = {
+                "topic": topic,
+                "partitions": self.partitions,
+                "replication-factor": self.replication_factor,
+                "configs": {"min.insync.replicas": 2}
+            }
+            self.kafka.create_topic(topic_cfg)
+        self.logger.debug("Created %d more topics" % topic_count)
+        return topic_count
+
+    def wait_for_log_segment_delete(self, node):
+        file_path = self.kafka.METADATA_FIRST_LOG
+        # Wait until the first log segment in metadata log is marked for 
deletion
+        wait_until(lambda: self.file_exists(node, file_path) == False,
+                   timeout_sec=100,
+                   backoff_sec=1, err_msg="Not able to verify cleanup of log 
file %s in a reasonable amount of time" % file_path)
+
+    def wait_for_snapshot(self, node):
+        # Wait for a snapshot file to show up
+        file_path = self.kafka.METADATA_SNAPSHOT_SEARCH_STR
+        wait_until(lambda: self.file_exists(node, file_path) == True,
+                   timeout_sec=100,
+                   backoff_sec=1, err_msg="Not able to verify snapshot 
existence in a reasonable amount of time")
+
+    def choose_controller_node(self):
+        if self.kafka.remote_controller_quorum:
+            # We are running in remote kraft
+            controller_node = 
random.choice(self.kafka.remote_controller_quorum.nodes)
+        else:
+            # This is colocated kraft
+            # Picking the first node as that is guaranteed to be a controller
+            controller_node = self.kafka.nodes[0]
+
+        self.logger.debug("Picking the controller node %s" % 
self.kafka.who_am_i(controller_node))
+        return controller_node
+
+    def file_exists(self, node, file_path):
+        # Check if the first log segment is cleaned up
+        self.logger.debug("Checking if file %s exists"%file_path)
+        cmd = "ls %s" % file_path
+        files = node.account.ssh_output(cmd, allow_fail=True, 
combine_stderr=False)
+
+        if len(files) is 0:
+            self.logger.debug("File %s does not exist"%file_path)
+            return False
+        else:
+            self.logger.debug("File %s was found"%file_path)
+            return True
+
+    def validate_success(self):
+        cluster_id = self.kafka.cluster_id()
+        assert cluster_id is not None
+        assert len(cluster_id) == 22
+        assert self.kafka.all_nodes_support_topic_ids()
+        assert self.kafka.check_protocol_errors(self)
+
+
+    @cluster(num_nodes=9)
+    @parametrize(metadata_quorum=quorum.remote_kraft)
+    # TODO: enable after concurrent startup has been merged
+    #@matrix(metadata_quorum=quorum.all_kraft)
+    def test_broker(self, metadata_quorum=quorum.colocated_kraft):
+        # Scenario -- stop a broker node and create metadata changes.
+        # Restart the broker and let it catch up
+        node = random.choice(self.kafka.nodes)
+        self.logger.debug("Scenario 1: kill-create_topics-start on node %s", 
self.kafka.who_am_i(node))
+        self.kafka.stop_node(node)
+        # Now modify the cluster to create more metadata changes
+        self.topics_created += self.create_n_topics(topic_count=10)
+        self.kafka.start_node(node)
+        self.validate_success()
+
+        # Scenario -- Re-init broker after cleaning up all persistent state
+        node = random.choice(self.kafka.nodes)
+        self.logger.debug("Scenario 2: kill-clean-start on node %s", 
self.kafka.who_am_i(node))
+        self.kafka.clean_node(node)
+        self.kafka.start_node(node)
+        self.validate_success()
+
+        # Scenario -- Re-init broker after cleaning up all persistent state
+        # and create a topic where the affected broker must be the leader
+        node = random.choice(self.kafka.nodes)
+        self.logger.debug("Scenario 3: kill-clean-start-verify-produce on node 
%s", self.kafka.who_am_i(node))

Review comment:
       I agree that the third scenario covers the initial cases as well, but 
the reason I left the other ones in was to make debugging failures easier. If 
only a subset of the scenarios works, that can help narrow down where the 
failure might be. The scenarios themselves take tens of seconds to execute. Do 
you think leaving them in is helpful?

##########
File path: tests/kafkatest/tests/core/snapshot_test.py
##########
@@ -0,0 +1,251 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from ducktape.mark import parametrize, matrix
+from ducktape.mark.resource import cluster
+from ducktape.utils.util import wait_until
+
+from kafkatest.services.kafka import quorum
+from kafkatest.services.console_consumer import ConsoleConsumer
+from kafkatest.services.kafka import KafkaService
+from kafkatest.services.kafka import config_property
+from kafkatest.services.verifiable_producer import VerifiableProducer
+from kafkatest.tests.produce_consume_validate import ProduceConsumeValidateTest
+from kafkatest.utils import is_int
+import random
+
+class TestSnapshots(ProduceConsumeValidateTest):
+
+    TOPIC_NAME_PREFIX = "test_topic_"
+
+    def __init__(self, test_context):
+        super(TestSnapshots, self).__init__(test_context=test_context)
+        self.topics_created = 0
+        self.topic = "test_topic"
+        self.partitions = 3
+        self.replication_factor = 3
+        self.num_nodes = 3
+
+        # Producer and consumer
+        self.producer_throughput = 1000
+        self.num_producers = 1
+        self.num_consumers = 1
+
+        security_protocol = 'PLAINTEXT'
+        # Setup Custom Config to ensure snapshot will be generated 
deterministically
+        self.kafka = KafkaService(self.test_context, self.num_nodes, zk=None,
+                                  topics={self.topic: {"partitions": 
self.partitions,
+                                                       "replication-factor": 
self.replication_factor,
+                                                       'configs': 
{"min.insync.replicas": 2}}},
+                                  server_prop_overrides=[
+                                      [config_property.METADATA_LOG_DIR, 
KafkaService.METADATA_LOG_DIR],
+                                      
[config_property.METADATA_LOG_SEGMENT_MS, "10000"],
+                                      
[config_property.METADATA_LOG_RETENTION_BYTES, "2048"],
+                                      
[config_property.METADATA_LOG_BYTES_BETWEEN_SNAPSHOTS, "2048"]
+                                  ])
+
+        self.kafka.interbroker_security_protocol = security_protocol
+        self.kafka.security_protocol = security_protocol
+
+    def setUp(self):
+        # Start the cluster and ensure that a snapshot is generated
+        self.logger.info("Starting the cluster and running until snapshot 
creation")
+
+        assert quorum.for_test(self.test_context) in quorum.all_kraft, \
+                "Snapshot test should be run Kraft Modes only"
+
+        self.kafka.start()
+
+        topic_count = 10
+        self.topics_created += self.create_n_topics(topic_count)
+
+        if self.kafka.remote_controller_quorum:
+            self.controller_nodes = self.kafka.remote_controller_quorum.nodes
+        else:
+            self.controller_nodes = 
self.kafka.nodes[:self.kafka.num_nodes_controller_role]
+
+        # Waiting for snapshot creation and first log segment
+        # cleanup on all controller nodes
+        for node in self.controller_nodes:
+            self.logger.debug("Waiting for snapshot on: %s" % 
self.kafka.who_am_i(node))
+            self.wait_for_log_segment_delete(node)
+            self.wait_for_snapshot(node)
+        self.logger.debug("Verified Snapshots exist on controller nodes")
+
+    def create_n_topics(self, topic_count):
+        for i in range(self.topics_created, topic_count):
+            topic = "%s%d" % (TestSnapshots.TOPIC_NAME_PREFIX, i)
+            self.logger.debug("Creating topic %s" % topic)
+            topic_cfg = {
+                "topic": topic,
+                "partitions": self.partitions,
+                "replication-factor": self.replication_factor,
+                "configs": {"min.insync.replicas": 2}
+            }
+            self.kafka.create_topic(topic_cfg)
+        self.logger.debug("Created %d more topics" % topic_count)
+        return topic_count
+
+    def wait_for_log_segment_delete(self, node):
+        file_path = self.kafka.METADATA_FIRST_LOG
+        # Wait until the first log segment in metadata log is marked for 
deletion
+        wait_until(lambda: not self.file_exists(node, file_path),
+                   timeout_sec=100,
+                   backoff_sec=1, err_msg="Not able to verify cleanup of log 
file %s in a reasonable amount of time" % file_path)
+
+    def wait_for_snapshot(self, node):
+        # Wait for a snapshot file to show up
+        file_path = self.kafka.METADATA_SNAPSHOT_SEARCH_STR
+        wait_until(lambda: self.file_exists(node, file_path),
+                   timeout_sec=100,
+                   backoff_sec=1, err_msg="Not able to verify snapshot 
existence in a reasonable amount of time")
+
+    def file_exists(self, node, file_path):
+        # Check if the first log segment is cleaned up
+        self.logger.debug("Checking if file %s exists" % file_path)
+        cmd = "ls %s" % file_path
+        files = node.account.ssh_output(cmd, allow_fail=True, 
combine_stderr=False)
+
+        if len(files) is 0:
+            self.logger.debug("File %s does not exist" % file_path)
+            return False
+        else:
+            self.logger.debug("File %s was found" % file_path)
+            return True
+
+    def validate_success(self, topic = None):
+        if topic is None:
+            # Create a new topic
+            topic = "%s%d" % (TestSnapshots.TOPIC_NAME_PREFIX, 
self.topics_created)
+            self.topics_created += self.create_n_topics(topic_count=1)
+
+        # Produce to the newly created topic to ensure broker has caught up
+        self.producer = VerifiableProducer(self.test_context, 
self.num_producers, self.kafka,
+                                           topic, 
throughput=self.producer_throughput,
+                                           message_validator=is_int)
+
+        self.consumer = ConsoleConsumer(self.test_context, self.num_consumers, 
self.kafka,
+                                        topic, consumer_timeout_ms=30000,
+                                        message_validator=is_int)
+        self.start_producer_and_consumer()
+        self.stop_producer_and_consumer()
+        self.validate()
+
+    @cluster(num_nodes=9)
+    @matrix(metadata_quorum=quorum.all_kraft)
+    def test_broker(self, metadata_quorum=quorum.colocated_kraft):
+        """ Test the ability of a broker to consume metadata snapshots
+        and to recover the cluster metadata state using them
+
+        The test ensures that that there is atleast one snapshot created on
+        the controller quorum during the setup phase and that at least the 
first
+        log segment in the metadata log has been marked for deletion, thereby 
ensuring
+        that any observer of the log needs to always load a snapshot to catch
+        up to the current metadata state.
+
+        Each scenario is a progression over the previous one.
+        The scenarios build on top of each other by:
+        * Loading a snapshot
+        * Loading and snapshot and some delta records
+        * Loading a snapshot and delta and ensuring that the most recent 
metadata state
+          has been caught up.
+
+        Even though a subsequent scenario covers the previous one, they are all
+        left in the test to make debugging a failure of the test easier
+        e.g. if the first scenario passes and the second fails, it hints 
towards
+        a problem with the application of delta records while catching up

Review comment:
       Same as below
   

##########
File path: tests/kafkatest/tests/core/snapshot_test.py
##########
@@ -0,0 +1,224 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from ducktape.mark import parametrize, matrix
+from ducktape.mark.resource import cluster
+from ducktape.utils.util import wait_until
+
+from kafkatest.services.kafka import quorum
+from kafkatest.services.console_consumer import ConsoleConsumer
+from kafkatest.services.kafka import KafkaService
+from kafkatest.services.kafka import config_property
+from kafkatest.services.verifiable_producer import VerifiableProducer
+from kafkatest.tests.produce_consume_validate import ProduceConsumeValidateTest
+from kafkatest.utils import is_int
+import random
+import time
+
+class TestSnapshots(ProduceConsumeValidateTest):
+
+    METADATA_LOG_RETENTION_BYTES = "2048"
+    METADATA_LOG_BYTES_BETWEEN_SNAPSHOTS = "2048"
+    METADATA_LOG_SEGMENT_BYTES = "9000000"
+    METADATA_LOG_SEGMENT_MS = "10000"
+    TOPIC_NAME_PREFIX = "test_topic_"
+
+    def __init__(self, test_context):
+        super(TestSnapshots, self).__init__(test_context=test_context)
+        self.topics_created = 0
+        self.topic = "test_topic"
+        self.partitions = 3
+        self.replication_factor = 3
+        self.num_nodes = 3
+
+        # Producer and consumer
+        self.producer_throughput = 1000
+        self.num_producers = 1
+        self.num_consumers = 1
+
+        security_protocol = 'PLAINTEXT'
+        # Setup Custom Config to ensure snapshot will be generated 
deterministically
+        self.kafka = KafkaService(self.test_context, self.num_nodes, zk=None,
+                                  topics={self.topic: {"partitions": 
self.partitions,
+                                                       "replication-factor": 
self.replication_factor,
+                                                       'configs': 
{"min.insync.replicas": 2}}},
+                                  server_prop_overrides=[
+                                      [config_property.METADATA_LOG_DIR, 
KafkaService.METADATA_LOG_DIR],
+                                      
[config_property.METADATA_LOG_SEGMENT_MS, 
TestSnapshots.METADATA_LOG_SEGMENT_MS],
+                                      
[config_property.METADATA_LOG_RETENTION_BYTES, 
TestSnapshots.METADATA_LOG_RETENTION_BYTES],
+                                      
[config_property.METADATA_LOG_SEGMENT_BYTES, 
TestSnapshots.METADATA_LOG_SEGMENT_BYTES],
+                                      
[config_property.METADATA_LOG_BYTES_BETWEEN_SNAPSHOTS, 
TestSnapshots.METADATA_LOG_BYTES_BETWEEN_SNAPSHOTS]
+                                  ])
+
+        self.kafka.interbroker_security_protocol = security_protocol
+        self.kafka.security_protocol = security_protocol
+
+    def setUp(self):
+        # Start the cluster and ensure that a snapshot is generated
+        self.logger.info("Starting the cluster and running until snapshot 
creation")
+
+        assert quorum.for_test(self.test_context) in quorum.all_kraft, \
+                "Snapshot test should be run Kraft Modes only"
+
+        self.kafka.start()
+
+        topic_count = 10
+        self.topics_created += self.create_n_topics(topic_count)
+
+        if self.kafka.remote_controller_quorum:
+            self.controller_nodes = self.kafka.remote_controller_quorum.nodes
+        else:
+            self.controller_nodes = 
self.kafka.nodes[:self.kafka.num_nodes_controller_role]
+
+        # Waiting for snapshot creation and first log segment
+        # cleanup on all controller nodes
+        for node in self.controller_nodes:
+            self.logger.debug("Waiting for snapshot on: %s" % 
self.kafka.who_am_i(node))
+            self.wait_for_log_segment_delete(node)
+            self.wait_for_snapshot(node)
+        self.logger.debug("Verified Snapshots exist on controller nodes")
+
+    def create_n_topics(self, topic_count):
+        for i in range(self.topics_created, topic_count):
+            topic = "%s%d" % (TestSnapshots.TOPIC_NAME_PREFIX, i)
+            self.logger.debug("Creating topic %s" % topic)
+            topic_cfg = {
+                "topic": topic,
+                "partitions": self.partitions,
+                "replication-factor": self.replication_factor,
+                "configs": {"min.insync.replicas": 2}
+            }
+            self.kafka.create_topic(topic_cfg)
+        self.logger.debug("Created %d more topics" % topic_count)
+        return topic_count
+
+    def wait_for_log_segment_delete(self, node):
+        file_path = self.kafka.METADATA_FIRST_LOG
+        # Wait until the first log segment in metadata log is marked for 
deletion
+        wait_until(lambda: not self.file_exists(node, file_path),
+                   timeout_sec=100,
+                   backoff_sec=1, err_msg="Not able to verify cleanup of log 
file %s in a reasonable amount of time" % file_path)
+
+    def wait_for_snapshot(self, node):
+        # Wait for a snapshot file to show up
+        file_path = self.kafka.METADATA_SNAPSHOT_SEARCH_STR
+        wait_until(lambda: self.file_exists(node, file_path),
+                   timeout_sec=100,
+                   backoff_sec=1, err_msg="Not able to verify snapshot 
existence in a reasonable amount of time")
+
+    def file_exists(self, node, file_path):
+        # Check if the first log segment is cleaned up
+        self.logger.debug("Checking if file %s exists" % file_path)
+        cmd = "ls %s" % file_path
+        files = node.account.ssh_output(cmd, allow_fail=True, 
combine_stderr=False)
+
+        if len(files) is 0:
+            self.logger.debug("File %s does not exist" % file_path)
+            return False
+        else:
+            self.logger.debug("File %s was found" % file_path)
+            return True
+
+    def validate_success(self, topic = None):
+        if topic is None:
+            # Create a new topic
+            topic = "%s%d" % (TestSnapshots.TOPIC_NAME_PREFIX, 
self.topics_created)
+            self.topics_created += self.create_n_topics(topic_count=1)
+
+        # Produce to the newly created topic to ensure broker has caught up
+        self.producer = VerifiableProducer(self.test_context, 
self.num_producers, self.kafka,
+                                           topic, 
throughput=self.producer_throughput,
+                                           message_validator=is_int)
+
+        self.consumer = ConsoleConsumer(self.test_context, self.num_consumers, 
self.kafka,
+                                        topic, consumer_timeout_ms=30000,
+                                        message_validator=is_int)
+        self.start_producer_and_consumer()
+        self.stop_producer_and_consumer()
+        self.validate()
+
+    @cluster(num_nodes=9)
+    @matrix(metadata_quorum=quorum.all_kraft)
+    def test_broker(self, metadata_quorum=quorum.colocated_kraft):
+        # The following set of scenarios are meant to test the ability
+        # of a broker to consume metadata snapshots and to recover using them
+        #
+        # During setup phase, the test ensures that there is atleast
+        # one snapshot created on the controllers and that at least the first
+        # log segment has been truncated (requiring an observer to always load 
+        # a snapshot to catch up to the metadata state)
+        #
+        # Each scenario is a progression over the previous one.
+        # Redundant scenarios are left in to make debugging a failure of the 
test easier
+
+        # Scenario -- Re-init broker after cleaning up all persistent state

Review comment:
       Synced offline. Breaking the scenarios out into separate test functions 
would greatly increase the test runtime, so I am leaning towards keeping them 
in one function. The reasoning for having multiple scenarios is now added to 
the test description.

##########
File path: tests/kafkatest/tests/core/snapshot_test.py
##########
@@ -0,0 +1,227 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from ducktape.mark import parametrize, matrix
+from ducktape.mark.resource import cluster
+from ducktape.utils.util import wait_until
+
+from kafkatest.services.kafka import quorum
+from kafkatest.services.console_consumer import ConsoleConsumer
+from kafkatest.services.kafka import KafkaService
+from kafkatest.services.kafka import config_property
+from kafkatest.services.verifiable_producer import VerifiableProducer
+from kafkatest.tests.produce_consume_validate import ProduceConsumeValidateTest
+from kafkatest.utils import is_int
+import random
+
+class TestSnapshots(ProduceConsumeValidateTest):
+
+    METADATA_LOG_RETENTION_BYTES = "4096"
+    METADATA_LOG_BYTES_BETWEEN_SNAPSHOTS = "4096"
+    METADATA_LOG_SEGMENT_BYTES = "9000000"
+    METADATA_LOG_SEGMENT_MS = "10000"
+
+    def __init__(self, test_context):
+        super(TestSnapshots, self).__init__(test_context=test_context)
+        self.topics_created = 0
+        self.topic = "test_topic"
+        self.partitions = 3
+        self.replication_factor = 3
+        self.num_nodes = 3
+
+        # Producer and consumer
+        self.producer_throughput = 1000
+        self.num_producers = 1
+        self.num_consumers = 1
+
+        security_protocol = 'PLAINTEXT'
+        # Setup Custom Config to ensure snapshot will be generated 
deterministically
+        self.kafka = KafkaService(self.test_context, self.num_nodes, zk=None,
+                                  topics={self.topic: {"partitions": 
self.partitions,
+                                                       "replication-factor": 
self.replication_factor,
+                                                       'configs': 
{"min.insync.replicas": 2}}},
+                                  server_prop_overrides=[
+                                      [config_property.METADATA_LOG_DIR, 
KafkaService.METADATA_LOG_DIR],
+                                      
[config_property.METADATA_LOG_SEGMENT_MS, 
TestSnapshots.METADATA_LOG_SEGMENT_MS],
+                                      
[config_property.METADATA_LOG_RETENTION_BYTES, 
TestSnapshots.METADATA_LOG_RETENTION_BYTES],
+                                      
[config_property.METADATA_LOG_SEGMENT_BYTES, 
TestSnapshots.METADATA_LOG_SEGMENT_BYTES],
+                                      
[config_property.METADATA_LOG_BYTES_BETWEEN_SNAPSHOTS, 
TestSnapshots.METADATA_LOG_BYTES_BETWEEN_SNAPSHOTS]
+                                  ])
+
+        self.kafka.interbroker_security_protocol = security_protocol
+        self.kafka.security_protocol = security_protocol
+
+    def setUp(self):
+        # Start the cluster and ensure that a snapshot is generated
+        self.logger.info("Starting the cluster and running until snapshot 
creation")
+
+        assert quorum.for_test(self.test_context) in quorum.all_kraft, \
+                "Snapshot test should be run Kraft Modes only"
+
+        self.kafka.start()
+
+        topic_count = 10
+        self.topics_created += self.create_n_topics(topic_count)
+
+        if self.kafka.remote_controller_quorum:
+            controller_nodes = self.kafka.remote_controller_quorum.nodes
+        else:
+            controller_nodes = 
self.kafka.nodes[:self.kafka.num_nodes_controller_role]
+
+        # Waiting for snapshot creation and first log segment
+        # cleanup on all controller nodes
+        for node in controller_nodes:
+            self.logger.debug("Waiting for snapshot on: %s" % 
self.kafka.who_am_i(node))
+            self.wait_for_log_segment_delete(node)
+            self.wait_for_snapshot(node)
+        self.logger.debug("Verified Snapshots exist on controller nodes")
+
+    def create_n_topics(self, topic_count):
+        for i in range(self.topics_created, topic_count):
+            topic = "test_topic_%d" % i
+            print("Creating topic %s" % topic, flush=True)
+            topic_cfg = {
+                "topic": topic,
+                "partitions": self.partitions,
+                "replication-factor": self.replication_factor,
+                "configs": {"min.insync.replicas": 2}
+            }
+            self.kafka.create_topic(topic_cfg)
+        self.logger.debug("Created %d more topics" % topic_count)
+        return topic_count
+
+    def wait_for_log_segment_delete(self, node):
+        file_path = self.kafka.METADATA_FIRST_LOG
+        # Wait until the first log segment in metadata log is marked for 
deletion
+        wait_until(lambda: self.file_exists(node, file_path) == False,
+                   timeout_sec=100,
+                   backoff_sec=1, err_msg="Not able to verify cleanup of log 
file %s in a reasonable amount of time" % file_path)
+
+    def wait_for_snapshot(self, node):
+        # Wait for a snapshot file to show up
+        file_path = self.kafka.METADATA_SNAPSHOT_SEARCH_STR
+        wait_until(lambda: self.file_exists(node, file_path) == True,
+                   timeout_sec=100,
+                   backoff_sec=1, err_msg="Not able to verify snapshot 
existence in a reasonable amount of time")
+
+    def choose_controller_node(self):
+        if self.kafka.remote_controller_quorum:
+            # We are running in remote kraft
+            controller_node = 
random.choice(self.kafka.remote_controller_quorum.nodes)
+        else:
+            # This is colocated kraft
+            # Picking the first node as that is guaranteed to be a controller
+            controller_node = self.kafka.nodes[0]
+
+        self.logger.debug("Picking the controller node %s" % 
self.kafka.who_am_i(controller_node))
+        return controller_node
+
+    def file_exists(self, node, file_path):
+        # Check if the first log segment is cleaned up
+        self.logger.debug("Checking if file %s exists"%file_path)
+        cmd = "ls %s" % file_path
+        files = node.account.ssh_output(cmd, allow_fail=True, 
combine_stderr=False)
+
+        if len(files) is 0:
+            self.logger.debug("File %s does not exist"%file_path)
+            return False
+        else:
+            self.logger.debug("File %s was found"%file_path)
+            return True
+
+    def validate_success(self):
+        cluster_id = self.kafka.cluster_id()
+        assert cluster_id is not None
+        assert len(cluster_id) == 22
+        assert self.kafka.all_nodes_support_topic_ids()
+        assert self.kafka.check_protocol_errors(self)

Review comment:
       Same comment as the one on the controller success validation. This is 
not a good verification method at the moment; I just borrowed it from another 
test. For us, I think the fact that the nodes are up and we can produce/consume 
should be a good check. I will change the test to use that as the verification.

##########
File path: tests/kafkatest/tests/core/snapshot_test.py
##########
@@ -0,0 +1,224 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from ducktape.mark import parametrize, matrix
+from ducktape.mark.resource import cluster
+from ducktape.utils.util import wait_until
+
+from kafkatest.services.kafka import quorum
+from kafkatest.services.console_consumer import ConsoleConsumer
+from kafkatest.services.kafka import KafkaService
+from kafkatest.services.kafka import config_property
+from kafkatest.services.verifiable_producer import VerifiableProducer
+from kafkatest.tests.produce_consume_validate import ProduceConsumeValidateTest
+from kafkatest.utils import is_int
+import random
+import time
+
+class TestSnapshots(ProduceConsumeValidateTest):
+
+    METADATA_LOG_RETENTION_BYTES = "2048"

Review comment:
       Fair point. Will remove these definitions.

##########
File path: tests/kafkatest/tests/core/snapshot_test.py
##########
@@ -0,0 +1,227 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from ducktape.mark import parametrize, matrix
+from ducktape.mark.resource import cluster
+from ducktape.utils.util import wait_until
+
+from kafkatest.services.kafka import quorum
+from kafkatest.services.console_consumer import ConsoleConsumer
+from kafkatest.services.kafka import KafkaService
+from kafkatest.services.kafka import config_property
+from kafkatest.services.verifiable_producer import VerifiableProducer
+from kafkatest.tests.produce_consume_validate import ProduceConsumeValidateTest
+from kafkatest.utils import is_int
+import random
+
+class TestSnapshots(ProduceConsumeValidateTest):
+
+    METADATA_LOG_RETENTION_BYTES = "4096"
+    METADATA_LOG_BYTES_BETWEEN_SNAPSHOTS = "4096"
+    METADATA_LOG_SEGMENT_BYTES = "9000000"
+    METADATA_LOG_SEGMENT_MS = "10000"
+
+    def __init__(self, test_context):
+        super(TestSnapshots, self).__init__(test_context=test_context)
+        self.topics_created = 0
+        self.topic = "test_topic"
+        self.partitions = 3
+        self.replication_factor = 3
+        self.num_nodes = 3
+
+        # Producer and consumer
+        self.producer_throughput = 1000
+        self.num_producers = 1
+        self.num_consumers = 1
+
+        security_protocol = 'PLAINTEXT'
+        # Setup Custom Config to ensure snapshot will be generated 
deterministically
+        self.kafka = KafkaService(self.test_context, self.num_nodes, zk=None,
+                                  topics={self.topic: {"partitions": 
self.partitions,
+                                                       "replication-factor": 
self.replication_factor,
+                                                       'configs': 
{"min.insync.replicas": 2}}},
+                                  server_prop_overrides=[
+                                      [config_property.METADATA_LOG_DIR, 
KafkaService.METADATA_LOG_DIR],
+                                      
[config_property.METADATA_LOG_SEGMENT_MS, 
TestSnapshots.METADATA_LOG_SEGMENT_MS],
+                                      
[config_property.METADATA_LOG_RETENTION_BYTES, 
TestSnapshots.METADATA_LOG_RETENTION_BYTES],
+                                      
[config_property.METADATA_LOG_SEGMENT_BYTES, 
TestSnapshots.METADATA_LOG_SEGMENT_BYTES],
+                                      
[config_property.METADATA_LOG_BYTES_BETWEEN_SNAPSHOTS, 
TestSnapshots.METADATA_LOG_BYTES_BETWEEN_SNAPSHOTS]
+                                  ])
+
+        self.kafka.interbroker_security_protocol = security_protocol
+        self.kafka.security_protocol = security_protocol
+
+    def setUp(self):
+        # Start the cluster and ensure that a snapshot is generated
+        self.logger.info("Starting the cluster and running until snapshot 
creation")
+
+        assert quorum.for_test(self.test_context) in quorum.all_kraft, \
+                "Snapshot test should be run Kraft Modes only"
+
+        self.kafka.start()
+
+        topic_count = 10
+        self.topics_created += self.create_n_topics(topic_count)
+
+        if self.kafka.remote_controller_quorum:
+            controller_nodes = self.kafka.remote_controller_quorum.nodes
+        else:
+            controller_nodes = 
self.kafka.nodes[:self.kafka.num_nodes_controller_role]

Review comment:
       We could do that. It would just be a bigger change to expose the 
controller nodes correctly through state transitions/modifications within the 
service (I think). I would rather leave it out of this PR; does that sound OK?

##########
File path: tests/kafkatest/tests/core/snapshot_test.py
##########
@@ -0,0 +1,227 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from ducktape.mark import parametrize, matrix
+from ducktape.mark.resource import cluster
+from ducktape.utils.util import wait_until
+
+from kafkatest.services.kafka import quorum
+from kafkatest.services.console_consumer import ConsoleConsumer
+from kafkatest.services.kafka import KafkaService
+from kafkatest.services.kafka import config_property
+from kafkatest.services.verifiable_producer import VerifiableProducer
+from kafkatest.tests.produce_consume_validate import ProduceConsumeValidateTest
+from kafkatest.utils import is_int
+import random
+
+class TestSnapshots(ProduceConsumeValidateTest):
+
+    METADATA_LOG_RETENTION_BYTES = "4096"
+    METADATA_LOG_BYTES_BETWEEN_SNAPSHOTS = "4096"
+    METADATA_LOG_SEGMENT_BYTES = "9000000"
+    METADATA_LOG_SEGMENT_MS = "10000"
+
+    def __init__(self, test_context):
+        super(TestSnapshots, self).__init__(test_context=test_context)
+        self.topics_created = 0
+        self.topic = "test_topic"
+        self.partitions = 3
+        self.replication_factor = 3
+        self.num_nodes = 3
+
+        # Producer and consumer
+        self.producer_throughput = 1000
+        self.num_producers = 1
+        self.num_consumers = 1
+
+        security_protocol = 'PLAINTEXT'
+        # Setup Custom Config to ensure snapshot will be generated 
deterministically
+        self.kafka = KafkaService(self.test_context, self.num_nodes, zk=None,
+                                  topics={self.topic: {"partitions": 
self.partitions,
+                                                       "replication-factor": 
self.replication_factor,
+                                                       'configs': 
{"min.insync.replicas": 2}}},
+                                  server_prop_overrides=[
+                                      [config_property.METADATA_LOG_DIR, 
KafkaService.METADATA_LOG_DIR],
+                                      
[config_property.METADATA_LOG_SEGMENT_MS, 
TestSnapshots.METADATA_LOG_SEGMENT_MS],
+                                      
[config_property.METADATA_LOG_RETENTION_BYTES, 
TestSnapshots.METADATA_LOG_RETENTION_BYTES],
+                                      
[config_property.METADATA_LOG_SEGMENT_BYTES, 
TestSnapshots.METADATA_LOG_SEGMENT_BYTES],
+                                      
[config_property.METADATA_LOG_BYTES_BETWEEN_SNAPSHOTS, 
TestSnapshots.METADATA_LOG_BYTES_BETWEEN_SNAPSHOTS]
+                                  ])
+
+        self.kafka.interbroker_security_protocol = security_protocol
+        self.kafka.security_protocol = security_protocol
+
+    def setUp(self):
+        # Start the cluster and ensure that a snapshot is generated
+        self.logger.info("Starting the cluster and running until snapshot 
creation")
+
+        assert quorum.for_test(self.test_context) in quorum.all_kraft, \
+                "Snapshot test should be run Kraft Modes only"
+
+        self.kafka.start()
+
+        topic_count = 10
+        self.topics_created += self.create_n_topics(topic_count)
+
+        if self.kafka.remote_controller_quorum:
+            controller_nodes = self.kafka.remote_controller_quorum.nodes
+        else:
+            controller_nodes = 
self.kafka.nodes[:self.kafka.num_nodes_controller_role]
+
+        # Waiting for snapshot creation and first log segment
+        # cleanup on all controller nodes
+        for node in controller_nodes:
+            self.logger.debug("Waiting for snapshot on: %s" % 
self.kafka.who_am_i(node))
+            self.wait_for_log_segment_delete(node)
+            self.wait_for_snapshot(node)
+        self.logger.debug("Verified Snapshots exist on controller nodes")
+
+    def create_n_topics(self, topic_count):
+        for i in range(self.topics_created, topic_count):
+            topic = "test_topic_%d" % i
+            print("Creating topic %s" % topic, flush=True)
+            topic_cfg = {
+                "topic": topic,
+                "partitions": self.partitions,
+                "replication-factor": self.replication_factor,
+                "configs": {"min.insync.replicas": 2}
+            }
+            self.kafka.create_topic(topic_cfg)
+        self.logger.debug("Created %d more topics" % topic_count)
+        return topic_count
+
+    def wait_for_log_segment_delete(self, node):
+        file_path = self.kafka.METADATA_FIRST_LOG
+        # Wait until the first log segment in metadata log is marked for 
deletion
+        wait_until(lambda: self.file_exists(node, file_path) == False,
+                   timeout_sec=100,
+                   backoff_sec=1, err_msg="Not able to verify cleanup of log 
file %s in a reasonable amount of time" % file_path)
+
+    def wait_for_snapshot(self, node):
+        # Wait for a snapshot file to show up
+        file_path = self.kafka.METADATA_SNAPSHOT_SEARCH_STR
+        wait_until(lambda: self.file_exists(node, file_path) == True,
+                   timeout_sec=100,
+                   backoff_sec=1, err_msg="Not able to verify snapshot 
existence in a reasonable amount of time")
+
+    def choose_controller_node(self):
+        if self.kafka.remote_controller_quorum:
+            # We are running in remote kraft
+            controller_node = 
random.choice(self.kafka.remote_controller_quorum.nodes)
+        else:
+            # This is colocated kraft
+            # Picking the first node as that is guaranteed to be a controller
+            controller_node = self.kafka.nodes[0]
+
+        self.logger.debug("Picking the controller node %s" % 
self.kafka.who_am_i(controller_node))
+        return controller_node
+
+    def file_exists(self, node, file_path):
+        # Check if the first log segment is cleaned up
+        self.logger.debug("Checking if file %s exists"%file_path)
+        cmd = "ls %s" % file_path
+        files = node.account.ssh_output(cmd, allow_fail=True, 
combine_stderr=False)
+
+        if len(files) is 0:
+            self.logger.debug("File %s does not exist"%file_path)
+            return False
+        else:
+            self.logger.debug("File %s was found"%file_path)
+            return True
+
+    def validate_success(self):
+        cluster_id = self.kafka.cluster_id()
+        assert cluster_id is not None
+        assert len(cluster_id) == 22
+        assert self.kafka.all_nodes_support_topic_ids()
+        assert self.kafka.check_protocol_errors(self)
+
+
+    @cluster(num_nodes=9)
+    @parametrize(metadata_quorum=quorum.remote_kraft)
+    # TODO: enable after concurrent startup has been merged
+    #@matrix(metadata_quorum=quorum.all_kraft)
+    def test_broker(self, metadata_quorum=quorum.colocated_kraft):
+        # Scenario -- stop a broker node and create metadata changes.
+        # Restart the broker and let it catch up
+        node = random.choice(self.kafka.nodes)
+        self.logger.debug("Scenario 1: kill-create_topics-start on node %s", 
self.kafka.who_am_i(node))
+        self.kafka.stop_node(node)
+        # Now modify the cluster to create more metadata changes
+        self.topics_created += self.create_n_topics(topic_count=10)
+        self.kafka.start_node(node)
+        self.validate_success()
+
+        # Scenario -- Re-init broker after cleaning up all persistent state
+        node = random.choice(self.kafka.nodes)
+        self.logger.debug("Scenario 2: kill-clean-start on node %s", 
self.kafka.who_am_i(node))
+        self.kafka.clean_node(node)
+        self.kafka.start_node(node)
+        self.validate_success()
+
+        # Scenario -- Re-init broker after cleaning up all persistent state
+        # and create a topic where the affected broker must be the leader
+        node = random.choice(self.kafka.nodes)
+        self.logger.debug("Scenario 3: kill-clean-start-verify-produce on node 
%s", self.kafka.who_am_i(node))
+        self.kafka.clean_node(node)
+
+        # Create a topic where the affected broker must be the leader
+        broker_topic = "test_topic_%d" % self.topics_created
+        self.topics_created += 1
+        print("Creating topic %s" % broker_topic, flush=True)
+        topic_cfg = {
+            "topic": broker_topic,
+            "replica-assignment": self.kafka.idx(node),
+            "configs": {"min.insync.replicas": 1}
+        }
+        self.kafka.create_topic(topic_cfg)
+
+        self.kafka.start_node(node)
+
+        # Produce to the newly created topic to ensure broker has caught up
+        self.producer = VerifiableProducer(self.test_context, 
self.num_producers, self.kafka,
+                                           broker_topic, 
throughput=self.producer_throughput,
+                                           message_validator=is_int)
+
+        self.consumer = ConsoleConsumer(self.test_context, self.num_consumers, 
self.kafka,
+                                        broker_topic, 
consumer_timeout_ms=30000,
+                                        message_validator=is_int)
+        self.start_producer_and_consumer()
+        self.stop_producer_and_consumer()
+        self.validate()
+
+        self.validate_success()
+
+    @cluster(num_nodes=8)
+    @parametrize(metadata_quorum=quorum.remote_kraft)
+    # TODO: enable after concurrent startup has been merged
+    #@matrix(metadata_quorum=quorum.all_kraft)
+    def test_controller(self, metadata_quorum=quorum.colocated_kraft):
+        # Scenario -- stop a controller node and create metadata changes.
+        # Restart the controller and let it catch up
+        node = self.choose_controller_node()
+        self.logger.debug("Scenario 1: kill-create_topics-start on node %s", 
self.kafka.who_am_i(node))
+        self.kafka.controller_quorum.stop_node(node)
+        # Now modify the cluster to create more metadata changes
+        self.topics_created += self.create_n_topics(topic_count=10)
+        self.kafka.controller_quorum.start_node(node)
+        self.validate_success()
+
+        # Scenario -- Re-init controller with a clean kafka dir
+        node = self.choose_controller_node()
+        self.logger.debug("Scenario 2: kill-clean-start on node %s", 
self.kafka.who_am_i(node))
+        self.kafka.controller_quorum.clean_node(node)
+        self.kafka.controller_quorum.start_node(node)
+        self.validate_success()

Review comment:
       Updated to bounce all controllers in the latest revision.

##########
File path: tests/kafkatest/tests/core/snapshot_test.py
##########
@@ -0,0 +1,227 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from ducktape.mark import parametrize, matrix
+from ducktape.mark.resource import cluster
+from ducktape.utils.util import wait_until
+
+from kafkatest.services.kafka import quorum
+from kafkatest.services.console_consumer import ConsoleConsumer
+from kafkatest.services.kafka import KafkaService
+from kafkatest.services.kafka import config_property
+from kafkatest.services.verifiable_producer import VerifiableProducer
+from kafkatest.tests.produce_consume_validate import ProduceConsumeValidateTest
+from kafkatest.utils import is_int
+import random
+
+class TestSnapshots(ProduceConsumeValidateTest):
+
+    METADATA_LOG_RETENTION_BYTES = "4096"
+    METADATA_LOG_BYTES_BETWEEN_SNAPSHOTS = "4096"
+    METADATA_LOG_SEGMENT_BYTES = "9000000"
+    METADATA_LOG_SEGMENT_MS = "10000"
+
+    def __init__(self, test_context):
+        super(TestSnapshots, self).__init__(test_context=test_context)
+        self.topics_created = 0
+        self.topic = "test_topic"
+        self.partitions = 3
+        self.replication_factor = 3
+        self.num_nodes = 3
+
+        # Producer and consumer
+        self.producer_throughput = 1000
+        self.num_producers = 1
+        self.num_consumers = 1
+
+        security_protocol = 'PLAINTEXT'
+        # Setup Custom Config to ensure snapshot will be generated 
deterministically
+        self.kafka = KafkaService(self.test_context, self.num_nodes, zk=None,
+                                  topics={self.topic: {"partitions": 
self.partitions,
+                                                       "replication-factor": 
self.replication_factor,
+                                                       'configs': 
{"min.insync.replicas": 2}}},
+                                  server_prop_overrides=[
+                                      [config_property.METADATA_LOG_DIR, 
KafkaService.METADATA_LOG_DIR],
+                                      
[config_property.METADATA_LOG_SEGMENT_MS, 
TestSnapshots.METADATA_LOG_SEGMENT_MS],
+                                      
[config_property.METADATA_LOG_RETENTION_BYTES, 
TestSnapshots.METADATA_LOG_RETENTION_BYTES],
+                                      
[config_property.METADATA_LOG_SEGMENT_BYTES, 
TestSnapshots.METADATA_LOG_SEGMENT_BYTES],
+                                      
[config_property.METADATA_LOG_BYTES_BETWEEN_SNAPSHOTS, 
TestSnapshots.METADATA_LOG_BYTES_BETWEEN_SNAPSHOTS]
+                                  ])
+
+        self.kafka.interbroker_security_protocol = security_protocol
+        self.kafka.security_protocol = security_protocol
+
+    def setUp(self):
+        # Start the cluster and ensure that a snapshot is generated
+        self.logger.info("Starting the cluster and running until snapshot 
creation")
+
+        assert quorum.for_test(self.test_context) in quorum.all_kraft, \
+                "Snapshot test should be run Kraft Modes only"
+
+        self.kafka.start()
+
+        topic_count = 10
+        self.topics_created += self.create_n_topics(topic_count)
+
+        if self.kafka.remote_controller_quorum:
+            controller_nodes = self.kafka.remote_controller_quorum.nodes
+        else:
+            controller_nodes = 
self.kafka.nodes[:self.kafka.num_nodes_controller_role]
+
+        # Waiting for snapshot creation and first log segment
+        # cleanup on all controller nodes
+        for node in controller_nodes:
+            self.logger.debug("Waiting for snapshot on: %s" % 
self.kafka.who_am_i(node))
+            self.wait_for_log_segment_delete(node)
+            self.wait_for_snapshot(node)
+        self.logger.debug("Verified Snapshots exist on controller nodes")
+
+    def create_n_topics(self, topic_count):
+        for i in range(self.topics_created, topic_count):
+            topic = "test_topic_%d" % i
+            print("Creating topic %s" % topic, flush=True)
+            topic_cfg = {
+                "topic": topic,
+                "partitions": self.partitions,
+                "replication-factor": self.replication_factor,
+                "configs": {"min.insync.replicas": 2}
+            }
+            self.kafka.create_topic(topic_cfg)
+        self.logger.debug("Created %d more topics" % topic_count)
+        return topic_count
+
+    def wait_for_log_segment_delete(self, node):
+        file_path = self.kafka.METADATA_FIRST_LOG
+        # Wait until the first log segment in metadata log is marked for 
deletion
+        wait_until(lambda: self.file_exists(node, file_path) == False,
+                   timeout_sec=100,
+                   backoff_sec=1, err_msg="Not able to verify cleanup of log 
file %s in a reasonable amount of time" % file_path)
+
+    def wait_for_snapshot(self, node):
+        # Wait for a snapshot file to show up
+        file_path = self.kafka.METADATA_SNAPSHOT_SEARCH_STR
+        wait_until(lambda: self.file_exists(node, file_path) == True,
+                   timeout_sec=100,
+                   backoff_sec=1, err_msg="Not able to verify snapshot 
existence in a reasonable amount of time")
+
+    def choose_controller_node(self):
+        if self.kafka.remote_controller_quorum:
+            # We are running in remote kraft
+            controller_node = 
random.choice(self.kafka.remote_controller_quorum.nodes)
+        else:
+            # This is colocated kraft
+            # Picking the first node as that is guaranteed to be a controller
+            controller_node = self.kafka.nodes[0]
+
+        self.logger.debug("Picking the controller node %s" % 
self.kafka.who_am_i(controller_node))
+        return controller_node
+
+    def file_exists(self, node, file_path):
+        # Check if the first log segment is cleaned up
+        self.logger.debug("Checking if file %s exists"%file_path)
+        cmd = "ls %s" % file_path
+        files = node.account.ssh_output(cmd, allow_fail=True, 
combine_stderr=False)
+
+        if len(files) is 0:
+            self.logger.debug("File %s does not exist"%file_path)
+            return False
+        else:
+            self.logger.debug("File %s was found"%file_path)
+            return True
+
+    def validate_success(self):
+        cluster_id = self.kafka.cluster_id()
+        assert cluster_id is not None
+        assert len(cluster_id) == 22
+        assert self.kafka.all_nodes_support_topic_ids()
+        assert self.kafka.check_protocol_errors(self)
+
+
+    @cluster(num_nodes=9)
+    @parametrize(metadata_quorum=quorum.remote_kraft)
+    # TODO: enable after concurrent startup has been merged

Review comment:
       I wanted to discuss this with you. Let me ping you offline about this. 
Basically, we are not testing colocated KRaft at this point.

##########
File path: tests/kafkatest/tests/core/snapshot_test.py
##########
@@ -0,0 +1,227 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from ducktape.mark import parametrize, matrix
+from ducktape.mark.resource import cluster
+from ducktape.utils.util import wait_until
+
+from kafkatest.services.kafka import quorum
+from kafkatest.services.console_consumer import ConsoleConsumer
+from kafkatest.services.kafka import KafkaService
+from kafkatest.services.kafka import config_property
+from kafkatest.services.verifiable_producer import VerifiableProducer
+from kafkatest.tests.produce_consume_validate import ProduceConsumeValidateTest
+from kafkatest.utils import is_int
+import random
+
+class TestSnapshots(ProduceConsumeValidateTest):
+
+    METADATA_LOG_RETENTION_BYTES = "4096"
+    METADATA_LOG_BYTES_BETWEEN_SNAPSHOTS = "4096"
+    METADATA_LOG_SEGMENT_BYTES = "9000000"
+    METADATA_LOG_SEGMENT_MS = "10000"
+
+    def __init__(self, test_context):
+        super(TestSnapshots, self).__init__(test_context=test_context)
+        self.topics_created = 0
+        self.topic = "test_topic"
+        self.partitions = 3
+        self.replication_factor = 3
+        self.num_nodes = 3
+
+        # Producer and consumer
+        self.producer_throughput = 1000
+        self.num_producers = 1
+        self.num_consumers = 1
+
+        security_protocol = 'PLAINTEXT'
+        # Setup Custom Config to ensure snapshot will be generated 
deterministically
+        self.kafka = KafkaService(self.test_context, self.num_nodes, zk=None,
+                                  topics={self.topic: {"partitions": 
self.partitions,
+                                                       "replication-factor": 
self.replication_factor,
+                                                       'configs': 
{"min.insync.replicas": 2}}},
+                                  server_prop_overrides=[
+                                      [config_property.METADATA_LOG_DIR, 
KafkaService.METADATA_LOG_DIR],
+                                      
[config_property.METADATA_LOG_SEGMENT_MS, 
TestSnapshots.METADATA_LOG_SEGMENT_MS],
+                                      
[config_property.METADATA_LOG_RETENTION_BYTES, 
TestSnapshots.METADATA_LOG_RETENTION_BYTES],
+                                      
[config_property.METADATA_LOG_SEGMENT_BYTES, 
TestSnapshots.METADATA_LOG_SEGMENT_BYTES],
+                                      
[config_property.METADATA_LOG_BYTES_BETWEEN_SNAPSHOTS, 
TestSnapshots.METADATA_LOG_BYTES_BETWEEN_SNAPSHOTS]
+                                  ])
+
+        self.kafka.interbroker_security_protocol = security_protocol
+        self.kafka.security_protocol = security_protocol
+
+    def setUp(self):
+        # Start the cluster and ensure that a snapshot is generated
+        self.logger.info("Starting the cluster and running until snapshot 
creation")
+
+        assert quorum.for_test(self.test_context) in quorum.all_kraft, \
+                "Snapshot test should be run Kraft Modes only"
+
+        self.kafka.start()
+
+        topic_count = 10
+        self.topics_created += self.create_n_topics(topic_count)
+
+        if self.kafka.remote_controller_quorum:
+            controller_nodes = self.kafka.remote_controller_quorum.nodes
+        else:
+            controller_nodes = 
self.kafka.nodes[:self.kafka.num_nodes_controller_role]
+
+        # Waiting for snapshot creation and first log segment
+        # cleanup on all controller nodes
+        for node in controller_nodes:
+            self.logger.debug("Waiting for snapshot on: %s" % 
self.kafka.who_am_i(node))
+            self.wait_for_log_segment_delete(node)
+            self.wait_for_snapshot(node)
+        self.logger.debug("Verified Snapshots exist on controller nodes")
+
+    def create_n_topics(self, topic_count):
+        for i in range(self.topics_created, topic_count):
+            topic = "test_topic_%d" % i
+            print("Creating topic %s" % topic, flush=True)
+            topic_cfg = {
+                "topic": topic,
+                "partitions": self.partitions,
+                "replication-factor": self.replication_factor,
+                "configs": {"min.insync.replicas": 2}
+            }
+            self.kafka.create_topic(topic_cfg)
+        self.logger.debug("Created %d more topics" % topic_count)
+        return topic_count
+
+    def wait_for_log_segment_delete(self, node):
+        file_path = self.kafka.METADATA_FIRST_LOG
+        # Wait until the first log segment in metadata log is marked for 
deletion
+        wait_until(lambda: self.file_exists(node, file_path) == False,
+                   timeout_sec=100,
+                   backoff_sec=1, err_msg="Not able to verify cleanup of log 
file %s in a reasonable amount of time" % file_path)
+
+    def wait_for_snapshot(self, node):
+        # Wait for a snapshot file to show up
+        file_path = self.kafka.METADATA_SNAPSHOT_SEARCH_STR
+        wait_until(lambda: self.file_exists(node, file_path) == True,
+                   timeout_sec=100,
+                   backoff_sec=1, err_msg="Not able to verify snapshot 
existence in a reasonable amount of time")
+
+    def choose_controller_node(self):
+        if self.kafka.remote_controller_quorum:
+            # We are running in remote kraft
+            controller_node = 
random.choice(self.kafka.remote_controller_quorum.nodes)
+        else:
+            # This is colocated kraft
+            # Picking the first node as that is guaranteed to be a controller
+            controller_node = self.kafka.nodes[0]

Review comment:
       Yes. We can make it a random selection as well. The reason I left it 
this way was for simplicity (colocated mode has both controller and 
non-controller nodes in the same list).

##########
File path: tests/kafkatest/tests/core/snapshot_test.py
##########
@@ -0,0 +1,227 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from ducktape.mark import parametrize, matrix
+from ducktape.mark.resource import cluster
+from ducktape.utils.util import wait_until
+
+from kafkatest.services.kafka import quorum
+from kafkatest.services.console_consumer import ConsoleConsumer
+from kafkatest.services.kafka import KafkaService
+from kafkatest.services.kafka import config_property
+from kafkatest.services.verifiable_producer import VerifiableProducer
+from kafkatest.tests.produce_consume_validate import ProduceConsumeValidateTest
+from kafkatest.utils import is_int
+import random
+
+class TestSnapshots(ProduceConsumeValidateTest):
+
+    METADATA_LOG_RETENTION_BYTES = "4096"
+    METADATA_LOG_BYTES_BETWEEN_SNAPSHOTS = "4096"
+    METADATA_LOG_SEGMENT_BYTES = "9000000"
+    METADATA_LOG_SEGMENT_MS = "10000"
+
+    def __init__(self, test_context):
+        super(TestSnapshots, self).__init__(test_context=test_context)
+        self.topics_created = 0
+        self.topic = "test_topic"
+        self.partitions = 3
+        self.replication_factor = 3
+        self.num_nodes = 3
+
+        # Producer and consumer
+        self.producer_throughput = 1000
+        self.num_producers = 1
+        self.num_consumers = 1
+
+        security_protocol = 'PLAINTEXT'
+        # Setup Custom Config to ensure snapshot will be generated 
deterministically
+        self.kafka = KafkaService(self.test_context, self.num_nodes, zk=None,
+                                  topics={self.topic: {"partitions": 
self.partitions,
+                                                       "replication-factor": 
self.replication_factor,
+                                                       'configs': 
{"min.insync.replicas": 2}}},
+                                  server_prop_overrides=[
+                                      [config_property.METADATA_LOG_DIR, 
KafkaService.METADATA_LOG_DIR],
+                                      
[config_property.METADATA_LOG_SEGMENT_MS, 
TestSnapshots.METADATA_LOG_SEGMENT_MS],
+                                      
[config_property.METADATA_LOG_RETENTION_BYTES, 
TestSnapshots.METADATA_LOG_RETENTION_BYTES],
+                                      
[config_property.METADATA_LOG_SEGMENT_BYTES, 
TestSnapshots.METADATA_LOG_SEGMENT_BYTES],
+                                      
[config_property.METADATA_LOG_BYTES_BETWEEN_SNAPSHOTS, 
TestSnapshots.METADATA_LOG_BYTES_BETWEEN_SNAPSHOTS]
+                                  ])
+
+        self.kafka.interbroker_security_protocol = security_protocol
+        self.kafka.security_protocol = security_protocol
+
+    def setUp(self):
+        # Start the cluster and ensure that a snapshot is generated
+        self.logger.info("Starting the cluster and running until snapshot 
creation")
+
+        assert quorum.for_test(self.test_context) in quorum.all_kraft, \
+                "Snapshot test should be run Kraft Modes only"
+
+        self.kafka.start()
+
+        topic_count = 10
+        self.topics_created += self.create_n_topics(topic_count)
+
+        if self.kafka.remote_controller_quorum:
+            controller_nodes = self.kafka.remote_controller_quorum.nodes
+        else:
+            controller_nodes = 
self.kafka.nodes[:self.kafka.num_nodes_controller_role]
+
+        # Waiting for snapshot creation and first log segment
+        # cleanup on all controller nodes
+        for node in controller_nodes:
+            self.logger.debug("Waiting for snapshot on: %s" % 
self.kafka.who_am_i(node))
+            self.wait_for_log_segment_delete(node)
+            self.wait_for_snapshot(node)
+        self.logger.debug("Verified Snapshots exist on controller nodes")
+
+    def create_n_topics(self, topic_count):
+        for i in range(self.topics_created, topic_count):
+            topic = "test_topic_%d" % i
+            print("Creating topic %s" % topic, flush=True)
+            topic_cfg = {
+                "topic": topic,
+                "partitions": self.partitions,
+                "replication-factor": self.replication_factor,
+                "configs": {"min.insync.replicas": 2}
+            }
+            self.kafka.create_topic(topic_cfg)
+        self.logger.debug("Created %d more topics" % topic_count)
+        return topic_count
+
+    def wait_for_log_segment_delete(self, node):
+        file_path = self.kafka.METADATA_FIRST_LOG
+        # Wait until the first log segment in metadata log is marked for 
deletion
+        wait_until(lambda: self.file_exists(node, file_path) == False,
+                   timeout_sec=100,
+                   backoff_sec=1, err_msg="Not able to verify cleanup of log 
file %s in a reasonable amount of time" % file_path)
+
+    def wait_for_snapshot(self, node):
+        # Wait for a snapshot file to show up
+        file_path = self.kafka.METADATA_SNAPSHOT_SEARCH_STR
+        wait_until(lambda: self.file_exists(node, file_path) == True,
+                   timeout_sec=100,
+                   backoff_sec=1, err_msg="Not able to verify snapshot 
existence in a reasonable amount of time")
+
+    def choose_controller_node(self):
+        if self.kafka.remote_controller_quorum:
+            # We are running in remote kraft
+            controller_node = 
random.choice(self.kafka.remote_controller_quorum.nodes)
+        else:
+            # This is colocated kraft
+            # Picking the first node as that is guaranteed to be a controller
+            controller_node = self.kafka.nodes[0]
+
+        self.logger.debug("Picking the controller node %s" % 
self.kafka.who_am_i(controller_node))
+        return controller_node
+
+    def file_exists(self, node, file_path):
+        # Check if the first log segment is cleaned up
+        self.logger.debug("Checking if file %s exists"%file_path)
+        cmd = "ls %s" % file_path
+        files = node.account.ssh_output(cmd, allow_fail=True, 
combine_stderr=False)
+
+        if len(files) is 0:
+            self.logger.debug("File %s does not exist"%file_path)
+            return False
+        else:
+            self.logger.debug("File %s was found"%file_path)
+            return True
+
+    def validate_success(self):
+        cluster_id = self.kafka.cluster_id()
+        assert cluster_id is not None
+        assert len(cluster_id) == 22
+        assert self.kafka.all_nodes_support_topic_ids()
+        assert self.kafka.check_protocol_errors(self)
+
+
+    @cluster(num_nodes=9)
+    @parametrize(metadata_quorum=quorum.remote_kraft)
+    # TODO: enable after concurrent startup has been merged
+    #@matrix(metadata_quorum=quorum.all_kraft)
+    def test_broker(self, metadata_quorum=quorum.colocated_kraft):
+        # Scenario -- stop a broker node and create metadata changes.
+        # Restart the broker and let it catch up
+        node = random.choice(self.kafka.nodes)
+        self.logger.debug("Scenario 1: kill-create_topics-start on node %s", 
self.kafka.who_am_i(node))
+        self.kafka.stop_node(node)
+        # Now modify the cluster to create more metadata changes
+        self.topics_created += self.create_n_topics(topic_count=10)
+        self.kafka.start_node(node)
+        self.validate_success()
+
+        # Scenario -- Re-init broker after cleaning up all persistent state
+        node = random.choice(self.kafka.nodes)
+        self.logger.debug("Scenario 2: kill-clean-start on node %s", 
self.kafka.who_am_i(node))
+        self.kafka.clean_node(node)
+        self.kafka.start_node(node)
+        self.validate_success()
+
+        # Scenario -- Re-init broker after cleaning up all persistent state
+        # and create a topic where the affected broker must be the leader
+        node = random.choice(self.kafka.nodes)
+        self.logger.debug("Scenario 3: kill-clean-start-verify-produce on node 
%s", self.kafka.who_am_i(node))
+        self.kafka.clean_node(node)
+
+        # Create a topic where the affected broker must be the leader
+        broker_topic = "test_topic_%d" % self.topics_created
+        self.topics_created += 1
+        print("Creating topic %s" % broker_topic, flush=True)
+        topic_cfg = {
+            "topic": broker_topic,
+            "replica-assignment": self.kafka.idx(node),
+            "configs": {"min.insync.replicas": 1}
+        }
+        self.kafka.create_topic(topic_cfg)
+
+        self.kafka.start_node(node)
+
+        # Produce to the newly created topic to ensure broker has caught up
+        self.producer = VerifiableProducer(self.test_context, 
self.num_producers, self.kafka,
+                                           broker_topic, 
throughput=self.producer_throughput,
+                                           message_validator=is_int)
+
+        self.consumer = ConsoleConsumer(self.test_context, self.num_consumers, 
self.kafka,
+                                        broker_topic, 
consumer_timeout_ms=30000,
+                                        message_validator=is_int)
+        self.start_producer_and_consumer()
+        self.stop_producer_and_consumer()
+        self.validate()
+
+        self.validate_success()
+
+    @cluster(num_nodes=8)
+    @parametrize(metadata_quorum=quorum.remote_kraft)
+    # TODO: enable after concurrent startup has been merged
+    #@matrix(metadata_quorum=quorum.all_kraft)
+    def test_controller(self, metadata_quorum=quorum.colocated_kraft):
+        # Scenario -- stop a controller node and create metadata changes.
+        # Restart the controller and let it catch up
+        node = self.choose_controller_node()
+        self.logger.debug("Scenario 1: kill-create_topics-start on node %s", 
self.kafka.who_am_i(node))
+        self.kafka.controller_quorum.stop_node(node)
+        # Now modify the cluster to create more metadata changes
+        self.topics_created += self.create_n_topics(topic_count=10)
+        self.kafka.controller_quorum.start_node(node)
+        self.validate_success()
+
+        # Scenario -- Re-init controller with a clean kafka dir
+        node = self.choose_controller_node()
+        self.logger.debug("Scenario 2: kill-clean-start on node %s", 
self.kafka.who_am_i(node))
+        self.kafka.controller_quorum.clean_node(node)
+        self.kafka.controller_quorum.start_node(node)
+        self.validate_success()

Review comment:
       I agree that this is not a good check. I was trying to look at the 
verification that other tests perform, but it seems that topic creation and 
production/consumption is probably the best verification. I will do that here as well.

##########
File path: tests/kafkatest/tests/core/snapshot_test.py
##########
@@ -0,0 +1,227 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from ducktape.mark import parametrize, matrix
+from ducktape.mark.resource import cluster
+from ducktape.utils.util import wait_until
+
+from kafkatest.services.kafka import quorum
+from kafkatest.services.console_consumer import ConsoleConsumer
+from kafkatest.services.kafka import KafkaService
+from kafkatest.services.kafka import config_property
+from kafkatest.services.verifiable_producer import VerifiableProducer
+from kafkatest.tests.produce_consume_validate import ProduceConsumeValidateTest
+from kafkatest.utils import is_int
+import random
+
+class TestSnapshots(ProduceConsumeValidateTest):
+
+    METADATA_LOG_RETENTION_BYTES = "4096"
+    METADATA_LOG_BYTES_BETWEEN_SNAPSHOTS = "4096"
+    METADATA_LOG_SEGMENT_BYTES = "9000000"
+    METADATA_LOG_SEGMENT_MS = "10000"
+
+    def __init__(self, test_context):
+        super(TestSnapshots, self).__init__(test_context=test_context)
+        self.topics_created = 0
+        self.topic = "test_topic"
+        self.partitions = 3
+        self.replication_factor = 3
+        self.num_nodes = 3
+
+        # Producer and consumer
+        self.producer_throughput = 1000
+        self.num_producers = 1
+        self.num_consumers = 1
+
+        security_protocol = 'PLAINTEXT'
+        # Setup Custom Config to ensure snapshot will be generated 
deterministically
+        self.kafka = KafkaService(self.test_context, self.num_nodes, zk=None,
+                                  topics={self.topic: {"partitions": 
self.partitions,
+                                                       "replication-factor": 
self.replication_factor,
+                                                       'configs': 
{"min.insync.replicas": 2}}},
+                                  server_prop_overrides=[
+                                      [config_property.METADATA_LOG_DIR, 
KafkaService.METADATA_LOG_DIR],
+                                      
[config_property.METADATA_LOG_SEGMENT_MS, 
TestSnapshots.METADATA_LOG_SEGMENT_MS],
+                                      
[config_property.METADATA_LOG_RETENTION_BYTES, 
TestSnapshots.METADATA_LOG_RETENTION_BYTES],
+                                      
[config_property.METADATA_LOG_SEGMENT_BYTES, 
TestSnapshots.METADATA_LOG_SEGMENT_BYTES],
+                                      
[config_property.METADATA_LOG_BYTES_BETWEEN_SNAPSHOTS, 
TestSnapshots.METADATA_LOG_BYTES_BETWEEN_SNAPSHOTS]
+                                  ])
+
+        self.kafka.interbroker_security_protocol = security_protocol
+        self.kafka.security_protocol = security_protocol
+
+    def setUp(self):
+        # Start the cluster and ensure that a snapshot is generated
+        self.logger.info("Starting the cluster and running until snapshot 
creation")
+
+        assert quorum.for_test(self.test_context) in quorum.all_kraft, \
+                "Snapshot test should be run Kraft Modes only"
+
+        self.kafka.start()
+
+        topic_count = 10
+        self.topics_created += self.create_n_topics(topic_count)
+
+        if self.kafka.remote_controller_quorum:
+            controller_nodes = self.kafka.remote_controller_quorum.nodes
+        else:
+            controller_nodes = 
self.kafka.nodes[:self.kafka.num_nodes_controller_role]
+
+        # Waiting for snapshot creation and first log segment
+        # cleanup on all controller nodes
+        for node in controller_nodes:
+            self.logger.debug("Waiting for snapshot on: %s" % 
self.kafka.who_am_i(node))
+            self.wait_for_log_segment_delete(node)
+            self.wait_for_snapshot(node)
+        self.logger.debug("Verified Snapshots exist on controller nodes")
+
+    def create_n_topics(self, topic_count):
+        for i in range(self.topics_created, topic_count):
+            topic = "test_topic_%d" % i
+            print("Creating topic %s" % topic, flush=True)
+            topic_cfg = {
+                "topic": topic,
+                "partitions": self.partitions,
+                "replication-factor": self.replication_factor,
+                "configs": {"min.insync.replicas": 2}
+            }
+            self.kafka.create_topic(topic_cfg)
+        self.logger.debug("Created %d more topics" % topic_count)
+        return topic_count
+
+    def wait_for_log_segment_delete(self, node):
+        file_path = self.kafka.METADATA_FIRST_LOG
+        # Wait until the first log segment in metadata log is marked for 
deletion
+        wait_until(lambda: self.file_exists(node, file_path) == False,
+                   timeout_sec=100,
+                   backoff_sec=1, err_msg="Not able to verify cleanup of log 
file %s in a reasonable amount of time" % file_path)
+
+    def wait_for_snapshot(self, node):
+        # Wait for a snapshot file to show up
+        file_path = self.kafka.METADATA_SNAPSHOT_SEARCH_STR
+        wait_until(lambda: self.file_exists(node, file_path) == True,
+                   timeout_sec=100,
+                   backoff_sec=1, err_msg="Not able to verify snapshot 
existence in a reasonable amount of time")
+
+    def choose_controller_node(self):
+        if self.kafka.remote_controller_quorum:
+            # We are running in remote kraft
+            controller_node = 
random.choice(self.kafka.remote_controller_quorum.nodes)
+        else:
+            # This is colocated kraft
+            # Picking the first node as that is guaranteed to be a controller
+            controller_node = self.kafka.nodes[0]
+
+        self.logger.debug("Picking the controller node %s" % 
self.kafka.who_am_i(controller_node))
+        return controller_node
+
+    def file_exists(self, node, file_path):
+        # Check if the first log segment is cleaned up
+        self.logger.debug("Checking if file %s exists"%file_path)
+        cmd = "ls %s" % file_path
+        files = node.account.ssh_output(cmd, allow_fail=True, 
combine_stderr=False)
+
+        if len(files) is 0:
+            self.logger.debug("File %s does not exist"%file_path)
+            return False
+        else:
+            self.logger.debug("File %s was found"%file_path)
+            return True
+
+    def validate_success(self):
+        cluster_id = self.kafka.cluster_id()
+        assert cluster_id is not None
+        assert len(cluster_id) == 22
+        assert self.kafka.all_nodes_support_topic_ids()
+        assert self.kafka.check_protocol_errors(self)
+
+
+    @cluster(num_nodes=9)
+    @parametrize(metadata_quorum=quorum.remote_kraft)
+    # TODO: enable after concurrent startup has been merged
+    #@matrix(metadata_quorum=quorum.all_kraft)
+    def test_broker(self, metadata_quorum=quorum.colocated_kraft):
+        # Scenario -- stop a broker node and create metadata changes.
+        # Restart the broker and let it catch up
+        node = random.choice(self.kafka.nodes)
+        self.logger.debug("Scenario 1: kill-create_topics-start on node %s", 
self.kafka.who_am_i(node))
+        self.kafka.stop_node(node)
+        # Now modify the cluster to create more metadata changes
+        self.topics_created += self.create_n_topics(topic_count=10)
+        self.kafka.start_node(node)
+        self.validate_success()
+
+        # Scenario -- Re-init broker after cleaning up all persistent state
+        node = random.choice(self.kafka.nodes)
+        self.logger.debug("Scenario 2: kill-clean-start on node %s", 
self.kafka.who_am_i(node))
+        self.kafka.clean_node(node)
+        self.kafka.start_node(node)
+        self.validate_success()
+
+        # Scenario -- Re-init broker after cleaning up all persistent state
+        # and create a topic where the affected broker must be the leader
+        node = random.choice(self.kafka.nodes)
+        self.logger.debug("Scenario 3: kill-clean-start-verify-produce on node 
%s", self.kafka.who_am_i(node))
+        self.kafka.clean_node(node)
+
+        # Create a topic where the affected broker must be the leader
+        broker_topic = "test_topic_%d" % self.topics_created
+        self.topics_created += 1
+        print("Creating topic %s" % broker_topic, flush=True)
+        topic_cfg = {
+            "topic": broker_topic,
+            "replica-assignment": self.kafka.idx(node),
+            "configs": {"min.insync.replicas": 1}
+        }
+        self.kafka.create_topic(topic_cfg)
+
+        self.kafka.start_node(node)
+
+        # Produce to the newly created topic to ensure broker has caught up
+        self.producer = VerifiableProducer(self.test_context, 
self.num_producers, self.kafka,
+                                           broker_topic, 
throughput=self.producer_throughput,
+                                           message_validator=is_int)
+
+        self.consumer = ConsoleConsumer(self.test_context, self.num_consumers, 
self.kafka,
+                                        broker_topic, 
consumer_timeout_ms=30000,
+                                        message_validator=is_int)
+        self.start_producer_and_consumer()
+        self.stop_producer_and_consumer()
+        self.validate()
+
+        self.validate_success()
+
+    @cluster(num_nodes=8)
+    @parametrize(metadata_quorum=quorum.remote_kraft)
+    # TODO: enable after concurrent startup has been merged
+    #@matrix(metadata_quorum=quorum.all_kraft)
+    def test_controller(self, metadata_quorum=quorum.colocated_kraft):
+        # Scenario -- stop a controller node and create metadata changes.
+        # Restart the controller and let it catch up
+        node = self.choose_controller_node()
+        self.logger.debug("Scenario 1: kill-create_topics-start on node %s", 
self.kafka.who_am_i(node))
+        self.kafka.controller_quorum.stop_node(node)
+        # Now modify the cluster to create more metadata changes
+        self.topics_created += self.create_n_topics(topic_count=10)
+        self.kafka.controller_quorum.start_node(node)
+        self.validate_success()
+
+        # Scenario -- Re-init controller with a clean kafka dir
+        node = self.choose_controller_node()
+        self.logger.debug("Scenario 2: kill-clean-start on node %s", 
self.kafka.who_am_i(node))
+        self.kafka.controller_quorum.clean_node(node)
+        self.kafka.controller_quorum.start_node(node)
+        self.validate_success()

Review comment:
       Agree with you, @jsancio. The latest commit for the PR should already be 
doing this.

##########
File path: tests/kafkatest/tests/core/snapshot_test.py
##########
@@ -0,0 +1,227 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from ducktape.mark import parametrize, matrix
+from ducktape.mark.resource import cluster
+from ducktape.utils.util import wait_until
+
+from kafkatest.services.kafka import quorum
+from kafkatest.services.console_consumer import ConsoleConsumer
+from kafkatest.services.kafka import KafkaService
+from kafkatest.services.kafka import config_property
+from kafkatest.services.verifiable_producer import VerifiableProducer
+from kafkatest.tests.produce_consume_validate import ProduceConsumeValidateTest
+from kafkatest.utils import is_int
+import random
+
+class TestSnapshots(ProduceConsumeValidateTest):
+
+    METADATA_LOG_RETENTION_BYTES = "4096"
+    METADATA_LOG_BYTES_BETWEEN_SNAPSHOTS = "4096"
+    METADATA_LOG_SEGMENT_BYTES = "9000000"
+    METADATA_LOG_SEGMENT_MS = "10000"
+
+    def __init__(self, test_context):
+        super(TestSnapshots, self).__init__(test_context=test_context)
+        self.topics_created = 0
+        self.topic = "test_topic"
+        self.partitions = 3
+        self.replication_factor = 3
+        self.num_nodes = 3
+
+        # Producer and consumer
+        self.producer_throughput = 1000
+        self.num_producers = 1
+        self.num_consumers = 1
+
+        security_protocol = 'PLAINTEXT'
+        # Setup Custom Config to ensure snapshot will be generated 
deterministically
+        self.kafka = KafkaService(self.test_context, self.num_nodes, zk=None,
+                                  topics={self.topic: {"partitions": 
self.partitions,
+                                                       "replication-factor": 
self.replication_factor,
+                                                       'configs': 
{"min.insync.replicas": 2}}},
+                                  server_prop_overrides=[
+                                      [config_property.METADATA_LOG_DIR, 
KafkaService.METADATA_LOG_DIR],
+                                      
[config_property.METADATA_LOG_SEGMENT_MS, 
TestSnapshots.METADATA_LOG_SEGMENT_MS],
+                                      
[config_property.METADATA_LOG_RETENTION_BYTES, 
TestSnapshots.METADATA_LOG_RETENTION_BYTES],
+                                      
[config_property.METADATA_LOG_SEGMENT_BYTES, 
TestSnapshots.METADATA_LOG_SEGMENT_BYTES],
+                                      
[config_property.METADATA_LOG_BYTES_BETWEEN_SNAPSHOTS, 
TestSnapshots.METADATA_LOG_BYTES_BETWEEN_SNAPSHOTS]
+                                  ])
+
+        self.kafka.interbroker_security_protocol = security_protocol
+        self.kafka.security_protocol = security_protocol
+
+    def setUp(self):
+        # Start the cluster and ensure that a snapshot is generated
+        self.logger.info("Starting the cluster and running until snapshot 
creation")
+
+        assert quorum.for_test(self.test_context) in quorum.all_kraft, \
+                "Snapshot test should be run Kraft Modes only"
+
+        self.kafka.start()
+
+        topic_count = 10
+        self.topics_created += self.create_n_topics(topic_count)
+
+        if self.kafka.remote_controller_quorum:
+            controller_nodes = self.kafka.remote_controller_quorum.nodes
+        else:
+            controller_nodes = 
self.kafka.nodes[:self.kafka.num_nodes_controller_role]
+
+        # Waiting for snapshot creation and first log segment
+        # cleanup on all controller nodes
+        for node in controller_nodes:
+            self.logger.debug("Waiting for snapshot on: %s" % 
self.kafka.who_am_i(node))
+            self.wait_for_log_segment_delete(node)
+            self.wait_for_snapshot(node)
+        self.logger.debug("Verified Snapshots exist on controller nodes")
+
+    def create_n_topics(self, topic_count):
+        for i in range(self.topics_created, topic_count):
+            topic = "test_topic_%d" % i
+            print("Creating topic %s" % topic, flush=True)
+            topic_cfg = {
+                "topic": topic,
+                "partitions": self.partitions,
+                "replication-factor": self.replication_factor,
+                "configs": {"min.insync.replicas": 2}
+            }
+            self.kafka.create_topic(topic_cfg)
+        self.logger.debug("Created %d more topics" % topic_count)
+        return topic_count
+
+    def wait_for_log_segment_delete(self, node):
+        file_path = self.kafka.METADATA_FIRST_LOG
+        # Wait until the first log segment in metadata log is marked for 
deletion
+        wait_until(lambda: self.file_exists(node, file_path) == False,
+                   timeout_sec=100,
+                   backoff_sec=1, err_msg="Not able to verify cleanup of log 
file %s in a reasonable amount of time" % file_path)
+
+    def wait_for_snapshot(self, node):
+        # Wait for a snapshot file to show up
+        file_path = self.kafka.METADATA_SNAPSHOT_SEARCH_STR
+        wait_until(lambda: self.file_exists(node, file_path) == True,
+                   timeout_sec=100,
+                   backoff_sec=1, err_msg="Not able to verify snapshot 
existence in a reasonable amount of time")
+
+    def choose_controller_node(self):
+        if self.kafka.remote_controller_quorum:
+            # We are running in remote kraft
+            controller_node = 
random.choice(self.kafka.remote_controller_quorum.nodes)
+        else:
+            # This is colocated kraft
+            # Picking the first node as that is guaranteed to be a controller
+            controller_node = self.kafka.nodes[0]
+
+        self.logger.debug("Picking the controller node %s" % 
self.kafka.who_am_i(controller_node))
+        return controller_node
+
+    def file_exists(self, node, file_path):
+        # Check if the first log segment is cleaned up
+        self.logger.debug("Checking if file %s exists"%file_path)
+        cmd = "ls %s" % file_path
+        files = node.account.ssh_output(cmd, allow_fail=True, 
combine_stderr=False)
+
+        if len(files) is 0:
+            self.logger.debug("File %s does not exist"%file_path)
+            return False
+        else:
+            self.logger.debug("File %s was found"%file_path)
+            return True
+
+    def validate_success(self):
+        cluster_id = self.kafka.cluster_id()
+        assert cluster_id is not None
+        assert len(cluster_id) == 22
+        assert self.kafka.all_nodes_support_topic_ids()
+        assert self.kafka.check_protocol_errors(self)
+
+
+    @cluster(num_nodes=9)
+    @parametrize(metadata_quorum=quorum.remote_kraft)
+    # TODO: enable after concurrent startup has been merged
+    #@matrix(metadata_quorum=quorum.all_kraft)
+    def test_broker(self, metadata_quorum=quorum.colocated_kraft):
+        # Scenario -- stop a broker node and create metadata changes.
+        # Restart the broker and let it catch up
+        node = random.choice(self.kafka.nodes)
+        self.logger.debug("Scenario 1: kill-create_topics-start on node %s", 
self.kafka.who_am_i(node))
+        self.kafka.stop_node(node)
+        # Now modify the cluster to create more metadata changes
+        self.topics_created += self.create_n_topics(topic_count=10)
+        self.kafka.start_node(node)
+        self.validate_success()
+
+        # Scenario -- Re-init broker after cleaning up all persistent state
+        node = random.choice(self.kafka.nodes)
+        self.logger.debug("Scenario 2: kill-clean-start on node %s", 
self.kafka.who_am_i(node))
+        self.kafka.clean_node(node)
+        self.kafka.start_node(node)
+        self.validate_success()
+
+        # Scenario -- Re-init broker after cleaning up all persistent state
+        # and create a topic where the affected broker must be the leader
+        node = random.choice(self.kafka.nodes)
+        self.logger.debug("Scenario 3: kill-clean-start-verify-produce on node 
%s", self.kafka.who_am_i(node))
+        self.kafka.clean_node(node)
+
+        # Create a topic where the affected broker must be the leader
+        broker_topic = "test_topic_%d" % self.topics_created
+        self.topics_created += 1
+        print("Creating topic %s" % broker_topic, flush=True)

Review comment:
       The reason I used this is that other tests force the output of 
topic creation to stdout (for Jenkins, I think). I can move this to the logger 
instead.

##########
File path: tests/kafkatest/services/kafka/config.py
##########
@@ -24,6 +24,11 @@ class KafkaConfig(dict):
     DEFAULTS = {
         config_property.SOCKET_RECEIVE_BUFFER_BYTES: 65536,
         config_property.LOG_DIRS: 
"/mnt/kafka/kafka-data-logs-1,/mnt/kafka/kafka-data-logs-2",
+        config_property.METADATA_LOG_DIR: "/mnt/kafka/kafka-metadata-logs",
+        config_property.METADATA_LOG_SEGMENT_BYTES: 8388608,
+        config_property.METADATA_LOG_BYTES_BETWEEN_SNAPSHOTS: 100,

Review comment:
       The idea was to initially stress test it. Then we ended up choosing a 
higher value for this config which ends up overriding this value. Will fix the 
default setting here to be higher.

##########
File path: tests/kafkatest/tests/core/snapshot_test.py
##########
@@ -0,0 +1,224 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from ducktape.mark import parametrize, matrix
+from ducktape.mark.resource import cluster
+from ducktape.utils.util import wait_until
+
+from kafkatest.services.kafka import quorum
+from kafkatest.services.console_consumer import ConsoleConsumer
+from kafkatest.services.kafka import KafkaService
+from kafkatest.services.kafka import config_property
+from kafkatest.services.verifiable_producer import VerifiableProducer
+from kafkatest.tests.produce_consume_validate import ProduceConsumeValidateTest
+from kafkatest.utils import is_int
+import random
+import time
+
+class TestSnapshots(ProduceConsumeValidateTest):
+
+    METADATA_LOG_RETENTION_BYTES = "2048"
+    METADATA_LOG_BYTES_BETWEEN_SNAPSHOTS = "2048"
+    METADATA_LOG_SEGMENT_BYTES = "9000000"
+    METADATA_LOG_SEGMENT_MS = "10000"
+    TOPIC_NAME_PREFIX = "test_topic_"
+
+    def __init__(self, test_context):
+        super(TestSnapshots, self).__init__(test_context=test_context)
+        self.topics_created = 0
+        self.topic = "test_topic"
+        self.partitions = 3
+        self.replication_factor = 3
+        self.num_nodes = 3
+
+        # Producer and consumer
+        self.producer_throughput = 1000
+        self.num_producers = 1
+        self.num_consumers = 1
+
+        security_protocol = 'PLAINTEXT'
+        # Setup Custom Config to ensure snapshot will be generated 
deterministically
+        self.kafka = KafkaService(self.test_context, self.num_nodes, zk=None,
+                                  topics={self.topic: {"partitions": 
self.partitions,
+                                                       "replication-factor": 
self.replication_factor,
+                                                       'configs': 
{"min.insync.replicas": 2}}},
+                                  server_prop_overrides=[
+                                      [config_property.METADATA_LOG_DIR, 
KafkaService.METADATA_LOG_DIR],
+                                      
[config_property.METADATA_LOG_SEGMENT_MS, 
TestSnapshots.METADATA_LOG_SEGMENT_MS],
+                                      
[config_property.METADATA_LOG_RETENTION_BYTES, 
TestSnapshots.METADATA_LOG_RETENTION_BYTES],
+                                      
[config_property.METADATA_LOG_SEGMENT_BYTES, 
TestSnapshots.METADATA_LOG_SEGMENT_BYTES],
+                                      
[config_property.METADATA_LOG_BYTES_BETWEEN_SNAPSHOTS, 
TestSnapshots.METADATA_LOG_BYTES_BETWEEN_SNAPSHOTS]
+                                  ])
+
+        self.kafka.interbroker_security_protocol = security_protocol
+        self.kafka.security_protocol = security_protocol
+
+    def setUp(self):
+        # Start the cluster and ensure that a snapshot is generated
+        self.logger.info("Starting the cluster and running until snapshot 
creation")
+
+        assert quorum.for_test(self.test_context) in quorum.all_kraft, \
+                "Snapshot test should be run Kraft Modes only"
+
+        self.kafka.start()
+
+        topic_count = 10
+        self.topics_created += self.create_n_topics(topic_count)
+
+        if self.kafka.remote_controller_quorum:
+            self.controller_nodes = self.kafka.remote_controller_quorum.nodes
+        else:
+            self.controller_nodes = 
self.kafka.nodes[:self.kafka.num_nodes_controller_role]
+
+        # Waiting for snapshot creation and first log segment
+        # cleanup on all controller nodes
+        for node in self.controller_nodes:
+            self.logger.debug("Waiting for snapshot on: %s" % 
self.kafka.who_am_i(node))
+            self.wait_for_log_segment_delete(node)
+            self.wait_for_snapshot(node)
+        self.logger.debug("Verified Snapshots exist on controller nodes")
+
+    def create_n_topics(self, topic_count):
+        for i in range(self.topics_created, topic_count):
+            topic = "%s%d" % (TestSnapshots.TOPIC_NAME_PREFIX, i)
+            self.logger.debug("Creating topic %s" % topic)
+            topic_cfg = {
+                "topic": topic,
+                "partitions": self.partitions,
+                "replication-factor": self.replication_factor,
+                "configs": {"min.insync.replicas": 2}
+            }
+            self.kafka.create_topic(topic_cfg)
+        self.logger.debug("Created %d more topics" % topic_count)
+        return topic_count
+
+    def wait_for_log_segment_delete(self, node):
+        file_path = self.kafka.METADATA_FIRST_LOG
+        # Wait until the first log segment in metadata log is marked for 
deletion
+        wait_until(lambda: not self.file_exists(node, file_path),
+                   timeout_sec=100,
+                   backoff_sec=1, err_msg="Not able to verify cleanup of log 
file %s in a reasonable amount of time" % file_path)
+
+    def wait_for_snapshot(self, node):
+        # Wait for a snapshot file to show up
+        file_path = self.kafka.METADATA_SNAPSHOT_SEARCH_STR
+        wait_until(lambda: self.file_exists(node, file_path),
+                   timeout_sec=100,
+                   backoff_sec=1, err_msg="Not able to verify snapshot 
existence in a reasonable amount of time")
+
+    def file_exists(self, node, file_path):
+        # Check if the first log segment is cleaned up
+        self.logger.debug("Checking if file %s exists" % file_path)
+        cmd = "ls %s" % file_path
+        files = node.account.ssh_output(cmd, allow_fail=True, 
combine_stderr=False)
+
+        if len(files) is 0:
+            self.logger.debug("File %s does not exist" % file_path)
+            return False
+        else:
+            self.logger.debug("File %s was found" % file_path)
+            return True
+
+    def validate_success(self, topic = None):
+        if topic is None:
+            # Create a new topic
+            topic = "%s%d" % (TestSnapshots.TOPIC_NAME_PREFIX, 
self.topics_created)
+            self.topics_created += self.create_n_topics(topic_count=1)
+
+        # Produce to the newly created topic to ensure broker has caught up
+        self.producer = VerifiableProducer(self.test_context, 
self.num_producers, self.kafka,
+                                           topic, 
throughput=self.producer_throughput,
+                                           message_validator=is_int)
+
+        self.consumer = ConsoleConsumer(self.test_context, self.num_consumers, 
self.kafka,
+                                        topic, consumer_timeout_ms=30000,
+                                        message_validator=is_int)
+        self.start_producer_and_consumer()
+        self.stop_producer_and_consumer()
+        self.validate()
+
+    @cluster(num_nodes=9)
+    @matrix(metadata_quorum=quorum.all_kraft)
+    def test_broker(self, metadata_quorum=quorum.colocated_kraft):
+        # The following set of scenarios are meant to test the ability
+        # of a broker to consume metadata snapshots and to recover using them
+        #
+        # During setup phase, the test ensures that there is atleast
+        # one snapshot created on the controllers and that at least the first
+        # log segment has been truncated (requiring an observer to always load 
+        # a snapshot to catch up to the metadata state)
+        #
+        # Each scenario is a progression over the previous one.
+        # Redundant scenarios are left in to make debugging a failure of the 
test easier
+
+        # Scenario -- Re-init broker after cleaning up all persistent state

Review comment:
       I am not sure I got your point. Will sync offline about this.

##########
File path: tests/kafkatest/tests/core/snapshot_test.py
##########
@@ -0,0 +1,251 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from ducktape.mark import parametrize, matrix
+from ducktape.mark.resource import cluster
+from ducktape.utils.util import wait_until
+
+from kafkatest.services.kafka import quorum
+from kafkatest.services.console_consumer import ConsoleConsumer
+from kafkatest.services.kafka import KafkaService
+from kafkatest.services.kafka import config_property
+from kafkatest.services.verifiable_producer import VerifiableProducer
+from kafkatest.tests.produce_consume_validate import ProduceConsumeValidateTest
+from kafkatest.utils import is_int
+import random
+
+class TestSnapshots(ProduceConsumeValidateTest):
+
+    TOPIC_NAME_PREFIX = "test_topic_"
+
+    def __init__(self, test_context):
+        """Set the test parameters and build the KafkaService under test.
+
+        :param test_context: test context passed to the base class and to
+                             KafkaService
+        """
+        super(TestSnapshots, self).__init__(test_context=test_context)
+        # Running total of the counts returned by create_n_topics; also used
+        # as the numeric suffix when naming the next topic
+        self.topics_created = 0
+        self.topic = "test_topic"
+        self.partitions = 3
+        self.replication_factor = 3
+        self.num_nodes = 3
+
+        # Producer and consumer
+        self.producer_throughput = 1000
+        self.num_producers = 1
+        self.num_consumers = 1
+
+        security_protocol = 'PLAINTEXT'
+        # Set up custom config to ensure a snapshot will be generated
+        # deterministically (presumably the small retention and
+        # bytes-between-snapshots values force this quickly -- confirm)
+        self.kafka = KafkaService(self.test_context, self.num_nodes, zk=None,
+                                  topics={self.topic: {"partitions": 
self.partitions,
+                                                       "replication-factor": 
self.replication_factor,
+                                                       'configs': 
{"min.insync.replicas": 2}}},
+                                  server_prop_overrides=[
+                                      [config_property.METADATA_LOG_DIR, 
KafkaService.METADATA_LOG_DIR],
+                                      
[config_property.METADATA_LOG_SEGMENT_MS, "10000"],
+                                      
[config_property.METADATA_LOG_RETENTION_BYTES, "2048"],
+                                      
[config_property.METADATA_LOG_BYTES_BETWEEN_SNAPSHOTS, "2048"]
+                                  ])
+
+        # The same protocol is used for client and inter-broker traffic
+        self.kafka.interbroker_security_protocol = security_protocol
+        self.kafka.security_protocol = security_protocol
+
+    def setUp(self):
+        """Start the cluster and wait for snapshots on every controller node.
+
+        Starts Kafka, requests an initial batch of topics, works out which
+        nodes act as controllers, and then blocks until each of them no longer
+        has the first metadata log segment and has a snapshot file.
+
+        :raises AssertionError: if the test is not running in a KRaft quorum
+                                mode
+        """
+        # Start the cluster and ensure that a snapshot is generated
+        self.logger.info("Starting the cluster and running until snapshot 
creation")
+
+        assert quorum.for_test(self.test_context) in quorum.all_kraft, \
+                "Snapshot test should be run Kraft Modes only"
+
+        self.kafka.start()
+
+        topic_count = 10
+        self.topics_created += self.create_n_topics(topic_count)
+
+        # With a remote controller quorum the controllers are that quorum's
+        # nodes; otherwise they are the first num_nodes_controller_role nodes
+        # of the Kafka service itself
+        if self.kafka.remote_controller_quorum:
+            self.controller_nodes = self.kafka.remote_controller_quorum.nodes
+        else:
+            self.controller_nodes = 
self.kafka.nodes[:self.kafka.num_nodes_controller_role]
+
+        # Waiting for snapshot creation and first log segment
+        # cleanup on all controller nodes
+        for node in self.controller_nodes:
+            self.logger.debug("Waiting for snapshot on: %s" % 
self.kafka.who_am_i(node))
+            self.wait_for_log_segment_delete(node)
+            self.wait_for_snapshot(node)
+        self.logger.debug("Verified Snapshots exist on controller nodes")
+
+    def create_n_topics(self, topic_count):
+        """Create topic_count new topics named TOPIC_NAME_PREFIX + <index>.
+
+        Indices continue from self.topics_created, so repeated calls never
+        reuse a name. This method does not update self.topics_created; the
+        caller is expected to add the returned count to it.
+
+        :param topic_count: number of topics to create
+        :return: the number of topics created (always topic_count)
+        """
+        # Number the new topics relative to what already exists. Using
+        # topic_count as an absolute upper bound made every call a no-op once
+        # topics_created >= topic_count, while still reporting success.
+        first_index = self.topics_created
+        for i in range(first_index, first_index + topic_count):
+            topic = "%s%d" % (TestSnapshots.TOPIC_NAME_PREFIX, i)
+            self.logger.debug("Creating topic %s" % topic)
+            topic_cfg = {
+                "topic": topic,
+                "partitions": self.partitions,
+                "replication-factor": self.replication_factor,
+                "configs": {"min.insync.replicas": 2}
+            }
+            self.kafka.create_topic(topic_cfg)
+        self.logger.debug("Created %d more topics" % topic_count)
+        return topic_count
+
+    def wait_for_log_segment_delete(self, node):
+        """Block until the first metadata log segment is gone from node.
+
+        Polls file_exists once per second for up to 100 seconds; wait_until
+        is given err_msg to report if the file is still present at timeout.
+
+        :param node: node on which to look for the segment file
+        """
+        file_path = self.kafka.METADATA_FIRST_LOG
+        # Wait until the first log segment in the metadata log is marked for
+        # deletion, i.e. until ls no longer finds it under its original path
+        wait_until(lambda: not self.file_exists(node, file_path),
+                   timeout_sec=100,
+                   backoff_sec=1, err_msg="Not able to verify cleanup of log 
file %s in a reasonable amount of time" % file_path)
+
+    def wait_for_snapshot(self, node):
+        """Block until a metadata snapshot file is visible on node.
+
+        Polls file_exists once per second for up to 100 seconds; wait_until
+        is given err_msg to report if nothing has shown up at timeout.
+
+        :param node: node on which to look for a snapshot file
+        """
+        # Wait for a snapshot file to show up
+        # NOTE(review): the constant name suggests a search pattern rather
+        # than a single fixed path -- confirm against KafkaService
+        file_path = self.kafka.METADATA_SNAPSHOT_SEARCH_STR
+        wait_until(lambda: self.file_exists(node, file_path),
+                   timeout_sec=100,
+                   backoff_sec=1, err_msg="Not able to verify snapshot 
existence in a reasonable amount of time")
+
+    def file_exists(self, node, file_path):
+        """Return True if `ls file_path` on the given node lists anything.
+
+        :param node: node whose account is used to run the command
+        :param file_path: path (or pattern) handed to ls as-is
+        :return: True when ls wrote something to stdout, False otherwise
+        """
+        self.logger.debug("Checking if file %s exists" % file_path)
+        cmd = "ls %s" % file_path
+        # allow_fail: a failing ls (e.g. missing path) is an expected outcome
+        # here. stderr is kept out of the captured output so that an error
+        # message is not mistaken for a listing.
+        files = node.account.ssh_output(cmd, allow_fail=True,
+                                        combine_stderr=False)
+
+        # Compare by value: `len(files) is 0` tested object identity, which
+        # only works through CPython's small-int cache and triggers a
+        # SyntaxWarning on Python 3.8+.
+        if len(files) == 0:
+            self.logger.debug("File %s does not exist" % file_path)
+            return False
+        else:
+            self.logger.debug("File %s was found" % file_path)
+            return True
+
+    def validate_success(self, topic = None):
+        """Run a produce/consume cycle against a topic and validate it.
+
+        :param topic: name of the topic to use. When None, the name
+                      TOPIC_NAME_PREFIX + topics_created is used and the
+                      topic is requested through create_n_topics(topic_count=1).
+        """
+        if topic is None:
+            # Create a new topic
+            topic = "%s%d" % (TestSnapshots.TOPIC_NAME_PREFIX, 
self.topics_created)
+            self.topics_created += self.create_n_topics(topic_count=1)
+
+        # Produce to the newly created topic to ensure broker has caught up
+        self.producer = VerifiableProducer(self.test_context, 
self.num_producers, self.kafka,
+                                           topic, 
throughput=self.producer_throughput,
+                                           message_validator=is_int)
+
+        self.consumer = ConsoleConsumer(self.test_context, self.num_consumers, 
self.kafka,
+                                        topic, consumer_timeout_ms=30000,
+                                        message_validator=is_int)
+        # These three helpers are not defined in this class; presumably they
+        # are inherited from ProduceConsumeValidateTest -- confirm
+        self.start_producer_and_consumer()
+        self.stop_producer_and_consumer()
+        self.validate()
+
+    @cluster(num_nodes=9)
+    @matrix(metadata_quorum=quorum.all_kraft)
+    def test_broker(self, metadata_quorum=quorum.colocated_kraft):
+        """ Test the ability of a broker to consume metadata snapshots
+        and to recover the cluster metadata state using them
+
+        The test ensures that there is at least one snapshot created on
+        the controller quorum during the setup phase and that at least the
+        first log segment in the metadata log has been marked for deletion,
+        thereby ensuring that any observer of the log always needs to load a
+        snapshot to catch up to the current metadata state.
+
+        Each scenario is a progression over the previous one.
+        The scenarios build on top of each other by:
+        * Loading a snapshot
+        * Loading a snapshot and some delta records
+        * Loading a snapshot and delta records and ensuring that the most
+          recent metadata state has been caught up.
+
+        Even though a subsequent scenario covers the previous one, they are all
+        left in the test to make debugging a failure of the test easier,
+        e.g. if the first scenario passes and the second fails, it hints
+        towards a problem with the application of delta records while
+        catching up.
+        """
+
+        # Scenario -- Re-init broker after cleaning up all persistent state
+        node = random.choice(self.kafka.nodes)
+        self.logger.debug("Scenario: kill-clean-start on broker node %s", 
self.kafka.who_am_i(node))
+        self.kafka.clean_node(node)
+        self.kafka.start_node(node)
+
+        # Scenario -- Re-init broker after cleaning up all persistent state
+        # Create some metadata changes for the broker to consume as well.
+        node = random.choice(self.kafka.nodes)
+        self.logger.debug("Scenario: kill-clean-create_topics-start on broker 
node %s", self.kafka.who_am_i(node))
+        self.kafka.clean_node(node)
+        # Now modify the cluster to create more metadata changes
+        self.topics_created += self.create_n_topics(topic_count=10)
+        self.kafka.start_node(node)
+
+        # Scenario -- Re-init broker after cleaning up all persistent state
+        # And ensure that the broker has replicated the metadata log
+        node = random.choice(self.kafka.nodes)
+        self.logger.debug("Scenario: kill-clean-start-verify-produce on broker 
node %s", self.kafka.who_am_i(node))
+        self.kafka.clean_node(node)
+        self.kafka.start_node(node)
+        # Create a topic where the affected broker must be the leader: the
+        # replica assignment names only this node's index
+        broker_topic = "%s%d" % (TestSnapshots.TOPIC_NAME_PREFIX, 
self.topics_created)
+        self.topics_created += 1
+        self.logger.debug("Creating topic %s" % broker_topic)
+        topic_cfg = {
+            "topic": broker_topic,
+            "replica-assignment": self.kafka.idx(node),
+            "configs": {"min.insync.replicas": 1}
+        }
+        self.kafka.create_topic(topic_cfg)
+
+        # Produce to the newly created topic and make sure it works.
+        self.validate_success(broker_topic)
+
+    @cluster(num_nodes=9)
+    @matrix(metadata_quorum=quorum.all_kraft)
+    def test_controller(self, metadata_quorum=quorum.colocated_kraft):
+        """ Test the ability of controllers to consume metadata snapshots
+        and to recover the cluster metadata state using them
+
+        The test ensures that that there is atleast one snapshot created on
+        the controller quorum during the setup phase and that at least the 
first
+        log segment in the metadata log has been marked for deletion, thereby 
ensuring
+        that any observer of the log needs to always load a snapshot to catch
+        up to the current metadata state.
+
+        Each scenario is a progression over the previous one.
+        The scenarios build on top of each other by:
+        * Loading a snapshot
+        * Loading and snapshot and some delta records
+        * Loading a snapshot and delta and ensuring that the most recent 
metadata state
+          has been caught up.
+
+        Even though a subsequent scenario covers the previous one, they are all
+        left in the test to make debugging a failure of the test easier
+        e.g. if the first scenario passes and the second fails, it hints 
towards
+        a problem with the application of delta records while catching up

Review comment:
       What you are suggesting is definitely a stronger check. The reasons I 
did not do that were:
     1. The way the code is laid out, doing a produce consume cycle needs two 
additional nodes each time. Doing it multiple times would have needed 
(num_scenarios*2) extra nodes to run the test.
     2. Being able to start up the controller/broker without error is a check 
by itself (we caught bugs just while starting up/creating topics), and for 
completeness the final validate_success validates all prior actions.
   
   I can address this concern in one of the following ways:
     1. I could modify code to have the same Producer/Consumer node do 
validation for multiple topics, but that might take time.
     2. I could add more nodes to the test to account for the extra 
producers/consumers.
     3. Check in the code as is for now and improve it (alongside other test 
cases) in another iteration.
   
   What would you prefer?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to