hachikuji commented on a change in pull request #10212:
URL: https://github.com/apache/kafka/pull/10212#discussion_r583326960



##########
File path: core/src/main/scala/kafka/tools/DumpLogSegments.scala
##########
@@ -242,7 +246,8 @@ object DumpLogSegments {
                       nonConsecutivePairsForLogFilesMap: mutable.Map[String, List[(Long, Long)]],
                       isDeepIteration: Boolean,
                       maxMessageSize: Int,
-                      parser: MessageParser[_, _]): Unit = {
+                      parser: MessageParser[_, _],
+                      skipBatchMetadata: Boolean): Unit = {

Review comment:
       Should we skip `printBatchLevel` below if `skipBatchMetadata` is set?
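       As a rough sketch of that suggestion (names taken from this patch; the exact arguments of `printBatchLevel` are assumed, not confirmed):

       ```scala
       // Hypothetical guard: only emit batch-level metadata when the new flag is off.
       if (!skipBatchMetadata)
         printBatchLevel(batch, validBytes)
       ```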

##########
File path: core/src/main/scala/kafka/tools/DumpLogSegments.scala
##########
@@ -425,9 +461,11 @@ object DumpLogSegments {
     lazy val shouldPrintDataLog: Boolean = options.has(printOpt) ||
       options.has(offsetsOpt) ||
       options.has(transactionLogOpt) ||
+      options.has(clusterMetadataOpt) ||
       options.has(valueDecoderOpt) ||
       options.has(keyDecoderOpt)
 
+    lazy val skipBatchMetadata = options.has(skipBatchMetadataOpt)

Review comment:
       I think this option only makes sense if a parser is defined? Might be worth giving an error to the user.
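       A hedged sketch of such a check (option names from this patch; `CommandLineUtils.printUsageAndDie` assumed to be usable here as in the other tools):

       ```scala
       // Hypothetical validation: --skip-batch-metadata has no effect without a
       // message parser, so fail fast if no parser-selecting option was supplied.
       if (skipBatchMetadata &&
           !(options.has(offsetsOpt) || options.has(transactionLogOpt) ||
             options.has(clusterMetadataOpt) || options.has(keyDecoderOpt) ||
             options.has(valueDecoderOpt))) {
         CommandLineUtils.printUsageAndDie(parser,
           "--skip-batch-metadata is only valid when a message parser option is also given")
       }
       ```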

##########
File path: core/src/main/scala/kafka/tools/DumpLogSegments.scala
##########
@@ -263,29 +268,32 @@ object DumpLogSegments {
             }
             lastOffset = record.offset
 
-            print(s"$RecordIndent offset: ${record.offset} isValid: ${record.isValid} crc: ${record.checksumOrNull}" +
-                s" keySize: ${record.keySize} valueSize: ${record.valueSize} ${batch.timestampType}: ${record.timestamp}" +
-                s" baseOffset: ${batch.baseOffset} lastOffset: ${batch.lastOffset} baseSequence: ${batch.baseSequence}" +
-                s" lastSequence: ${batch.lastSequence} producerEpoch: ${batch.producerEpoch} partitionLeaderEpoch: ${batch.partitionLeaderEpoch}" +
-                s" batchSize: ${batch.sizeInBytes} magic: ${batch.magic} compressType: ${batch.compressionType} position: ${validBytes}")
-
-
-            if (batch.magic >= RecordBatch.MAGIC_VALUE_V2) {
-              print(" sequence: " + record.sequence + " headerKeys: " + record.headers.map(_.key).mkString("[", ",", "]"))
-            } else {
-              print(s" crc: ${record.checksumOrNull} isvalid: ${record.isValid}")
-            }
+            print(s"$RecordIndent ")

Review comment:
       If we do skip the call to `printBatchLevel`, it probably also makes sense to skip this when `skipBatchMetadata` is enabled.
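       In that case the guarded form might look something like (sketch; names from the diff above):

       ```scala
       // Hypothetical: the per-record indent prefix only matters alongside the
       // batch metadata, so gate it on the same flag.
       if (!skipBatchMetadata)
         print(s"$RecordIndent ")
       ```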

##########
File path: core/src/main/scala/kafka/tools/DumpLogSegments.scala
##########
@@ -382,6 +390,30 @@ object DumpLogSegments {
     }
   }
 
+  val metadataRecordSerde = new MetadataRecordSerde()

Review comment:
       nit: may as well move this into `ClusterMetadataLogMessageParser` since we don't need it anywhere else
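       i.e., something along these lines (sketch; the parser's type parameters are assumed to match the existing decoders):

       ```scala
       // Hypothetical placement: the serde becomes private state of the one
       // parser that uses it, instead of a top-level val in DumpLogSegments.
       private class ClusterMetadataLogMessageParser extends MessageParser[String, String] {
         private val metadataRecordSerde = new MetadataRecordSerde()
         // parse(...) uses metadataRecordSerde exactly as before.
       }
       ```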



