[GitHub] [kafka] anatasiavela commented on a change in pull request #9526: KAFKA-10525: Emit JSONs with new auto-generated schema
anatasiavela commented on a change in pull request #9526:
URL: https://github.com/apache/kafka/pull/9526#discussion_r541240558

## File path: generator/src/main/java/org/apache/kafka/message/JsonConverterGenerator.java ##
@@ -377,11 +387,24 @@ private void generateVariableLengthTargetToJson(Target target, Versions versions
                 headerGenerator.addImport(MessageGenerator.ARRAYS_CLASS);
                 buffer.printf("%s;%n", target.assignmentStatement(
                     String.format("new BinaryNode(Arrays.copyOf(%s, %s.length))",
-                        target.sourceVariable(), target.sourceVariable())));
+                        target.sourceVariable(), target.sourceVariable())));
             }
         } else if (target.field().type().isRecords()) {
             headerGenerator.addImport(MessageGenerator.BINARY_NODE_CLASS);
+            headerGenerator.addImport(MessageGenerator.INT_NODE_CLASS);
+            // KIP-673: When logging requests/responses, we do not serialize the record, instead we
+            // output its sizeInBytes, because outputting the bytes is not very useful and can be
+            // quite expensive. Otherwise, we will serialize the record.
+            buffer.printf("if (_serializeRecords) {%n");
+            buffer.incrementIndent();
             buffer.printf("%s;%n", target.assignmentStatement("new BinaryNode(new byte[]{})"));
+            buffer.decrementIndent();
+            buffer.printf("} else {%n");
+            buffer.incrementIndent();
+            buffer.printf("%s;%n", target.assignmentStatement(

Review comment:
I do see that it isn't very clear, but I don't think we can change the field name from here. Using `%sSizeInBytes` would just append the name at the end of the line, which would result in a compilation error. Unless you mean changing the field name in the `.json` file, but that would change the name for both the serialize and non-serialize cases.

This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
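One possible reading of the comment above, as a minimal sketch: the target fixes where a value is assigned, and the caller only supplies the right-hand-side expression, so any text appended to that expression lands inside the generated statement rather than in the field name. The `Target` class below is a hypothetical stand-in written for illustration, and the `_node.set("...", ...)` output shape and the `_object.records` variable are assumptions; this is not the generator's actual implementation.

```java
import java.util.function.Function;

public class AssignmentTargetSketch {
    // Hypothetical stand-in for the generator's Target (an assumption, not Kafka's class):
    // the field name is fixed at construction, callers only pass the right-hand side.
    public static final class Target {
        private final String sourceVariable;
        private final Function<String, String> assignment;

        public Target(String fieldName, String sourceVariable) {
            this.sourceVariable = sourceVariable;
            this.assignment = rhs -> String.format("_node.set(\"%s\", %s)", fieldName, rhs);
        }

        public String sourceVariable() {
            return sourceVariable;
        }

        public String assignmentStatement(String rhs) {
            return assignment.apply(rhs);
        }
    }

    // Builds the statement that writes the record size, as in the diff's IntNode variant.
    public static String sizeStatement(Target target) {
        String rhs = String.format("new IntNode(%s.sizeInBytes())", target.sourceVariable());
        return target.assignmentStatement(rhs) + ";";
    }

    // Appending a suffix to the right-hand side does not rename the field;
    // the suffix ends up after the expression, which is not valid Java.
    public static String suffixedStatement(Target target) {
        String rhs = String.format("new IntNode(%s.sizeInBytes())", target.sourceVariable());
        return target.assignmentStatement(rhs + "SizeInBytes") + ";";
    }

    public static void main(String[] args) {
        Target target = new Target("records", "_object.records");
        // prints: _node.set("records", new IntNode(_object.records.sizeInBytes()));
        System.out.println(sizeStatement(target));
        System.out.println(suffixedStatement(target));
    }
}
```

Under these assumptions, renaming the emitted field would require a different target (or a different field name in the message definition), not a different right-hand side.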
[GitHub] [kafka] anatasiavela commented on a change in pull request #9526: KAFKA-10525: Emit JSONs with new auto-generated schema
anatasiavela commented on a change in pull request #9526: URL: https://github.com/apache/kafka/pull/9526#discussion_r526511067 ## File path: core/src/main/scala/kafka/network/RequestConvertToJson.scala ## @@ -0,0 +1,362 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package kafka.network + +import java.util + +import com.fasterxml.jackson.databind.JsonNode +import com.fasterxml.jackson.databind.node.{ArrayNode, BinaryNode, DoubleNode, IntNode, JsonNodeFactory, LongNode, NullNode, ObjectNode, ShortNode, TextNode} +import kafka.network.RequestChannel.{Response, Session} +import org.apache.kafka.common.message._ +import org.apache.kafka.common.network.ClientInformation +import org.apache.kafka.common.protocol.Errors +import org.apache.kafka.common.record.RecordBatch +import org.apache.kafka.common.requests._ +import org.apache.kafka.common.utils.CollectionUtils + +import scala.jdk.CollectionConverters._ + +object RequestConvertToJson { + def request(request: AbstractRequest): JsonNode = { +request match { + case req: AddOffsetsToTxnRequest => AddOffsetsToTxnRequestDataJsonConverter.write(req.data, request.version) + case req: AddPartitionsToTxnRequest => AddPartitionsToTxnRequestDataJsonConverter.write(req.data, request.version) + case req: AlterClientQuotasRequest => AlterClientQuotasRequestDataJsonConverter.write(req.data, request.version) + case req: AlterConfigsRequest => AlterConfigsRequestDataJsonConverter.write(req.data, request.version) + case req: AlterIsrRequest => AlterIsrRequestDataJsonConverter.write(req.data, request.version) + case req: AlterPartitionReassignmentsRequest => AlterPartitionReassignmentsRequestDataJsonConverter.write(req.data, request.version) + case req: AlterReplicaLogDirsRequest => AlterReplicaLogDirsRequestDataJsonConverter.write(req.data, request.version) + case res: AlterUserScramCredentialsRequest => AlterUserScramCredentialsRequestDataJsonConverter.write(res.data, request.version) + case req: ApiVersionsRequest => ApiVersionsRequestDataJsonConverter.write(req.data, request.version) + case req: BeginQuorumEpochRequest => BeginQuorumEpochRequestDataJsonConverter.write(req.data, request.version) + case req: ControlledShutdownRequest => 
ControlledShutdownRequestDataJsonConverter.write(req.data, request.version) + case req: CreateAclsRequest => CreateAclsRequestDataJsonConverter.write(req.data, request.version) + case req: CreateDelegationTokenRequest => CreateDelegationTokenRequestDataJsonConverter.write(req.data, request.version) + case req: CreatePartitionsRequest => CreatePartitionsRequestDataJsonConverter.write(req.data, request.version) + case req: CreateTopicsRequest => CreateTopicsRequestDataJsonConverter.write(req.data, request.version) + case req: DeleteAclsRequest => DeleteAclsRequestDataJsonConverter.write(req.data, request.version) + case req: DeleteGroupsRequest => DeleteGroupsRequestDataJsonConverter.write(req.data, request.version) + case req: DeleteRecordsRequest => DeleteRecordsRequestDataJsonConverter.write(req.data, request.version) + case req: DeleteTopicsRequest => DeleteTopicsRequestDataJsonConverter.write(req.data, request.version) + case req: DescribeAclsRequest => DescribeAclsRequestDataJsonConverter.write(req.data, request.version) + case req: DescribeClientQuotasRequest => DescribeClientQuotasRequestDataJsonConverter.write(req.data, request.version) + case req: DescribeConfigsRequest => DescribeConfigsRequestDataJsonConverter.write(req.data, request.version) + case req: DescribeDelegationTokenRequest => DescribeDelegationTokenRequestDataJsonConverter.write(req.data, request.version) + case req: DescribeGroupsRequest => DescribeGroupsRequestDataJsonConverter.write(req.data, request.version) + case req: DescribeLogDirsRequest => DescribeLogDirsRequestDataJsonConverter.write(req.data, request.version) + case req: DescribeQuorumRequest => DescribeQuorumRequestDataJsonConverter.write(req.data, request.version) + case res: DescribeUserScramCredentialsRequest =>
[GitHub] [kafka] anatasiavela commented on a change in pull request #9526: KAFKA-10525: Emit JSONs with new auto-generated schema
anatasiavela commented on a change in pull request #9526: URL: https://github.com/apache/kafka/pull/9526#discussion_r522472868 ## File path: core/src/test/scala/unit/kafka/network/RequestConvertToJsonTest.scala ## @@ -0,0 +1,199 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package kafka.network + +import java.net.InetAddress +import java.nio.ByteBuffer +import java.util +import java.util.{Collections, Optional} + +import com.fasterxml.jackson.databind.JsonNode +import com.fasterxml.jackson.databind.node.ObjectNode +import kafka.network +import org.apache.kafka.common.TopicPartition +import org.apache.kafka.common.memory.MemoryPool +import org.apache.kafka.common.message._ +import org.apache.kafka.common.network.{ClientInformation, ListenerName, NetworkSend} +import org.junit.Test +import org.apache.kafka.common.protocol.{ApiKeys, Errors} +import org.apache.kafka.common.record.{MemoryRecords, RecordBatch} +import org.apache.kafka.common.requests.OffsetsForLeaderEpochRequest.PartitionData +import org.apache.kafka.common.requests._ +import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol} +import org.easymock.EasyMock.createNiceMock +import org.junit.Assert.assertEquals + +import scala.collection.mutable.ArrayBuffer + +class RequestConvertToJsonTest { + + @Test + def testAllRequestTypesHandled(): Unit = { +val unhandledKeys = ArrayBuffer[String]() +ApiKeys.values().foreach(key => { + val version: Short = key.latestVersion() + var req: AbstractRequest = null + if (key == ApiKeys.PRODUCE) { +// There's inconsistency with the toStruct schema in ProduceRequest +// and ProduceRequestDataJsonConverters where the field names don't +// match so the struct does not have the correct field names. 
This is +// a temporary workaround until ProduceRequest starts using ProduceRequestData +req = ProduceRequest.Builder.forCurrentMagic(0.toShort, 1, new util.HashMap[TopicPartition, MemoryRecords]).build() + } else { +val struct = ApiMessageType.fromApiKey(key.id).newRequest().toStruct(version) +req = AbstractRequest.parseRequest(key, version, struct) + } + try { +RequestConvertToJson.request(req, false) + } catch { +case _ : IllegalStateException => unhandledKeys += key.toString + } +}) +assertEquals("Unhandled request keys", ArrayBuffer.empty, unhandledKeys) + } + + @Test + def testAllResponseTypesHandled(): Unit = { +val unhandledKeys = ArrayBuffer[String]() +ApiKeys.values().foreach(key => { + val version: Short = key.latestVersion() + val struct = ApiMessageType.fromApiKey(key.id).newResponse().toStruct(version) + val res = AbstractResponse.parseResponse(key, struct, version) + try { +RequestConvertToJson.response(res, version, false) + } catch { +case _ : IllegalStateException => unhandledKeys += key.toString + } +}) +assertEquals("Unhandled response keys", ArrayBuffer.empty, unhandledKeys) + } + + @Test + def testFormatOfOffsetsForLeaderEpochRequestNode(): Unit = { +val partitionDataMap = new util.HashMap[TopicPartition, PartitionData] +partitionDataMap.put(new TopicPartition("topic1", 0), new PartitionData(Optional.of(0), 1)) + +val version: Short = ApiKeys.OFFSET_FOR_LEADER_EPOCH.latestVersion +val request = OffsetsForLeaderEpochRequest.Builder.forConsumer(partitionDataMap).build(version) +val actualNode = RequestConvertToJson.request(request, true) + +val requestData = OffsetForLeaderEpochRequestDataJsonConverter.read(actualNode, version) +val expectedNode = OffsetForLeaderEpochRequestDataJsonConverter.write(requestData, version, true) + +assertEquals(expectedNode, actualNode) + } + + @Test + def testFormatOfProduceRequestNode(): Unit = { +val produceDataMap = new util.HashMap[TopicPartition, MemoryRecords] + +val version: Short = 
ApiKeys.PRODUCE.latestVersion +val serializeRecords: Boolean = false; +val request = ProduceRequest.Builder.forMagic(2, 0.toShort, 0, produceDataMap, "").build() +val actualNode = RequestConvertToJson.request(request,
[GitHub] [kafka] anatasiavela commented on a change in pull request #9526: KAFKA-10525: Emit JSONs with new auto-generated schema
anatasiavela commented on a change in pull request #9526:
URL: https://github.com/apache/kafka/pull/9526#discussion_r520807921

## File path: generator/src/main/java/org/apache/kafka/message/JsonConverterGenerator.java ##
@@ -375,13 +385,36 @@ private void generateVariableLengthTargetToJson(Target target, Versions versions
                         target.sourceVariable())));
                 } else {
                     headerGenerator.addImport(MessageGenerator.ARRAYS_CLASS);
+                    headerGenerator.addImport(MessageGenerator.INT_NODE_CLASS);
+                    buffer.printf("if (_serializeRecords) {%n");

Review comment:
Yeah, this change is needed because ProduceRequest uses the `bytes` type. It would be worthwhile to wait until the produce request has been migrated to the automated protocol, so that we wouldn't have to change this.
[GitHub] [kafka] anatasiavela commented on a change in pull request #9526: KAFKA-10525: Emit JSONs with new auto-generated schema
anatasiavela commented on a change in pull request #9526: URL: https://github.com/apache/kafka/pull/9526#discussion_r518363490 ## File path: core/src/main/scala/kafka/network/RequestConvertToJson.scala ## @@ -0,0 +1,360 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package kafka.network + +import java.util + +import com.fasterxml.jackson.databind.JsonNode +import com.fasterxml.jackson.databind.node.{ArrayNode, BinaryNode, DoubleNode, IntNode, JsonNodeFactory, LongNode, NullNode, ObjectNode, ShortNode, TextNode} +import kafka.network.RequestChannel.{Response, Session} +import org.apache.kafka.common.message._ +import org.apache.kafka.common.network.ClientInformation +import org.apache.kafka.common.protocol.Errors +import org.apache.kafka.common.record.RecordBatch +import org.apache.kafka.common.requests._ +import org.apache.kafka.common.utils.CollectionUtils + +import scala.jdk.CollectionConverters._ + +object RequestConvertToJson { + def request(request: AbstractRequest, verbose: Boolean): JsonNode = { +request match { + case req: AddOffsetsToTxnRequest => AddOffsetsToTxnRequestDataJsonConverter.write(req.data, request.version) + case req: AddPartitionsToTxnRequest => AddPartitionsToTxnRequestDataJsonConverter.write(req.data, request.version) + case req: AlterClientQuotasRequest => AlterClientQuotasRequestDataJsonConverter.write(req.data, request.version) + case req: AlterConfigsRequest => AlterConfigsRequestDataJsonConverter.write(req.data, request.version) + case req: AlterIsrRequest => AlterIsrRequestDataJsonConverter.write(req.data, request.version) + case req: AlterPartitionReassignmentsRequest => AlterPartitionReassignmentsRequestDataJsonConverter.write(req.data, request.version) + case req: AlterReplicaLogDirsRequest => AlterReplicaLogDirsRequestDataJsonConverter.write(req.data, request.version) + case res: AlterUserScramCredentialsRequest => AlterUserScramCredentialsRequestDataJsonConverter.write(res.data, request.version) + case req: ApiVersionsRequest => ApiVersionsRequestDataJsonConverter.write(req.data, request.version) + case req: BeginQuorumEpochRequest => BeginQuorumEpochRequestDataJsonConverter.write(req.data, request.version) + case req: ControlledShutdownRequest => 
ControlledShutdownRequestDataJsonConverter.write(req.data, request.version) + case req: CreateAclsRequest => CreateAclsRequestDataJsonConverter.write(req.data, request.version) + case req: CreateDelegationTokenRequest => CreateDelegationTokenRequestDataJsonConverter.write(req.data, request.version) + case req: CreatePartitionsRequest => CreatePartitionsRequestDataJsonConverter.write(req.data, request.version) + case req: CreateTopicsRequest => CreateTopicsRequestDataJsonConverter.write(req.data, request.version) + case req: DeleteAclsRequest => DeleteAclsRequestDataJsonConverter.write(req.data, request.version) + case req: DeleteGroupsRequest => DeleteGroupsRequestDataJsonConverter.write(req.data, request.version) + case req: DeleteRecordsRequest => DeleteRecordsRequestDataJsonConverter.write(req.data, request.version) + case req: DeleteTopicsRequest => DeleteTopicsRequestDataJsonConverter.write(req.data, request.version) + case req: DescribeAclsRequest => DescribeAclsRequestDataJsonConverter.write(req.data, request.version) + case req: DescribeClientQuotasRequest => DescribeClientQuotasRequestDataJsonConverter.write(req.data, request.version) + case req: DescribeConfigsRequest => DescribeConfigsRequestDataJsonConverter.write(req.data, request.version) + case req: DescribeDelegationTokenRequest => DescribeDelegationTokenRequestDataJsonConverter.write(req.data, request.version) + case req: DescribeGroupsRequest => DescribeGroupsRequestDataJsonConverter.write(req.data, request.version) + case req: DescribeLogDirsRequest => DescribeLogDirsRequestDataJsonConverter.write(req.data, request.version) + case req: DescribeQuorumRequest => DescribeQuorumRequestDataJsonConverter.write(req.data, request.version) + case res: DescribeUserScramCredentialsRequest =>
[GitHub] [kafka] anatasiavela commented on a change in pull request #9526: KAFKA-10525: Emit JSONs with new auto-generated schema
anatasiavela commented on a change in pull request #9526: URL: https://github.com/apache/kafka/pull/9526#discussion_r518359188 ## File path: core/src/main/scala/kafka/network/RequestConvertToJson.scala ## @@ -0,0 +1,360 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package kafka.network + +import java.util + +import com.fasterxml.jackson.databind.JsonNode +import com.fasterxml.jackson.databind.node.{ArrayNode, BinaryNode, DoubleNode, IntNode, JsonNodeFactory, LongNode, NullNode, ObjectNode, ShortNode, TextNode} +import kafka.network.RequestChannel.{Response, Session} +import org.apache.kafka.common.message._ +import org.apache.kafka.common.network.ClientInformation +import org.apache.kafka.common.protocol.Errors +import org.apache.kafka.common.record.RecordBatch +import org.apache.kafka.common.requests._ +import org.apache.kafka.common.utils.CollectionUtils + +import scala.jdk.CollectionConverters._ + +object RequestConvertToJson { + def request(request: AbstractRequest, verbose: Boolean): JsonNode = { +request match { + case req: AddOffsetsToTxnRequest => AddOffsetsToTxnRequestDataJsonConverter.write(req.data, request.version, verbose) + case req: AddPartitionsToTxnRequest => AddPartitionsToTxnRequestDataJsonConverter.write(req.data, request.version, verbose) + case req: AlterClientQuotasRequest => AlterClientQuotasRequestDataJsonConverter.write(req.data, request.version, verbose) + case req: AlterConfigsRequest => AlterConfigsRequestDataJsonConverter.write(req.data, request.version, verbose) + case req: AlterIsrRequest => AlterIsrRequestDataJsonConverter.write(req.data, request.version, verbose) + case req: AlterPartitionReassignmentsRequest => AlterPartitionReassignmentsRequestDataJsonConverter.write(req.data, request.version, verbose) + case req: AlterReplicaLogDirsRequest => AlterReplicaLogDirsRequestDataJsonConverter.write(req.data, request.version, verbose) + case res: AlterUserScramCredentialsRequest => AlterUserScramCredentialsRequestDataJsonConverter.write(res.data, request.version, verbose) + case req: ApiVersionsRequest => ApiVersionsRequestDataJsonConverter.write(req.data, request.version, verbose) + case req: BeginQuorumEpochRequest => BeginQuorumEpochRequestDataJsonConverter.write(req.data, 
request.version, verbose) + case req: ControlledShutdownRequest => ControlledShutdownRequestDataJsonConverter.write(req.data, request.version, verbose) + case req: CreateAclsRequest => CreateAclsRequestDataJsonConverter.write(req.data, request.version, verbose) + case req: CreateDelegationTokenRequest => CreateDelegationTokenRequestDataJsonConverter.write(req.data, request.version, verbose) + case req: CreatePartitionsRequest => CreatePartitionsRequestDataJsonConverter.write(req.data, request.version, verbose) + case req: CreateTopicsRequest => CreateTopicsRequestDataJsonConverter.write(req.data, request.version, verbose) + case req: DeleteAclsRequest => DeleteAclsRequestDataJsonConverter.write(req.data, request.version, verbose) + case req: DeleteGroupsRequest => DeleteGroupsRequestDataJsonConverter.write(req.data, request.version, verbose) + case req: DeleteRecordsRequest => DeleteRecordsRequestDataJsonConverter.write(req.data, request.version, verbose) + case req: DeleteTopicsRequest => DeleteTopicsRequestDataJsonConverter.write(req.data, request.version, verbose) + case req: DescribeAclsRequest => DescribeAclsRequestDataJsonConverter.write(req.data, request.version, verbose) + case req: DescribeClientQuotasRequest => DescribeClientQuotasRequestDataJsonConverter.write(req.data, request.version, verbose) + case req: DescribeConfigsRequest => DescribeConfigsRequestDataJsonConverter.write(req.data, request.version, verbose) + case req: DescribeDelegationTokenRequest => DescribeDelegationTokenRequestDataJsonConverter.write(req.data, request.version, verbose) + case req: DescribeGroupsRequest => DescribeGroupsRequestDataJsonConverter.write(req.data, request.version, verbose) + case req: DescribeLogDirsRequest =>
[GitHub] [kafka] anatasiavela commented on a change in pull request #9526: KAFKA-10525: Emit JSONs with new auto-generated schema
anatasiavela commented on a change in pull request #9526:
URL: https://github.com/apache/kafka/pull/9526#discussion_r517699783

## File path: generator/src/main/java/org/apache/kafka/message/JsonConverterGenerator.java ##
@@ -380,8 +380,9 @@ private void generateVariableLengthTargetToJson(Target target, Versions versions
                         target.sourceVariable(), target.sourceVariable())));
             }
         } else if (target.field().type().isRecords()) {
-            headerGenerator.addImport(MessageGenerator.BINARY_NODE_CLASS);
-            buffer.printf("%s;%n", target.assignmentStatement("new BinaryNode(new byte[]{})"));
+            headerGenerator.addImport(MessageGenerator.INT_NODE_CLASS);
+            buffer.printf("%s;%n", target.assignmentStatement(
+                String.format("new IntNode(%s.sizeInBytes())", target.sourceVariable())));

Review comment:
Never mind, we can't deserialize `recordSet` to an IntNode because `FetchResponseData` expects `recordSet` to be of type `BaseRecords`, not `int`. What I've done is leave deserialization unchanged and add the verbose flag to serialization.
[GitHub] [kafka] anatasiavela commented on a change in pull request #9526: KAFKA-10525: Emit JSONs with new auto-generated schema
anatasiavela commented on a change in pull request #9526:
URL: https://github.com/apache/kafka/pull/9526#discussion_r517651742

## File path: core/src/main/scala/kafka/network/RequestConvertToJson.scala ##
@@ -0,0 +1,360 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.network
+
+import java.util
+
+import com.fasterxml.jackson.databind.JsonNode
+import com.fasterxml.jackson.databind.node.{ArrayNode, BinaryNode, DoubleNode, IntNode, JsonNodeFactory, LongNode, NullNode, ObjectNode, ShortNode, TextNode}
+import kafka.network.RequestChannel.{Response, Session}
+import org.apache.kafka.common.message._
+import org.apache.kafka.common.network.ClientInformation
+import org.apache.kafka.common.protocol.Errors
+import org.apache.kafka.common.record.RecordBatch
+import org.apache.kafka.common.requests._
+import org.apache.kafka.common.utils.CollectionUtils
+
+import scala.jdk.CollectionConverters._
+
+object RequestConvertToJson {
+  def request(request: AbstractRequest, verbose: Boolean): JsonNode = {

Review comment:
I just read through the comment below, so I'll keep the `verbose` flag for now and may remove it depending on what we decide there.
[GitHub] [kafka] anatasiavela commented on a change in pull request #9526: KAFKA-10525: Emit JSONs with new auto-generated schema
anatasiavela commented on a change in pull request #9526:
URL: https://github.com/apache/kafka/pull/9526#discussion_r517665383

## File path: generator/src/main/java/org/apache/kafka/message/JsonConverterGenerator.java ##
@@ -380,8 +380,9 @@ private void generateVariableLengthTargetToJson(Target target, Versions versions
                         target.sourceVariable(), target.sourceVariable())));
             }
         } else if (target.field().type().isRecords()) {
-            headerGenerator.addImport(MessageGenerator.BINARY_NODE_CLASS);
-            buffer.printf("%s;%n", target.assignmentStatement("new BinaryNode(new byte[]{})"));
+            headerGenerator.addImport(MessageGenerator.INT_NODE_CLASS);
+            buffer.printf("%s;%n", target.assignmentStatement(
+                String.format("new IntNode(%s.sizeInBytes())", target.sourceVariable())));

Review comment:
@lbradstreet The currently generated JSON does not print the recordSet either. But because we serialize a BinaryNode with an empty array, it is still deserialized as a BinaryNode, so nothing breaks.

```
buffer.printf("%s;%n", target.assignmentStatement(
    String.format("MemoryRecords.readableRecords(ByteBuffer.wrap(MessageUtil.jsonNodeToBinary(%s, \"%s\")))",
        target.sourceVariable(), target.humanReadableName())));
```

So I believe the concern is that I proposed changing the serialization to an IntNode while the deserialization still expects a BinaryNode, which I should have changed as well. Alternatively, I could fix this by changing the deserialization to expect an IntNode, but I'm in favor of adding a verbose flag.
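The round-trip concern described in this comment can be sketched in a few lines. This is only an illustration under stated assumptions: `Node`, `BinaryNode` and `IntNode` below are hypothetical stand-ins for the Jackson node classes, `write` and `read` are invented names, and the `serializeRecords` branch mirrors the `_serializeRecords` condition quoted elsewhere in this thread (empty binary payload when true, size only when false). It is not the generated converter code.

```java
public class RecordsJsonSketch {
    // Hypothetical stand-ins for Jackson's node types (assumptions for illustration).
    public interface Node {}

    public static final class BinaryNode implements Node {
        public final byte[] value;

        public BinaryNode(byte[] value) {
            this.value = value;
        }
    }

    public static final class IntNode implements Node {
        public final int value;

        public IntNode(int value) {
            this.value = value;
        }
    }

    // Writer side: either an empty binary placeholder or just the size of the records.
    public static Node write(byte[] records, boolean serializeRecords) {
        if (serializeRecords) {
            return new BinaryNode(new byte[]{});
        }
        return new IntNode(records.length);
    }

    // Reader side: unchanged, so it only understands binary nodes.
    public static byte[] read(Node node) {
        if (!(node instanceof BinaryNode)) {
            throw new IllegalArgumentException(
                "expected a binary node, got " + node.getClass().getSimpleName());
        }
        return ((BinaryNode) node).value;
    }

    public static void main(String[] args) {
        byte[] records = {1, 2, 3};

        // The binary form still parses, even though the payload is empty.
        System.out.println(read(write(records, true)).length);

        // The size-only form cannot be read back by the unchanged reader.
        try {
            read(write(records, false));
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

With a flag like this, the size-only form is suitable for logging, while anything that reads the JSON back has to request the binary form.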
[GitHub] [kafka] anatasiavela commented on a change in pull request #9526: KAFKA-10525: Emit JSONs with new auto-generated schema
anatasiavela commented on a change in pull request #9526:
URL: https://github.com/apache/kafka/pull/9526#discussion_r517665383

## File path: generator/src/main/java/org/apache/kafka/message/JsonConverterGenerator.java ##
@@ -380,8 +380,9 @@ private void generateVariableLengthTargetToJson(Target target, Versions versions
                         target.sourceVariable(), target.sourceVariable())));
             }
         } else if (target.field().type().isRecords()) {
-            headerGenerator.addImport(MessageGenerator.BINARY_NODE_CLASS);
-            buffer.printf("%s;%n", target.assignmentStatement("new BinaryNode(new byte[]{})"));
+            headerGenerator.addImport(MessageGenerator.INT_NODE_CLASS);
+            buffer.printf("%s;%n", target.assignmentStatement(
+                String.format("new IntNode(%s.sizeInBytes())", target.sourceVariable())));

Review comment:
@lbradstreet The currently generated JSON does not print the recordSet. When we were serializing a BinaryNode with an empty array, it was still being deserialized as a BinaryNode, so it didn't break anything.

```
buffer.printf("%s;%n", target.assignmentStatement(
    String.format("MemoryRecords.readableRecords(ByteBuffer.wrap(MessageUtil.jsonNodeToBinary(%s, \"%s\")))",
        target.sourceVariable(), target.humanReadableName())));
```

So I believe the concern is that I proposed changing the serialization to an IntNode while the deserialization still expects a BinaryNode, which I should have changed as well. Alternatively, I could fix this by changing the deserialization to expect an IntNode, but I'm in favor of adding a verbose flag.
[GitHub] [kafka] anatasiavela commented on a change in pull request #9526: KAFKA-10525: Emit JSONs with new auto-generated schema
anatasiavela commented on a change in pull request #9526: URL: https://github.com/apache/kafka/pull/9526#discussion_r517651742 ## File path: core/src/main/scala/kafka/network/RequestConvertToJson.scala ## @@ -0,0 +1,360 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.network + +import java.util + +import com.fasterxml.jackson.databind.JsonNode +import com.fasterxml.jackson.databind.node.{ArrayNode, BinaryNode, DoubleNode, IntNode, JsonNodeFactory, LongNode, NullNode, ObjectNode, ShortNode, TextNode} +import kafka.network.RequestChannel.{Response, Session} +import org.apache.kafka.common.message._ +import org.apache.kafka.common.network.ClientInformation +import org.apache.kafka.common.protocol.Errors +import org.apache.kafka.common.record.RecordBatch +import org.apache.kafka.common.requests._ +import org.apache.kafka.common.utils.CollectionUtils + +import scala.jdk.CollectionConverters._ + +object RequestConvertToJson { + def request(request: AbstractRequest, verbose: Boolean): JsonNode = { Review comment: Just read through the comment below, so I'll keep it for now.
[GitHub] [kafka] anatasiavela commented on a change in pull request #9526: KAFKA-10525: Emit JSONs with new auto-generated schema
anatasiavela commented on a change in pull request #9526: URL: https://github.com/apache/kafka/pull/9526#discussion_r517649219 ## File path: core/src/main/scala/kafka/network/RequestConvertToJson.scala ## @@ -0,0 +1,360 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package kafka.network + +import java.util + +import com.fasterxml.jackson.databind.JsonNode +import com.fasterxml.jackson.databind.node.{ArrayNode, BinaryNode, DoubleNode, IntNode, JsonNodeFactory, LongNode, NullNode, ObjectNode, ShortNode, TextNode} +import kafka.network.RequestChannel.{Response, Session} +import org.apache.kafka.common.message._ +import org.apache.kafka.common.network.ClientInformation +import org.apache.kafka.common.protocol.Errors +import org.apache.kafka.common.record.RecordBatch +import org.apache.kafka.common.requests._ +import org.apache.kafka.common.utils.CollectionUtils + +import scala.jdk.CollectionConverters._ + +object RequestConvertToJson { + def request(request: AbstractRequest, verbose: Boolean): JsonNode = { +request match { + case req: AddOffsetsToTxnRequest => AddOffsetsToTxnRequestDataJsonConverter.write(req.data, request.version) + case req: AddPartitionsToTxnRequest => AddPartitionsToTxnRequestDataJsonConverter.write(req.data, request.version) + case req: AlterClientQuotasRequest => AlterClientQuotasRequestDataJsonConverter.write(req.data, request.version) + case req: AlterConfigsRequest => AlterConfigsRequestDataJsonConverter.write(req.data, request.version) + case req: AlterIsrRequest => AlterIsrRequestDataJsonConverter.write(req.data, request.version) + case req: AlterPartitionReassignmentsRequest => AlterPartitionReassignmentsRequestDataJsonConverter.write(req.data, request.version) + case req: AlterReplicaLogDirsRequest => AlterReplicaLogDirsRequestDataJsonConverter.write(req.data, request.version) + case res: AlterUserScramCredentialsRequest => AlterUserScramCredentialsRequestDataJsonConverter.write(res.data, request.version) + case req: ApiVersionsRequest => ApiVersionsRequestDataJsonConverter.write(req.data, request.version) + case req: BeginQuorumEpochRequest => BeginQuorumEpochRequestDataJsonConverter.write(req.data, request.version) + case req: ControlledShutdownRequest => 
ControlledShutdownRequestDataJsonConverter.write(req.data, request.version) + case req: CreateAclsRequest => CreateAclsRequestDataJsonConverter.write(req.data, request.version) + case req: CreateDelegationTokenRequest => CreateDelegationTokenRequestDataJsonConverter.write(req.data, request.version) + case req: CreatePartitionsRequest => CreatePartitionsRequestDataJsonConverter.write(req.data, request.version) + case req: CreateTopicsRequest => CreateTopicsRequestDataJsonConverter.write(req.data, request.version) + case req: DeleteAclsRequest => DeleteAclsRequestDataJsonConverter.write(req.data, request.version) + case req: DeleteGroupsRequest => DeleteGroupsRequestDataJsonConverter.write(req.data, request.version) + case req: DeleteRecordsRequest => DeleteRecordsRequestDataJsonConverter.write(req.data, request.version) + case req: DeleteTopicsRequest => DeleteTopicsRequestDataJsonConverter.write(req.data, request.version) + case req: DescribeAclsRequest => DescribeAclsRequestDataJsonConverter.write(req.data, request.version) + case req: DescribeClientQuotasRequest => DescribeClientQuotasRequestDataJsonConverter.write(req.data, request.version) + case req: DescribeConfigsRequest => DescribeConfigsRequestDataJsonConverter.write(req.data, request.version) + case req: DescribeDelegationTokenRequest => DescribeDelegationTokenRequestDataJsonConverter.write(req.data, request.version) + case req: DescribeGroupsRequest => DescribeGroupsRequestDataJsonConverter.write(req.data, request.version) + case req: DescribeLogDirsRequest => DescribeLogDirsRequestDataJsonConverter.write(req.data, request.version) + case req: DescribeQuorumRequest => DescribeQuorumRequestDataJsonConverter.write(req.data, request.version) + case res: DescribeUserScramCredentialsRequest =>
[GitHub] [kafka] anatasiavela commented on a change in pull request #9526: KAFKA-10525: Emit JSONs with new auto-generated schema
anatasiavela commented on a change in pull request #9526: URL: https://github.com/apache/kafka/pull/9526#discussion_r517645718 ## File path: core/src/main/scala/kafka/network/RequestConvertToJson.scala ## @@ -0,0 +1,360 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.network + +import java.util + +import com.fasterxml.jackson.databind.JsonNode +import com.fasterxml.jackson.databind.node.{ArrayNode, BinaryNode, DoubleNode, IntNode, JsonNodeFactory, LongNode, NullNode, ObjectNode, ShortNode, TextNode} +import kafka.network.RequestChannel.{Response, Session} +import org.apache.kafka.common.message._ +import org.apache.kafka.common.network.ClientInformation +import org.apache.kafka.common.protocol.Errors +import org.apache.kafka.common.record.RecordBatch +import org.apache.kafka.common.requests._ +import org.apache.kafka.common.utils.CollectionUtils + +import scala.jdk.CollectionConverters._ + +object RequestConvertToJson { + def request(request: AbstractRequest, verbose: Boolean): JsonNode = { Review comment: Yes, I kept it because `requestDescMetrics` originally had the `detailsEnabled`, but I'll remove it since none of the JsonConverters differentiates between verbose and non-verbose. 
[GitHub] [kafka] anatasiavela commented on a change in pull request #9526: KAFKA-10525: Emit JSONs with new auto-generated schema
anatasiavela commented on a change in pull request #9526: URL: https://github.com/apache/kafka/pull/9526#discussion_r516288558 ## File path: core/src/main/scala/kafka/network/RequestConvertToJson.scala ## @@ -0,0 +1,343 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package kafka.network + +import com.fasterxml.jackson.databind.JsonNode +import com.fasterxml.jackson.databind.node.{ArrayNode, DoubleNode, IntNode, JsonNodeFactory, LongNode, NullNode, ObjectNode, ShortNode, TextNode} +import kafka.network.RequestChannel.{Response, Session} +import org.apache.kafka.common.message._ +import org.apache.kafka.common.protocol.Errors +import org.apache.kafka.common.record.RecordBatch +import org.apache.kafka.common.requests._ +import org.apache.kafka.common.utils.CollectionUtils + +import scala.jdk.CollectionConverters._ + +object RequestConvertToJson { + def request(request: AbstractRequest, verbose: Boolean): JsonNode = { +request match { + case req: AddOffsetsToTxnRequest => AddOffsetsToTxnRequestDataJsonConverter.write(req.data(), request.version()) + case req: AddPartitionsToTxnRequest => AddPartitionsToTxnRequestDataJsonConverter.write(req.data(), request.version()) + case req: AlterClientQuotasRequest => AlterClientQuotasRequestDataJsonConverter.write(req.data(), request.version()) + case req: AlterConfigsRequest => AlterConfigsRequestDataJsonConverter.write(req.data(), request.version()) + case req: AlterIsrRequest => AlterIsrRequestDataJsonConverter.write(req.data(), request.version()) + case req: AlterPartitionReassignmentsRequest => AlterPartitionReassignmentsRequestDataJsonConverter.write(req.data(), request.version()) + case req: AlterReplicaLogDirsRequest => AlterReplicaLogDirsRequestDataJsonConverter.write(req.data(), request.version()) + case res: AlterUserScramCredentialsRequest => AlterUserScramCredentialsRequestDataJsonConverter.write(res.data(), request.version()) + case req: ApiVersionsRequest => ApiVersionsRequestDataJsonConverter.write(req.data(), request.version()) + case req: BeginQuorumEpochRequest => BeginQuorumEpochRequestDataJsonConverter.write(req.data, request.version()) + case req: ControlledShutdownRequest => ControlledShutdownRequestDataJsonConverter.write(req.data(), request.version()) + case 
req: CreateAclsRequest => CreateAclsRequestDataJsonConverter.write(req.data(), request.version()) + case req: CreateDelegationTokenRequest => CreateDelegationTokenRequestDataJsonConverter.write(req.data(), request.version()) + case req: CreatePartitionsRequest => CreatePartitionsRequestDataJsonConverter.write(req.data(), request.version()) + case req: CreateTopicsRequest => CreateTopicsRequestDataJsonConverter.write(req.data(), request.version()) + case req: DeleteAclsRequest => DeleteAclsRequestDataJsonConverter.write(req.data(), request.version()) + case req: DeleteGroupsRequest => DeleteGroupsRequestDataJsonConverter.write(req.data(), request.version()) + case req: DeleteRecordsRequest => DeleteRecordsRequestDataJsonConverter.write(req.data(), request.version()) + case req: DeleteTopicsRequest => DeleteTopicsRequestDataJsonConverter.write(req.data(), request.version()) + case req: DescribeAclsRequest => DescribeAclsRequestDataJsonConverter.write(req.data(), request.version()) + case req: DescribeClientQuotasRequest => DescribeClientQuotasRequestDataJsonConverter.write(req.data(), request.version()) + case req: DescribeConfigsRequest => DescribeConfigsRequestDataJsonConverter.write(req.data(), request.version()) + case req: DescribeDelegationTokenRequest => DescribeDelegationTokenRequestDataJsonConverter.write(req.data(), request.version()) + case req: DescribeGroupsRequest => DescribeGroupsRequestDataJsonConverter.write(req.data(), request.version()) + case req: DescribeLogDirsRequest => DescribeLogDirsRequestDataJsonConverter.write(req.data(), request.version()) + case req: DescribeQuorumRequest => DescribeQuorumRequestDataJsonConverter.write(req.data, request.version()) + case res: DescribeUserScramCredentialsRequest =>
[GitHub] [kafka] anatasiavela commented on a change in pull request #9526: KAFKA-10525: Emit JSONs with new auto-generated schema
anatasiavela commented on a change in pull request #9526: URL: https://github.com/apache/kafka/pull/9526#discussion_r514882473 ## File path: core/src/main/scala/kafka/network/RequestConvertToJson.scala ## @@ -0,0 +1,343 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package kafka.network + +import com.fasterxml.jackson.databind.JsonNode +import com.fasterxml.jackson.databind.node.{ArrayNode, DoubleNode, IntNode, JsonNodeFactory, LongNode, NullNode, ObjectNode, ShortNode, TextNode} +import kafka.network.RequestChannel.{Response, Session} +import org.apache.kafka.common.message._ +import org.apache.kafka.common.protocol.Errors +import org.apache.kafka.common.record.RecordBatch +import org.apache.kafka.common.requests._ +import org.apache.kafka.common.utils.CollectionUtils + +import scala.jdk.CollectionConverters._ + +object RequestConvertToJson { + def request(request: AbstractRequest, verbose: Boolean): JsonNode = { +request match { + case req: AddOffsetsToTxnRequest => AddOffsetsToTxnRequestDataJsonConverter.write(req.data(), request.version()) + case req: AddPartitionsToTxnRequest => AddPartitionsToTxnRequestDataJsonConverter.write(req.data(), request.version()) + case req: AlterClientQuotasRequest => AlterClientQuotasRequestDataJsonConverter.write(req.data(), request.version()) + case req: AlterConfigsRequest => AlterConfigsRequestDataJsonConverter.write(req.data(), request.version()) + case req: AlterIsrRequest => AlterIsrRequestDataJsonConverter.write(req.data(), request.version()) + case req: AlterPartitionReassignmentsRequest => AlterPartitionReassignmentsRequestDataJsonConverter.write(req.data(), request.version()) + case req: AlterReplicaLogDirsRequest => AlterReplicaLogDirsRequestDataJsonConverter.write(req.data(), request.version()) + case res: AlterUserScramCredentialsRequest => AlterUserScramCredentialsRequestDataJsonConverter.write(res.data(), request.version()) + case req: ApiVersionsRequest => ApiVersionsRequestDataJsonConverter.write(req.data(), request.version()) + case req: BeginQuorumEpochRequest => BeginQuorumEpochRequestDataJsonConverter.write(req.data, request.version()) + case req: ControlledShutdownRequest => ControlledShutdownRequestDataJsonConverter.write(req.data(), request.version()) + case 
req: CreateAclsRequest => CreateAclsRequestDataJsonConverter.write(req.data(), request.version()) + case req: CreateDelegationTokenRequest => CreateDelegationTokenRequestDataJsonConverter.write(req.data(), request.version()) + case req: CreatePartitionsRequest => CreatePartitionsRequestDataJsonConverter.write(req.data(), request.version()) + case req: CreateTopicsRequest => CreateTopicsRequestDataJsonConverter.write(req.data(), request.version()) + case req: DeleteAclsRequest => DeleteAclsRequestDataJsonConverter.write(req.data(), request.version()) + case req: DeleteGroupsRequest => DeleteGroupsRequestDataJsonConverter.write(req.data(), request.version()) + case req: DeleteRecordsRequest => DeleteRecordsRequestDataJsonConverter.write(req.data(), request.version()) + case req: DeleteTopicsRequest => DeleteTopicsRequestDataJsonConverter.write(req.data(), request.version()) + case req: DescribeAclsRequest => DescribeAclsRequestDataJsonConverter.write(req.data(), request.version()) + case req: DescribeClientQuotasRequest => DescribeClientQuotasRequestDataJsonConverter.write(req.data(), request.version()) + case req: DescribeConfigsRequest => DescribeConfigsRequestDataJsonConverter.write(req.data(), request.version()) + case req: DescribeDelegationTokenRequest => DescribeDelegationTokenRequestDataJsonConverter.write(req.data(), request.version()) + case req: DescribeGroupsRequest => DescribeGroupsRequestDataJsonConverter.write(req.data(), request.version()) + case req: DescribeLogDirsRequest => DescribeLogDirsRequestDataJsonConverter.write(req.data(), request.version()) + case req: DescribeQuorumRequest => DescribeQuorumRequestDataJsonConverter.write(req.data, request.version()) + case res: DescribeUserScramCredentialsRequest =>
[GitHub] [kafka] anatasiavela commented on a change in pull request #9526: KAFKA-10525: Emit JSONs with new auto-generated schema
anatasiavela commented on a change in pull request #9526: URL: https://github.com/apache/kafka/pull/9526#discussion_r514872477 ## File path: core/src/test/scala/unit/kafka/network/RequestConvertToJsonTest.scala ## @@ -0,0 +1,188 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package kafka.network + +import java.util.HashMap + +import org.apache.kafka.common.TopicPartition +import org.apache.kafka.common.message._ +import org.junit.Test +import org.apache.kafka.common.protocol.ApiKeys +import org.apache.kafka.common.record.MemoryRecords +import org.apache.kafka.common.requests._ + +import scala.collection.mutable.ArrayBuffer + +class RequestConvertToJsonTest { + + def createRequestsFromApiKey(apiKey: ApiKeys, version: Short): AbstractRequest = apiKey match { +case ApiKeys.PRODUCE => ProduceRequest.Builder.forCurrentMagic(0.toShort, 1, new HashMap[TopicPartition, MemoryRecords]()).build() +case ApiKeys.FETCH => new FetchRequest(new FetchRequestData(), version) +case ApiKeys.LIST_OFFSETS => new ListOffsetRequest(new ListOffsetRequestData().toStruct(version), version) +case ApiKeys.METADATA => new MetadataRequest(new MetadataRequestData(), version) +case ApiKeys.OFFSET_COMMIT => new OffsetCommitRequest(new OffsetCommitRequestData(), version) +case ApiKeys.OFFSET_FETCH => new OffsetFetchRequest(new OffsetFetchRequestData().toStruct(version), version) +case ApiKeys.FIND_COORDINATOR => new FindCoordinatorRequest(new FindCoordinatorRequestData().toStruct(version), version) +case ApiKeys.JOIN_GROUP => new JoinGroupRequest(new JoinGroupRequestData(), version) +case ApiKeys.HEARTBEAT => new HeartbeatRequest(new HeartbeatRequestData().toStruct(version), version) +case ApiKeys.LEAVE_GROUP => new LeaveGroupRequest(new LeaveGroupRequestData().toStruct(version), version) Review comment: Actually, I found a workaround for this mapping and removed it altogether, so there is one less API key mapping to maintain.
[GitHub] [kafka] anatasiavela commented on a change in pull request #9526: KAFKA-10525: Emit JSONs with new auto-generated schema
anatasiavela commented on a change in pull request #9526: URL: https://github.com/apache/kafka/pull/9526#discussion_r514619891 ## File path: core/src/test/scala/unit/kafka/network/RequestConvertToJsonTest.scala ## @@ -0,0 +1,188 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package kafka.network + +import java.util.HashMap + +import org.apache.kafka.common.TopicPartition +import org.apache.kafka.common.message._ +import org.junit.Test +import org.apache.kafka.common.protocol.ApiKeys +import org.apache.kafka.common.record.MemoryRecords +import org.apache.kafka.common.requests._ + +import scala.collection.mutable.ArrayBuffer + +class RequestConvertToJsonTest { + + def createRequestsFromApiKey(apiKey: ApiKeys, version: Short): AbstractRequest = apiKey match { +case ApiKeys.PRODUCE => ProduceRequest.Builder.forCurrentMagic(0.toShort, 1, new HashMap[TopicPartition, MemoryRecords]()).build() +case ApiKeys.FETCH => new FetchRequest(new FetchRequestData(), version) +case ApiKeys.LIST_OFFSETS => new ListOffsetRequest(new ListOffsetRequestData().toStruct(version), version) +case ApiKeys.METADATA => new MetadataRequest(new MetadataRequestData(), version) +case ApiKeys.OFFSET_COMMIT => new OffsetCommitRequest(new OffsetCommitRequestData(), version) +case ApiKeys.OFFSET_FETCH => new OffsetFetchRequest(new OffsetFetchRequestData().toStruct(version), version) +case ApiKeys.FIND_COORDINATOR => new FindCoordinatorRequest(new FindCoordinatorRequestData().toStruct(version), version) +case ApiKeys.JOIN_GROUP => new JoinGroupRequest(new JoinGroupRequestData(), version) +case ApiKeys.HEARTBEAT => new HeartbeatRequest(new HeartbeatRequestData().toStruct(version), version) +case ApiKeys.LEAVE_GROUP => new LeaveGroupRequest(new LeaveGroupRequestData().toStruct(version), version) Review comment: I guess this is related to the inconsistency issue, but I tried not to make too many changes. The constructors that take the `data` parameter were private in some cases, so I used `toStruct` there. But for consistency, I'll switch to `toStruct` everywhere instead.
[GitHub] [kafka] anatasiavela commented on a change in pull request #9526: KAFKA-10525: Emit JSONs with new auto-generated schema
anatasiavela commented on a change in pull request #9526: URL: https://github.com/apache/kafka/pull/9526#discussion_r514619891 ## File path: core/src/test/scala/unit/kafka/network/RequestConvertToJsonTest.scala ## @@ -0,0 +1,188 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package kafka.network + +import java.util.HashMap + +import org.apache.kafka.common.TopicPartition +import org.apache.kafka.common.message._ +import org.junit.Test +import org.apache.kafka.common.protocol.ApiKeys +import org.apache.kafka.common.record.MemoryRecords +import org.apache.kafka.common.requests._ + +import scala.collection.mutable.ArrayBuffer + +class RequestConvertToJsonTest { + + def createRequestsFromApiKey(apiKey: ApiKeys, version: Short): AbstractRequest = apiKey match { +case ApiKeys.PRODUCE => ProduceRequest.Builder.forCurrentMagic(0.toShort, 1, new HashMap[TopicPartition, MemoryRecords]()).build() +case ApiKeys.FETCH => new FetchRequest(new FetchRequestData(), version) +case ApiKeys.LIST_OFFSETS => new ListOffsetRequest(new ListOffsetRequestData().toStruct(version), version) +case ApiKeys.METADATA => new MetadataRequest(new MetadataRequestData(), version) +case ApiKeys.OFFSET_COMMIT => new OffsetCommitRequest(new OffsetCommitRequestData(), version) +case ApiKeys.OFFSET_FETCH => new OffsetFetchRequest(new OffsetFetchRequestData().toStruct(version), version) +case ApiKeys.FIND_COORDINATOR => new FindCoordinatorRequest(new FindCoordinatorRequestData().toStruct(version), version) +case ApiKeys.JOIN_GROUP => new JoinGroupRequest(new JoinGroupRequestData(), version) +case ApiKeys.HEARTBEAT => new HeartbeatRequest(new HeartbeatRequestData().toStruct(version), version) +case ApiKeys.LEAVE_GROUP => new LeaveGroupRequest(new LeaveGroupRequestData().toStruct(version), version) Review comment: I guess this is related to the inconsistency issue, but I tried not to make too many changes. The constructors that take the `data` parameter were private in some cases, so I used `toStruct` there. But for consistency, I'll switch to the Builder class instead.
[GitHub] [kafka] anatasiavela commented on a change in pull request #9526: KAFKA-10525: Emit JSONs with new auto-generated schema
anatasiavela commented on a change in pull request #9526: URL: https://github.com/apache/kafka/pull/9526#discussion_r514861991 ## File path: core/src/main/scala/kafka/network/RequestConvertToJson.scala ## @@ -0,0 +1,343 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package kafka.network + +import com.fasterxml.jackson.databind.JsonNode +import com.fasterxml.jackson.databind.node.{ArrayNode, DoubleNode, IntNode, JsonNodeFactory, LongNode, NullNode, ObjectNode, ShortNode, TextNode} +import kafka.network.RequestChannel.{Response, Session} +import org.apache.kafka.common.message._ +import org.apache.kafka.common.protocol.Errors +import org.apache.kafka.common.record.RecordBatch +import org.apache.kafka.common.requests._ +import org.apache.kafka.common.utils.CollectionUtils + +import scala.jdk.CollectionConverters._ + +object RequestConvertToJson { + def request(request: AbstractRequest, verbose: Boolean): JsonNode = { +request match { + case req: AddOffsetsToTxnRequest => AddOffsetsToTxnRequestDataJsonConverter.write(req.data(), request.version()) + case req: AddPartitionsToTxnRequest => AddPartitionsToTxnRequestDataJsonConverter.write(req.data(), request.version()) + case req: AlterClientQuotasRequest => AlterClientQuotasRequestDataJsonConverter.write(req.data(), request.version()) + case req: AlterConfigsRequest => AlterConfigsRequestDataJsonConverter.write(req.data(), request.version()) + case req: AlterIsrRequest => AlterIsrRequestDataJsonConverter.write(req.data(), request.version()) + case req: AlterPartitionReassignmentsRequest => AlterPartitionReassignmentsRequestDataJsonConverter.write(req.data(), request.version()) + case req: AlterReplicaLogDirsRequest => AlterReplicaLogDirsRequestDataJsonConverter.write(req.data(), request.version()) + case res: AlterUserScramCredentialsRequest => AlterUserScramCredentialsRequestDataJsonConverter.write(res.data(), request.version()) + case req: ApiVersionsRequest => ApiVersionsRequestDataJsonConverter.write(req.data(), request.version()) + case req: BeginQuorumEpochRequest => BeginQuorumEpochRequestDataJsonConverter.write(req.data, request.version()) + case req: ControlledShutdownRequest => ControlledShutdownRequestDataJsonConverter.write(req.data(), request.version()) + case 
req: CreateAclsRequest => CreateAclsRequestDataJsonConverter.write(req.data(), request.version()) + case req: CreateDelegationTokenRequest => CreateDelegationTokenRequestDataJsonConverter.write(req.data(), request.version()) + case req: CreatePartitionsRequest => CreatePartitionsRequestDataJsonConverter.write(req.data(), request.version()) + case req: CreateTopicsRequest => CreateTopicsRequestDataJsonConverter.write(req.data(), request.version()) + case req: DeleteAclsRequest => DeleteAclsRequestDataJsonConverter.write(req.data(), request.version()) + case req: DeleteGroupsRequest => DeleteGroupsRequestDataJsonConverter.write(req.data(), request.version()) + case req: DeleteRecordsRequest => DeleteRecordsRequestDataJsonConverter.write(req.data(), request.version()) + case req: DeleteTopicsRequest => DeleteTopicsRequestDataJsonConverter.write(req.data(), request.version()) + case req: DescribeAclsRequest => DescribeAclsRequestDataJsonConverter.write(req.data(), request.version()) + case req: DescribeClientQuotasRequest => DescribeClientQuotasRequestDataJsonConverter.write(req.data(), request.version()) + case req: DescribeConfigsRequest => DescribeConfigsRequestDataJsonConverter.write(req.data(), request.version()) + case req: DescribeDelegationTokenRequest => DescribeDelegationTokenRequestDataJsonConverter.write(req.data(), request.version()) + case req: DescribeGroupsRequest => DescribeGroupsRequestDataJsonConverter.write(req.data(), request.version()) + case req: DescribeLogDirsRequest => DescribeLogDirsRequestDataJsonConverter.write(req.data(), request.version()) + case req: DescribeQuorumRequest => DescribeQuorumRequestDataJsonConverter.write(req.data, request.version()) + case res: DescribeUserScramCredentialsRequest =>
[GitHub] [kafka] anatasiavela commented on a change in pull request #9526: KAFKA-10525: Emit JSONs with new auto-generated schema
anatasiavela commented on a change in pull request #9526: URL: https://github.com/apache/kafka/pull/9526#discussion_r514861389 ## File path: core/src/main/scala/kafka/network/RequestConvertToJson.scala ## @@ -0,0 +1,343 @@
[GitHub] [kafka] anatasiavela commented on a change in pull request #9526: KAFKA-10525: Emit JSONs with new auto-generated schema
anatasiavela commented on a change in pull request #9526: URL: https://github.com/apache/kafka/pull/9526#discussion_r514636997 ## File path: core/src/main/scala/kafka/network/RequestConvertToJson.scala ## @@ -0,0 +1,343 @@
[GitHub] [kafka] anatasiavela commented on a change in pull request #9526: KAFKA-10525: Emit JSONs with new auto-generated schema
anatasiavela commented on a change in pull request #9526: URL: https://github.com/apache/kafka/pull/9526#discussion_r514636692 ## File path: core/src/main/scala/kafka/network/RequestChannel.scala ## @@ -162,7 +161,7 @@ object RequestChannel extends Logging { } } -trace(s"Processor $processor received request: ${requestDesc(true)}") +trace(s"Processor $processor received request: ${RequestConvertToJson.requestDesc(header, loggableRequest, true).toString}") Review comment: The `trace` macro does check `isTraceEnabled`, and that check happens before the string is evaluated, so the description would not be computed unnecessarily. That said, I'm also fine with gating it explicitly.
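To illustrate the point in the comment above, here is a minimal sketch of why a lazily evaluated log message costs nothing when trace logging is off. It is an assumption-laden illustration, not Kafka's code: `SketchLogger`, `describeRequest`, and `conversionsFor` are hypothetical names, and the sketch uses a plain by-name parameter rather than the logging macro the comment refers to, although the observable effect (no evaluation when the level is disabled) is the same.

```scala
object LazyTraceSketch {
  // Hypothetical stand-in logger; this is NOT Kafka's Logging trait.
  final class SketchLogger(val isTraceEnabled: Boolean) {
    // By-name parameter: `msg` is evaluated only when it is referenced,
    // which happens only inside the enabled branch.
    def trace(msg: => String): Unit =
      if (isTraceEnabled) println(msg)
  }

  // Returns how many times the (pretend) expensive description was built.
  def conversionsFor(traceEnabled: Boolean): Int = {
    var conversions = 0
    // Hypothetical placeholder for an expensive request-to-JSON conversion.
    def describeRequest(): String = { conversions += 1; "request-as-json" }

    val log = new SketchLogger(traceEnabled)

    // Ungated call: the interpolated string is still built lazily.
    log.trace(s"Processor 0 received request: ${describeRequest()}")

    // Explicitly gated call, the alternative mentioned in the comment.
    if (log.isTraceEnabled)
      log.trace(s"Processor 0 received request: ${describeRequest()}")

    conversions
  }

  def main(args: Array[String]): Unit = {
    println(conversionsFor(traceEnabled = false))
    println(conversionsFor(traceEnabled = true))
  }
}
```

With tracing disabled, neither call builds the description; with tracing enabled, both do. Explicit gating therefore changes readability rather than cost in this sketch, which may be why either option is acceptable.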
[GitHub] [kafka] anatasiavela commented on a change in pull request #9526: KAFKA-10525: Emit JSONs with new auto-generated schema
anatasiavela commented on a change in pull request #9526: URL: https://github.com/apache/kafka/pull/9526#discussion_r514621561 ## File path: core/src/main/scala/kafka/network/RequestConvertToJson.scala ## @@ -0,0 +1,343 @@
[GitHub] [kafka] anatasiavela commented on a change in pull request #9526: KAFKA-10525: Emit JSONs with new auto-generated schema
anatasiavela commented on a change in pull request #9526: URL: https://github.com/apache/kafka/pull/9526#discussion_r514619891 ## File path: core/src/test/scala/unit/kafka/network/RequestConvertToJsonTest.scala ## @@ -0,0 +1,188 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package kafka.network + +import java.util.HashMap + +import org.apache.kafka.common.TopicPartition +import org.apache.kafka.common.message._ +import org.junit.Test +import org.apache.kafka.common.protocol.ApiKeys +import org.apache.kafka.common.record.MemoryRecords +import org.apache.kafka.common.requests._ + +import scala.collection.mutable.ArrayBuffer + +class RequestConvertToJsonTest { + + def createRequestsFromApiKey(apiKey: ApiKeys, version: Short): AbstractRequest = apiKey match { +case ApiKeys.PRODUCE => ProduceRequest.Builder.forCurrentMagic(0.toShort, 1, new HashMap[TopicPartition, MemoryRecords]()).build() +case ApiKeys.FETCH => new FetchRequest(new FetchRequestData(), version) +case ApiKeys.LIST_OFFSETS => new ListOffsetRequest(new ListOffsetRequestData().toStruct(version), version) +case ApiKeys.METADATA => new MetadataRequest(new MetadataRequestData(), version) +case ApiKeys.OFFSET_COMMIT => new OffsetCommitRequest(new OffsetCommitRequestData(), version) +case ApiKeys.OFFSET_FETCH => new OffsetFetchRequest(new OffsetFetchRequestData().toStruct(version), version) +case ApiKeys.FIND_COORDINATOR => new FindCoordinatorRequest(new FindCoordinatorRequestData().toStruct(version), version) +case ApiKeys.JOIN_GROUP => new JoinGroupRequest(new JoinGroupRequestData(), version) +case ApiKeys.HEARTBEAT => new HeartbeatRequest(new HeartbeatRequestData().toStruct(version), version) +case ApiKeys.LEAVE_GROUP => new LeaveGroupRequest(new LeaveGroupRequestData().toStruct(version), version) Review comment: I guess this is related to the consistency issue, but I tried not to make too many changes. The constructors that take a `data` parameter were private in some cases, so I used `toStruct` there. For consistency, I'll switch to using one or the other rather than alternating between the two.
[GitHub] [kafka] anatasiavela commented on a change in pull request #9526: KAFKA-10525: Emit JSONs with new auto-generated schema
anatasiavela commented on a change in pull request #9526: URL: https://github.com/apache/kafka/pull/9526#discussion_r514617626 ## File path: core/src/main/scala/kafka/network/RequestConvertToJson.scala ## @@ -0,0 +1,343 @@
[GitHub] [kafka] anatasiavela commented on a change in pull request #9526: KAFKA-10525: Emit JSONs with new auto-generated schema
anatasiavela commented on a change in pull request #9526: URL: https://github.com/apache/kafka/pull/9526#discussion_r514614216 ## File path: core/src/main/scala/kafka/network/RequestConvertToJson.scala ## @@ -0,0 +1,343 @@
[GitHub] [kafka] anatasiavela commented on a change in pull request #9526: KAFKA-10525: Emit JSONs with new auto-generated schema
anatasiavela commented on a change in pull request #9526:
URL: https://github.com/apache/kafka/pull/9526#discussion_r514610499

## File path: core/src/main/scala/kafka/network/RequestConvertToJson.scala
## @@ -0,0 +1,343 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.network
+
+import com.fasterxml.jackson.databind.JsonNode
+import com.fasterxml.jackson.databind.node.{ArrayNode, DoubleNode, IntNode, JsonNodeFactory, LongNode, NullNode, ObjectNode, ShortNode, TextNode}
+import kafka.network.RequestChannel.{Response, Session}
+import org.apache.kafka.common.message._
+import org.apache.kafka.common.protocol.Errors
+import org.apache.kafka.common.record.RecordBatch
+import org.apache.kafka.common.requests._
+import org.apache.kafka.common.utils.CollectionUtils
+
+import scala.jdk.CollectionConverters._
+
+object RequestConvertToJson {
+  def request(request: AbstractRequest, verbose: Boolean): JsonNode = {
+    request match {
+      case req: AddOffsetsToTxnRequest => AddOffsetsToTxnRequestDataJsonConverter.write(req.data(), request.version())
+      case req: AddPartitionsToTxnRequest => AddPartitionsToTxnRequestDataJsonConverter.write(req.data(), request.version())
+      case req: AlterClientQuotasRequest => AlterClientQuotasRequestDataJsonConverter.write(req.data(), request.version())
+      case req: AlterConfigsRequest => AlterConfigsRequestDataJsonConverter.write(req.data(), request.version())
+      case req: AlterIsrRequest => AlterIsrRequestDataJsonConverter.write(req.data(), request.version())
+      case req: AlterPartitionReassignmentsRequest => AlterPartitionReassignmentsRequestDataJsonConverter.write(req.data(), request.version())
+      case req: AlterReplicaLogDirsRequest => AlterReplicaLogDirsRequestDataJsonConverter.write(req.data(), request.version())
+      case res: AlterUserScramCredentialsRequest => AlterUserScramCredentialsRequestDataJsonConverter.write(res.data(), request.version())
+      case req: ApiVersionsRequest => ApiVersionsRequestDataJsonConverter.write(req.data(), request.version())
+      case req: BeginQuorumEpochRequest => BeginQuorumEpochRequestDataJsonConverter.write(req.data, request.version())
+      case req: ControlledShutdownRequest => ControlledShutdownRequestDataJsonConverter.write(req.data(), request.version())
+      case req: CreateAclsRequest => CreateAclsRequestDataJsonConverter.write(req.data(), request.version())
+      case req: CreateDelegationTokenRequest => CreateDelegationTokenRequestDataJsonConverter.write(req.data(), request.version())
+      case req: CreatePartitionsRequest => CreatePartitionsRequestDataJsonConverter.write(req.data(), request.version())
+      case req: CreateTopicsRequest => CreateTopicsRequestDataJsonConverter.write(req.data(), request.version())
+      case req: DeleteAclsRequest => DeleteAclsRequestDataJsonConverter.write(req.data(), request.version())
+      case req: DeleteGroupsRequest => DeleteGroupsRequestDataJsonConverter.write(req.data(), request.version())
+      case req: DeleteRecordsRequest => DeleteRecordsRequestDataJsonConverter.write(req.data(), request.version())
+      case req: DeleteTopicsRequest => DeleteTopicsRequestDataJsonConverter.write(req.data(), request.version())
+      case req: DescribeAclsRequest => DescribeAclsRequestDataJsonConverter.write(req.data(), request.version())
+      case req: DescribeClientQuotasRequest => DescribeClientQuotasRequestDataJsonConverter.write(req.data(), request.version())
+      case req: DescribeConfigsRequest => DescribeConfigsRequestDataJsonConverter.write(req.data(), request.version())
+      case req: DescribeDelegationTokenRequest => DescribeDelegationTokenRequestDataJsonConverter.write(req.data(), request.version())
+      case req: DescribeGroupsRequest => DescribeGroupsRequestDataJsonConverter.write(req.data(), request.version())
+      case req: DescribeLogDirsRequest => DescribeLogDirsRequestDataJsonConverter.write(req.data(), request.version())
+      case req: DescribeQuorumRequest => DescribeQuorumRequestDataJsonConverter.write(req.data, request.version())
+      case res: DescribeUserScramCredentialsRequest =>
[GitHub] [kafka] anatasiavela commented on a change in pull request #9526: KAFKA-10525: Emit JSONs with new auto-generated schema
anatasiavela commented on a change in pull request #9526:
URL: https://github.com/apache/kafka/pull/9526#discussion_r514508755

## File path: clients/src/main/java/org/apache/kafka/common/requests/AddOffsetsToTxnRequest.java
## @@ -57,6 +57,10 @@ public AddOffsetsToTxnRequest(Struct struct, short version) {
         this.data = new AddOffsetsToTxnRequestData(struct, version);
     }
+    public AddOffsetsToTxnRequestData data() {
+        return data;
+    }

Review comment:
Yeah, I definitely agree with wanting consistency. I'll stick to 1) in the PR.