jsancio commented on code in PR #12084:
URL: https://github.com/apache/kafka/pull/12084#discussion_r882207427


##########
core/src/main/scala/kafka/tools/DumpLogSegments.scala:
##########
@@ -288,6 +290,12 @@ object DumpLogSegments {
                   case ControlRecordType.ABORT | ControlRecordType.COMMIT =>
                     val endTxnMarker = EndTransactionMarker.deserialize(record)
                     print(s" endTxnMarker: ${endTxnMarker.controlType} 
coordinatorEpoch: ${endTxnMarker.coordinatorEpoch}")
+                  case ControlRecordType.SNAPSHOT_HEADER =>
+                    val header = 
ControlRecordUtils.deserializedSnapshotHeaderRecord(record)
+                    print(s" SnapshotHeader version: ${header.version()} 
lastContainedLogTimestamp: ${header.lastContainedLogTimestamp()}")
+                  case ControlRecordType.SNAPSHOT_FOOTER =>
+                    val footer = 
ControlRecordUtils.deserializedSnapshotFooterRecord(record)
+                    print(s" SnapshotFooter version: ${footer.version()}")

Review Comment:
   How about using `Snapshot{Footer,Header}RecordJsonConverter` to print this 
in a human-readable format? The `write` method in those classes returns a 
`JsonNode`. `JsonNode`'s `toString` may be what we want.



##########
core/src/test/scala/unit/kafka/tools/DumpLogSegmentsTest.scala:
##########
@@ -273,9 +277,73 @@ class DumpLogSegmentsTest {
     log.appendAsLeader(MemoryRecords.withRecords(CompressionType.NONE, 
records:_*), leaderEpoch = 2)
 
     output = runDumpLogSegments(Array("--cluster-metadata-decoder", 
"--skip-record-metadata", "false", "--files", logFilePath))
-    assert(output.contains("TOPIC_RECORD"))
-    assert(output.contains("BROKER_RECORD"))
-    assert(output.contains("skipping"))
+    assertTrue(output.contains("TOPIC_RECORD"))
+    assertTrue(output.contains("BROKER_RECORD"))
+    assertTrue(output.contains("skipping"))
+  }
+
+  @Test
+  def testDumpMetadataSnapshot(): Unit = {
+    val metadataRecords = Seq(
+      new ApiMessageAndVersion(
+        new RegisterBrokerRecord().setBrokerId(0).setBrokerEpoch(10), 
0.toShort),
+      new ApiMessageAndVersion(
+        new RegisterBrokerRecord().setBrokerId(1).setBrokerEpoch(20), 
0.toShort),
+      new ApiMessageAndVersion(
+        new TopicRecord().setName("test-topic").setTopicId(Uuid.randomUuid()), 
0.toShort),
+      new ApiMessageAndVersion(
+        new PartitionChangeRecord().setTopicId(Uuid.randomUuid()).setLeader(1).
+          setPartitionId(0).setIsr(util.Arrays.asList(0, 1, 2)), 0.toShort)
+    )
+
+    val metadataLog = KafkaMetadataLog(
+      KafkaRaftServer.MetadataPartition,
+      KafkaRaftServer.MetadataTopicId,
+      logDir,
+      time,
+      time.scheduler,
+      MetadataLogConfig(
+        logSegmentBytes = 100 * 1024,
+        logSegmentMinBytes = 100 * 1024,
+        logSegmentMillis = 10 * 1000,
+        retentionMaxBytes = 100 * 1024,
+        retentionMillis = 60 * 1000,
+        maxBatchSizeInBytes = KafkaRaftClient.MAX_BATCH_SIZE_BYTES,
+        maxFetchSizeInBytes = KafkaRaftClient.MAX_FETCH_SIZE_BYTES,
+        fileDeleteDelayMs = Defaults.FileDeleteDelayMs,
+        nodeId = 1
+      )
+    )
+
+    val lastContainedLogTimestamp = 10000
+
+    val snapshotWriter = RecordsSnapshotWriter.createWithHeader(
+      () => metadataLog.createNewSnapshot(new OffsetAndEpoch(0, 0)),
+      1024,
+      MemoryPool.NONE,
+      new MockTime,
+      lastContainedLogTimestamp,
+      CompressionType.NONE,
+      new MetadataRecordSerde
+    ).get()
+
+    snapshotWriter.append(metadataRecords.asJava)
+    snapshotWriter.freeze()
+    snapshotWriter.close()

Review Comment:
   `TestUtils` has a `def resource` that will close the `snapshotWriter`. Maybe 
you will find this useful.



##########
core/src/main/scala/kafka/tools/DumpLogSegments.scala:
##########
@@ -248,8 +248,10 @@ object DumpLogSegments {
                       parser: MessageParser[_, _],
                       skipRecordMetadata: Boolean,
                       maxBytes: Int): Unit = {
-    val startOffset = file.getName.split("\\.")(0).toLong
-    println("Starting offset: " + startOffset)
+    if (file.getName.endsWith(UnifiedLog.LogFileSuffix)) {
+      val startOffset = file.getName.split("\\.")(0).toLong
+      println("Starting offset: " + startOffset)
+    }

Review Comment:
   Should we add an `else if` for snapshots? Something like `s"Snapshot epoch 
($epoch) and end offset ($endOffset)"`. The parsing function `Snapshots.parse` 
should return this information.



##########
core/src/test/scala/unit/kafka/tools/DumpLogSegmentsTest.scala:
##########
@@ -273,9 +277,73 @@ class DumpLogSegmentsTest {
     log.appendAsLeader(MemoryRecords.withRecords(CompressionType.NONE, 
records:_*), leaderEpoch = 2)
 
     output = runDumpLogSegments(Array("--cluster-metadata-decoder", 
"--skip-record-metadata", "false", "--files", logFilePath))
-    assert(output.contains("TOPIC_RECORD"))
-    assert(output.contains("BROKER_RECORD"))
-    assert(output.contains("skipping"))
+    assertTrue(output.contains("TOPIC_RECORD"))
+    assertTrue(output.contains("BROKER_RECORD"))
+    assertTrue(output.contains("skipping"))
+  }
+
+  @Test
+  def testDumpMetadataSnapshot(): Unit = {
+    val metadataRecords = Seq(
+      new ApiMessageAndVersion(
+        new RegisterBrokerRecord().setBrokerId(0).setBrokerEpoch(10), 
0.toShort),
+      new ApiMessageAndVersion(
+        new RegisterBrokerRecord().setBrokerId(1).setBrokerEpoch(20), 
0.toShort),
+      new ApiMessageAndVersion(
+        new TopicRecord().setName("test-topic").setTopicId(Uuid.randomUuid()), 
0.toShort),
+      new ApiMessageAndVersion(
+        new PartitionChangeRecord().setTopicId(Uuid.randomUuid()).setLeader(1).
+          setPartitionId(0).setIsr(util.Arrays.asList(0, 1, 2)), 0.toShort)
+    )
+
+    val metadataLog = KafkaMetadataLog(
+      KafkaRaftServer.MetadataPartition,
+      KafkaRaftServer.MetadataTopicId,
+      logDir,
+      time,
+      time.scheduler,
+      MetadataLogConfig(
+        logSegmentBytes = 100 * 1024,
+        logSegmentMinBytes = 100 * 1024,
+        logSegmentMillis = 10 * 1000,
+        retentionMaxBytes = 100 * 1024,
+        retentionMillis = 60 * 1000,
+        maxBatchSizeInBytes = KafkaRaftClient.MAX_BATCH_SIZE_BYTES,
+        maxFetchSizeInBytes = KafkaRaftClient.MAX_FETCH_SIZE_BYTES,
+        fileDeleteDelayMs = Defaults.FileDeleteDelayMs,
+        nodeId = 1
+      )
+    )
+
+    val lastContainedLogTimestamp = 10000
+
+    val snapshotWriter = RecordsSnapshotWriter.createWithHeader(
+      () => metadataLog.createNewSnapshot(new OffsetAndEpoch(0, 0)),
+      1024,
+      MemoryPool.NONE,
+      new MockTime,
+      lastContainedLogTimestamp,
+      CompressionType.NONE,
+      new MetadataRecordSerde
+    ).get()
+
+    snapshotWriter.append(metadataRecords.asJava)
+    snapshotWriter.freeze()
+    snapshotWriter.close()
+
+    var output = runDumpLogSegments(Array("--cluster-metadata-decoder", 
"false", "--files", snapshotPath))
+    assertTrue(output.contains("TOPIC_RECORD"))
+    assertTrue(output.contains("BROKER_RECORD"))
+    assertTrue(output.contains("SnapshotHeader"))
+    assertTrue(output.contains("SnapshotFooter"))
+    assertTrue(output.contains(s"lastContainedLogTimestamp: 
$lastContainedLogTimestamp"))
+
+    output = runDumpLogSegments(Array("--cluster-metadata-decoder", 
"--skip-record-metadata", "false", "--files", snapshotPath))

Review Comment:
   Would you mind including these two `output` values in a comment on this PR? I am 
interested in seeing the full output of this command against a snapshot.



##########
core/src/test/scala/unit/kafka/tools/DumpLogSegmentsTest.scala:
##########
@@ -273,9 +277,73 @@ class DumpLogSegmentsTest {
     log.appendAsLeader(MemoryRecords.withRecords(CompressionType.NONE, 
records:_*), leaderEpoch = 2)
 
     output = runDumpLogSegments(Array("--cluster-metadata-decoder", 
"--skip-record-metadata", "false", "--files", logFilePath))
-    assert(output.contains("TOPIC_RECORD"))
-    assert(output.contains("BROKER_RECORD"))
-    assert(output.contains("skipping"))
+    assertTrue(output.contains("TOPIC_RECORD"))
+    assertTrue(output.contains("BROKER_RECORD"))
+    assertTrue(output.contains("skipping"))
+  }
+
+  @Test
+  def testDumpMetadataSnapshot(): Unit = {
+    val metadataRecords = Seq(
+      new ApiMessageAndVersion(
+        new RegisterBrokerRecord().setBrokerId(0).setBrokerEpoch(10), 
0.toShort),
+      new ApiMessageAndVersion(
+        new RegisterBrokerRecord().setBrokerId(1).setBrokerEpoch(20), 
0.toShort),
+      new ApiMessageAndVersion(
+        new TopicRecord().setName("test-topic").setTopicId(Uuid.randomUuid()), 
0.toShort),
+      new ApiMessageAndVersion(
+        new PartitionChangeRecord().setTopicId(Uuid.randomUuid()).setLeader(1).
+          setPartitionId(0).setIsr(util.Arrays.asList(0, 1, 2)), 0.toShort)
+    )
+
+    val metadataLog = KafkaMetadataLog(
+      KafkaRaftServer.MetadataPartition,
+      KafkaRaftServer.MetadataTopicId,
+      logDir,
+      time,
+      time.scheduler,
+      MetadataLogConfig(
+        logSegmentBytes = 100 * 1024,
+        logSegmentMinBytes = 100 * 1024,
+        logSegmentMillis = 10 * 1000,
+        retentionMaxBytes = 100 * 1024,
+        retentionMillis = 60 * 1000,
+        maxBatchSizeInBytes = KafkaRaftClient.MAX_BATCH_SIZE_BYTES,
+        maxFetchSizeInBytes = KafkaRaftClient.MAX_FETCH_SIZE_BYTES,
+        fileDeleteDelayMs = Defaults.FileDeleteDelayMs,
+        nodeId = 1
+      )
+    )
+
+    val lastContainedLogTimestamp = 10000
+
+    val snapshotWriter = RecordsSnapshotWriter.createWithHeader(
+      () => metadataLog.createNewSnapshot(new OffsetAndEpoch(0, 0)),
+      1024,
+      MemoryPool.NONE,
+      new MockTime,
+      lastContainedLogTimestamp,
+      CompressionType.NONE,
+      new MetadataRecordSerde
+    ).get()
+
+    snapshotWriter.append(metadataRecords.asJava)
+    snapshotWriter.freeze()
+    snapshotWriter.close()
+
+    var output = runDumpLogSegments(Array("--cluster-metadata-decoder", 
"false", "--files", snapshotPath))

Review Comment:
   What does `false` mean in the command-line arguments? I assume that it is a 
positional argument but I don't see where that is defined in 
`DumpLogSegmentsOption`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to