chia7712 commented on code in PR #18480: URL: https://github.com/apache/kafka/pull/18480#discussion_r1912502970
########## core/src/main/scala/kafka/tools/DumpLogSegments.scala: ########## @@ -691,34 +625,17 @@ object DumpLogSegments { class ShareGroupStateMessageParser extends MessageParser[String, String] { private val serde = new ShareCoordinatorRecordSerde() - private def prepareKey(message: Message, version: Short): String = { - val messageAsJson = message match { - case m: ShareSnapshotKey => - ShareSnapshotKeyJsonConverter.write(m, version) - case m: ShareUpdateKey => - ShareUpdateKeyJsonConverter.write(m, version) - case _ => throw new UnknownRecordTypeException(version) - } - - jsonString(messageAsJson, version) - } - - private def prepareValue(message: Message, version: Short): String = { - val messageAsJson = message match { - case m: ShareSnapshotValue => - ShareSnapshotValueJsonConverter.write(m, version) - case m: ShareUpdateValue => - ShareUpdateValueJsonConverter.write(m, version) - case _ => throw new IllegalStateException(s"Message value ${message.getClass.getSimpleName} is not supported.") - } - - jsonString(messageAsJson, version) + private def prepareKey(message: ApiMessage): String = { + val json = new ObjectNode(JsonNodeFactory.instance) + json.set("type", new TextNode(message.apiKey.toString)) + json.set("data", ShareCoordinatorRecordJsonConverters.writeRecordKeyAsJson(message)) + json.toString } - private def jsonString(jsonNode: JsonNode, version: Short): String = { + private def prepareValue(message: ApiMessage, version: Short): String = { val json = new ObjectNode(JsonNodeFactory.instance) - json.set("type", new TextNode(version.toString)) - json.set("data", jsonNode) + json.set("version", new TextNode(version.toString)) Review Comment: I guess you will change other values' "type" to "version" in the follow-up? ########## core/src/main/scala/kafka/tools/DumpLogSegments.scala: ########## @@ -519,41 +483,11 @@ object DumpLogSegments { json } - private def prepareValue(message: Message, version: Short): String = { - val messageAsJson = message match { - case m: OffsetCommitValue => - OffsetCommitValueJsonConverter.write(m, version) - case m: GroupMetadataValue => - prepareGroupMetadataValue(m, version) - case m: ConsumerGroupMetadataValue => - ConsumerGroupMetadataValueJsonConverter.write(m, version) - case m: ConsumerGroupPartitionMetadataValue => - ConsumerGroupPartitionMetadataValueJsonConverter.write(m, version) - case m: ConsumerGroupMemberMetadataValue => - ConsumerGroupMemberMetadataValueJsonConverter.write(m, version) - case m: ConsumerGroupTargetAssignmentMetadataValue => - ConsumerGroupTargetAssignmentMetadataValueJsonConverter.write(m, version) - case m: ConsumerGroupTargetAssignmentMemberValue => - ConsumerGroupTargetAssignmentMemberValueJsonConverter.write(m, version) - case m: ConsumerGroupCurrentMemberAssignmentValue => - ConsumerGroupCurrentMemberAssignmentValueJsonConverter.write(m, version) - case m: ConsumerGroupRegularExpressionValue => - ConsumerGroupRegularExpressionValueJsonConverter.write(m, version) - case m: ShareGroupMetadataValue => - ShareGroupMetadataValueJsonConverter.write(m, version) - case m: ShareGroupPartitionMetadataValue => - ShareGroupPartitionMetadataValueJsonConverter.write(m, version) - case m: ShareGroupMemberMetadataValue => - ShareGroupMemberMetadataValueJsonConverter.write(m, version) - case m: ShareGroupTargetAssignmentMetadataValue => - ShareGroupTargetAssignmentMetadataValueJsonConverter.write(m, version) - case m: ShareGroupTargetAssignmentMemberValue => - ShareGroupTargetAssignmentMemberValueJsonConverter.write(m, version) - case m: ShareGroupCurrentMemberAssignmentValue => - ShareGroupCurrentMemberAssignmentValueJsonConverter.write(m, version) - case m: ShareGroupStatePartitionMetadataValue => - ShareGroupStatePartitionMetadataValueJsonConverter.write(m, version) - case _ => throw new IllegalStateException(s"Message value ${message.getClass.getSimpleName} is not supported.") + private def prepareValue(message: ApiMessage, version: Short): String = { + val messageAsJson = if (message.apiKey == GroupCoordinatorRecordType.GROUP_METADATA.id) { + prepareGroupMetadataValue(message.asInstanceOf[GroupMetadataValue], version) Review Comment: The generated code is unable to parse the subscription. Therefore, we need to implement specific handling for `GroupMetadataValue`. If `GroupCoordinatorRecordJsonConverters.writeRecordValueAsJson` is exclusively used by `DumpLogSegments`, this approach should be acceptable. However, if this function is utilized in other contexts, I am concerned that it might generate JSON output that differs from other tools or logs. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org