dengziming commented on code in PR #12084:
URL: https://github.com/apache/kafka/pull/12084#discussion_r882376319
##########
core/src/main/scala/kafka/tools/DumpLogSegments.scala:
##########
@@ -248,8 +248,10 @@ object DumpLogSegments {
parser: MessageParser[_, _],
skipRecordMetadata: Boolean,
maxBytes: Int): Unit = {
- val startOffset = file.getName.split("\\.")(0).toLong
- println("Starting offset: " + startOffset)
+ if (file.getName.endsWith(UnifiedLog.LogFileSuffix)) {
+ val startOffset = file.getName.split("\\.")(0).toLong
+ println("Starting offset: " + startOffset)
+ }
Review Comment:
Yes, good suggestion.
##########
core/src/test/scala/unit/kafka/tools/DumpLogSegmentsTest.scala:
##########
@@ -273,9 +277,73 @@ class DumpLogSegmentsTest {
log.appendAsLeader(MemoryRecords.withRecords(CompressionType.NONE,
records:_*), leaderEpoch = 2)
output = runDumpLogSegments(Array("--cluster-metadata-decoder",
"--skip-record-metadata", "false", "--files", logFilePath))
- assert(output.contains("TOPIC_RECORD"))
- assert(output.contains("BROKER_RECORD"))
- assert(output.contains("skipping"))
+ assertTrue(output.contains("TOPIC_RECORD"))
+ assertTrue(output.contains("BROKER_RECORD"))
+ assertTrue(output.contains("skipping"))
+ }
+
+ @Test
+ def testDumpMetadataSnapshot(): Unit = {
+ val metadataRecords = Seq(
+ new ApiMessageAndVersion(
+ new RegisterBrokerRecord().setBrokerId(0).setBrokerEpoch(10),
0.toShort),
+ new ApiMessageAndVersion(
+ new RegisterBrokerRecord().setBrokerId(1).setBrokerEpoch(20),
0.toShort),
+ new ApiMessageAndVersion(
+ new TopicRecord().setName("test-topic").setTopicId(Uuid.randomUuid()),
0.toShort),
+ new ApiMessageAndVersion(
+ new PartitionChangeRecord().setTopicId(Uuid.randomUuid()).setLeader(1).
+ setPartitionId(0).setIsr(util.Arrays.asList(0, 1, 2)), 0.toShort)
+ )
+
+ val metadataLog = KafkaMetadataLog(
+ KafkaRaftServer.MetadataPartition,
+ KafkaRaftServer.MetadataTopicId,
+ logDir,
+ time,
+ time.scheduler,
+ MetadataLogConfig(
+ logSegmentBytes = 100 * 1024,
+ logSegmentMinBytes = 100 * 1024,
+ logSegmentMillis = 10 * 1000,
+ retentionMaxBytes = 100 * 1024,
+ retentionMillis = 60 * 1000,
+ maxBatchSizeInBytes = KafkaRaftClient.MAX_BATCH_SIZE_BYTES,
+ maxFetchSizeInBytes = KafkaRaftClient.MAX_FETCH_SIZE_BYTES,
+ fileDeleteDelayMs = Defaults.FileDeleteDelayMs,
+ nodeId = 1
+ )
+ )
+
+ val lastContainedLogTimestamp = 10000
+
+ val snapshotWriter = RecordsSnapshotWriter.createWithHeader(
+ () => metadataLog.createNewSnapshot(new OffsetAndEpoch(0, 0)),
+ 1024,
+ MemoryPool.NONE,
+ new MockTime,
+ lastContainedLogTimestamp,
+ CompressionType.NONE,
+ new MetadataRecordSerde
+ ).get()
+
+ snapshotWriter.append(metadataRecords.asJava)
+ snapshotWriter.freeze()
+ snapshotWriter.close()
+
+ var output = runDumpLogSegments(Array("--cluster-metadata-decoder",
"false", "--files", snapshotPath))
+ assertTrue(output.contains("TOPIC_RECORD"))
+ assertTrue(output.contains("BROKER_RECORD"))
+ assertTrue(output.contains("SnapshotHeader"))
+ assertTrue(output.contains("SnapshotFooter"))
+ assertTrue(output.contains(s"lastContainedLogTimestamp:
$lastContainedLogTimestamp"))
+
+ output = runDumpLogSegments(Array("--cluster-metadata-decoder",
"--skip-record-metadata", "false", "--files", snapshotPath))
Review Comment:
I added them in the description of this PR.
##########
core/src/test/scala/unit/kafka/tools/DumpLogSegmentsTest.scala:
##########
@@ -273,9 +277,73 @@ class DumpLogSegmentsTest {
log.appendAsLeader(MemoryRecords.withRecords(CompressionType.NONE,
records:_*), leaderEpoch = 2)
output = runDumpLogSegments(Array("--cluster-metadata-decoder",
"--skip-record-metadata", "false", "--files", logFilePath))
- assert(output.contains("TOPIC_RECORD"))
- assert(output.contains("BROKER_RECORD"))
- assert(output.contains("skipping"))
+ assertTrue(output.contains("TOPIC_RECORD"))
+ assertTrue(output.contains("BROKER_RECORD"))
+ assertTrue(output.contains("skipping"))
+ }
+
+ @Test
+ def testDumpMetadataSnapshot(): Unit = {
+ val metadataRecords = Seq(
+ new ApiMessageAndVersion(
+ new RegisterBrokerRecord().setBrokerId(0).setBrokerEpoch(10),
0.toShort),
+ new ApiMessageAndVersion(
+ new RegisterBrokerRecord().setBrokerId(1).setBrokerEpoch(20),
0.toShort),
+ new ApiMessageAndVersion(
+ new TopicRecord().setName("test-topic").setTopicId(Uuid.randomUuid()),
0.toShort),
+ new ApiMessageAndVersion(
+ new PartitionChangeRecord().setTopicId(Uuid.randomUuid()).setLeader(1).
+ setPartitionId(0).setIsr(util.Arrays.asList(0, 1, 2)), 0.toShort)
+ )
+
+ val metadataLog = KafkaMetadataLog(
+ KafkaRaftServer.MetadataPartition,
+ KafkaRaftServer.MetadataTopicId,
+ logDir,
+ time,
+ time.scheduler,
+ MetadataLogConfig(
+ logSegmentBytes = 100 * 1024,
+ logSegmentMinBytes = 100 * 1024,
+ logSegmentMillis = 10 * 1000,
+ retentionMaxBytes = 100 * 1024,
+ retentionMillis = 60 * 1000,
+ maxBatchSizeInBytes = KafkaRaftClient.MAX_BATCH_SIZE_BYTES,
+ maxFetchSizeInBytes = KafkaRaftClient.MAX_FETCH_SIZE_BYTES,
+ fileDeleteDelayMs = Defaults.FileDeleteDelayMs,
+ nodeId = 1
+ )
+ )
+
+ val lastContainedLogTimestamp = 10000
+
+ val snapshotWriter = RecordsSnapshotWriter.createWithHeader(
+ () => metadataLog.createNewSnapshot(new OffsetAndEpoch(0, 0)),
+ 1024,
+ MemoryPool.NONE,
+ new MockTime,
+ lastContainedLogTimestamp,
+ CompressionType.NONE,
+ new MetadataRecordSerde
+ ).get()
+
+ snapshotWriter.append(metadataRecords.asJava)
+ snapshotWriter.freeze()
+ snapshotWriter.close()
Review Comment:
Good suggestion.
##########
core/src/main/scala/kafka/tools/DumpLogSegments.scala:
##########
@@ -288,6 +290,12 @@ object DumpLogSegments {
case ControlRecordType.ABORT | ControlRecordType.COMMIT =>
val endTxnMarker = EndTransactionMarker.deserialize(record)
print(s" endTxnMarker: ${endTxnMarker.controlType}
coordinatorEpoch: ${endTxnMarker.coordinatorEpoch}")
+ case ControlRecordType.SNAPSHOT_HEADER =>
+ val header =
ControlRecordUtils.deserializedSnapshotHeaderRecord(record)
+ print(s" SnapshotHeader version: ${header.version()}
lastContainedLogTimestamp: ${header.lastContainedLogTimestamp()}")
+ case ControlRecordType.SNAPSHOT_FOOTER =>
+ val footer =
ControlRecordUtils.deserializedSnapshotFooterRecord(record)
+ print(s" SnapshotFooter version: ${footer.version()}")
Review Comment:
Good suggestion.
##########
core/src/test/scala/unit/kafka/tools/DumpLogSegmentsTest.scala:
##########
@@ -273,9 +277,73 @@ class DumpLogSegmentsTest {
log.appendAsLeader(MemoryRecords.withRecords(CompressionType.NONE,
records:_*), leaderEpoch = 2)
output = runDumpLogSegments(Array("--cluster-metadata-decoder",
"--skip-record-metadata", "false", "--files", logFilePath))
- assert(output.contains("TOPIC_RECORD"))
- assert(output.contains("BROKER_RECORD"))
- assert(output.contains("skipping"))
+ assertTrue(output.contains("TOPIC_RECORD"))
+ assertTrue(output.contains("BROKER_RECORD"))
+ assertTrue(output.contains("skipping"))
+ }
+
+ @Test
+ def testDumpMetadataSnapshot(): Unit = {
+ val metadataRecords = Seq(
+ new ApiMessageAndVersion(
+ new RegisterBrokerRecord().setBrokerId(0).setBrokerEpoch(10),
0.toShort),
+ new ApiMessageAndVersion(
+ new RegisterBrokerRecord().setBrokerId(1).setBrokerEpoch(20),
0.toShort),
+ new ApiMessageAndVersion(
+ new TopicRecord().setName("test-topic").setTopicId(Uuid.randomUuid()),
0.toShort),
+ new ApiMessageAndVersion(
+ new PartitionChangeRecord().setTopicId(Uuid.randomUuid()).setLeader(1).
+ setPartitionId(0).setIsr(util.Arrays.asList(0, 1, 2)), 0.toShort)
+ )
+
+ val metadataLog = KafkaMetadataLog(
+ KafkaRaftServer.MetadataPartition,
+ KafkaRaftServer.MetadataTopicId,
+ logDir,
+ time,
+ time.scheduler,
+ MetadataLogConfig(
+ logSegmentBytes = 100 * 1024,
+ logSegmentMinBytes = 100 * 1024,
+ logSegmentMillis = 10 * 1000,
+ retentionMaxBytes = 100 * 1024,
+ retentionMillis = 60 * 1000,
+ maxBatchSizeInBytes = KafkaRaftClient.MAX_BATCH_SIZE_BYTES,
+ maxFetchSizeInBytes = KafkaRaftClient.MAX_FETCH_SIZE_BYTES,
+ fileDeleteDelayMs = Defaults.FileDeleteDelayMs,
+ nodeId = 1
+ )
+ )
+
+ val lastContainedLogTimestamp = 10000
+
+ val snapshotWriter = RecordsSnapshotWriter.createWithHeader(
+ () => metadataLog.createNewSnapshot(new OffsetAndEpoch(0, 0)),
+ 1024,
+ MemoryPool.NONE,
+ new MockTime,
+ lastContainedLogTimestamp,
+ CompressionType.NONE,
+ new MetadataRecordSerde
+ ).get()
+
+ snapshotWriter.append(metadataRecords.asJava)
+ snapshotWriter.freeze()
+ snapshotWriter.close()
+
+ var output = runDumpLogSegments(Array("--cluster-metadata-decoder",
"false", "--files", snapshotPath))
Review Comment:
I think we mistakenly used `false` as the value of
`--cluster-metadata-decoder`, so I removed all the "false" values in the test file.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]