[GitHub] [kafka] anatasiavela commented on a change in pull request #9526: KAFKA-10525: Emit JSONs with new auto-generated schema

2020-12-11 Thread GitBox


anatasiavela commented on a change in pull request #9526:
URL: https://github.com/apache/kafka/pull/9526#discussion_r541240558



##
File path: 
generator/src/main/java/org/apache/kafka/message/JsonConverterGenerator.java
##
@@ -377,11 +387,24 @@ private void generateVariableLengthTargetToJson(Target target, Versions versions
 headerGenerator.addImport(MessageGenerator.ARRAYS_CLASS);
 buffer.printf("%s;%n", target.assignmentStatement(
 String.format("new BinaryNode(Arrays.copyOf(%s, %s.length))",
-target.sourceVariable(), target.sourceVariable())));
+target.sourceVariable(), target.sourceVariable())));
 }
 } else if (target.field().type().isRecords()) {
 headerGenerator.addImport(MessageGenerator.BINARY_NODE_CLASS);
+headerGenerator.addImport(MessageGenerator.INT_NODE_CLASS);
+// KIP-673: When logging requests/responses, we do not serialize the record, instead we
+// output its sizeInBytes, because outputting the bytes is not very useful and can be
+// quite expensive. Otherwise, we will serialize the record.
+buffer.printf("if (_serializeRecords) {%n");
+buffer.incrementIndent();
 buffer.printf("%s;%n", target.assignmentStatement("new BinaryNode(new byte[]{})"));
+buffer.decrementIndent();
+buffer.printf("} else {%n");
+buffer.incrementIndent();
+buffer.printf("%s;%n", target.assignmentStatement(

Review comment:
   I agree that the current name is not very clear, but I don't think we can
   change the field name from here. Using `%sSizeInBytes` would simply append
   the suffix to the end of the generated line, which would result in a
   compilation error. If you mean changing the field name in the `.json` file
   instead, that would rename the field in both the serialized and
   non-serialized cases.
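
To make the compilation error concrete, here is a minimal sketch. It assumes that the assignment statement is built by wrapping a right-hand-side expression around a field name fixed when the target is created; `TargetSketch`, `appendSuffix`, and the `_node.set(...)` shape are hypothetical stand-ins for illustration, not the generator's actual `Target` class.

```java
import java.util.function.Function;

// Hypothetical stand-in for the generator's Target (an assumption, not the
// real class): it turns a right-hand-side expression into a full statement
// for one field name that is fixed at construction time.
final class TargetSketch {
    private final Function<String, String> assignment;

    TargetSketch(String fieldName) {
        // Assumed shape of the emitted statement; the field name is baked in here.
        this.assignment = rhs -> String.format("_node.set(\"%s\", %s)", fieldName, rhs);
    }

    String assignmentStatement(String rhs) {
        return assignment.apply(rhs);
    }

    // Formatting the finished statement with "%sSizeInBytes" places the suffix
    // after the closing parenthesis, so the emitted line is not valid Java.
    static String appendSuffix(String statement) {
        return String.format("%sSizeInBytes", statement);
    }
}
```

Under these assumptions, the suffix ends up after the statement's final `)` rather than inside the quoted field name, which is why the rename would have to happen where the field name is defined.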





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] anatasiavela commented on a change in pull request #9526: KAFKA-10525: Emit JSONs with new auto-generated schema

2020-11-18 Thread GitBox


anatasiavela commented on a change in pull request #9526:
URL: https://github.com/apache/kafka/pull/9526#discussion_r526511067



##
File path: core/src/main/scala/kafka/network/RequestConvertToJson.scala
##
@@ -0,0 +1,362 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.network
+
+import java.util
+
+import com.fasterxml.jackson.databind.JsonNode
+import com.fasterxml.jackson.databind.node.{ArrayNode, BinaryNode, DoubleNode, 
IntNode, JsonNodeFactory, LongNode, NullNode, ObjectNode, ShortNode, TextNode}
+import kafka.network.RequestChannel.{Response, Session}
+import org.apache.kafka.common.message._
+import org.apache.kafka.common.network.ClientInformation
+import org.apache.kafka.common.protocol.Errors
+import org.apache.kafka.common.record.RecordBatch
+import org.apache.kafka.common.requests._
+import org.apache.kafka.common.utils.CollectionUtils
+
+import scala.jdk.CollectionConverters._
+
+object RequestConvertToJson {
+  def request(request: AbstractRequest): JsonNode = {
+request match {
+  case req: AddOffsetsToTxnRequest => 
AddOffsetsToTxnRequestDataJsonConverter.write(req.data, request.version)
+  case req: AddPartitionsToTxnRequest => 
AddPartitionsToTxnRequestDataJsonConverter.write(req.data, request.version)
+  case req: AlterClientQuotasRequest => 
AlterClientQuotasRequestDataJsonConverter.write(req.data, request.version)
+  case req: AlterConfigsRequest => 
AlterConfigsRequestDataJsonConverter.write(req.data, request.version)
+  case req: AlterIsrRequest => 
AlterIsrRequestDataJsonConverter.write(req.data, request.version)
+  case req: AlterPartitionReassignmentsRequest => 
AlterPartitionReassignmentsRequestDataJsonConverter.write(req.data, 
request.version)
+  case req: AlterReplicaLogDirsRequest => 
AlterReplicaLogDirsRequestDataJsonConverter.write(req.data, request.version)
+  case res: AlterUserScramCredentialsRequest => 
AlterUserScramCredentialsRequestDataJsonConverter.write(res.data, 
request.version)
+  case req: ApiVersionsRequest => 
ApiVersionsRequestDataJsonConverter.write(req.data, request.version)
+  case req: BeginQuorumEpochRequest => 
BeginQuorumEpochRequestDataJsonConverter.write(req.data, request.version)
+  case req: ControlledShutdownRequest => 
ControlledShutdownRequestDataJsonConverter.write(req.data, request.version)
+  case req: CreateAclsRequest => 
CreateAclsRequestDataJsonConverter.write(req.data, request.version)
+  case req: CreateDelegationTokenRequest => 
CreateDelegationTokenRequestDataJsonConverter.write(req.data, request.version)
+  case req: CreatePartitionsRequest => 
CreatePartitionsRequestDataJsonConverter.write(req.data, request.version)
+  case req: CreateTopicsRequest => 
CreateTopicsRequestDataJsonConverter.write(req.data, request.version)
+  case req: DeleteAclsRequest => 
DeleteAclsRequestDataJsonConverter.write(req.data, request.version)
+  case req: DeleteGroupsRequest => 
DeleteGroupsRequestDataJsonConverter.write(req.data, request.version)
+  case req: DeleteRecordsRequest => 
DeleteRecordsRequestDataJsonConverter.write(req.data, request.version)
+  case req: DeleteTopicsRequest => 
DeleteTopicsRequestDataJsonConverter.write(req.data, request.version)
+  case req: DescribeAclsRequest => 
DescribeAclsRequestDataJsonConverter.write(req.data, request.version)
+  case req: DescribeClientQuotasRequest => 
DescribeClientQuotasRequestDataJsonConverter.write(req.data, request.version)
+  case req: DescribeConfigsRequest => 
DescribeConfigsRequestDataJsonConverter.write(req.data, request.version)
+  case req: DescribeDelegationTokenRequest => 
DescribeDelegationTokenRequestDataJsonConverter.write(req.data, request.version)
+  case req: DescribeGroupsRequest => 
DescribeGroupsRequestDataJsonConverter.write(req.data, request.version)
+  case req: DescribeLogDirsRequest => 
DescribeLogDirsRequestDataJsonConverter.write(req.data, request.version)
+  case req: DescribeQuorumRequest => 
DescribeQuorumRequestDataJsonConverter.write(req.data, request.version)
+  case res: DescribeUserScramCredentialsRequest => 

[GitHub] [kafka] anatasiavela commented on a change in pull request #9526: KAFKA-10525: Emit JSONs with new auto-generated schema

2020-11-12 Thread GitBox


anatasiavela commented on a change in pull request #9526:
URL: https://github.com/apache/kafka/pull/9526#discussion_r522472868



##
File path: core/src/test/scala/unit/kafka/network/RequestConvertToJsonTest.scala
##
@@ -0,0 +1,199 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.network
+
+import java.net.InetAddress
+import java.nio.ByteBuffer
+import java.util
+import java.util.{Collections, Optional}
+
+import com.fasterxml.jackson.databind.JsonNode
+import com.fasterxml.jackson.databind.node.ObjectNode
+import kafka.network
+import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.memory.MemoryPool
+import org.apache.kafka.common.message._
+import org.apache.kafka.common.network.{ClientInformation, ListenerName, 
NetworkSend}
+import org.junit.Test
+import org.apache.kafka.common.protocol.{ApiKeys, Errors}
+import org.apache.kafka.common.record.{MemoryRecords, RecordBatch}
+import 
org.apache.kafka.common.requests.OffsetsForLeaderEpochRequest.PartitionData
+import org.apache.kafka.common.requests._
+import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol}
+import org.easymock.EasyMock.createNiceMock
+import org.junit.Assert.assertEquals
+
+import scala.collection.mutable.ArrayBuffer
+
+class RequestConvertToJsonTest {
+
+  @Test
+  def testAllRequestTypesHandled(): Unit = {
+val unhandledKeys = ArrayBuffer[String]()
+ApiKeys.values().foreach(key => {
+  val version: Short = key.latestVersion()
+  var req: AbstractRequest = null
+  if (key == ApiKeys.PRODUCE) {
+// There's inconsistency with the toStruct schema in ProduceRequest
+// and ProduceRequestDataJsonConverters where the field names don't
+// match so the struct does not have the correct field names. This is
+// a temporary workaround until ProduceRequest starts using 
ProduceRequestData
+req = ProduceRequest.Builder.forCurrentMagic(0.toShort, 1, new 
util.HashMap[TopicPartition, MemoryRecords]).build()
+  } else {
+val struct = 
ApiMessageType.fromApiKey(key.id).newRequest().toStruct(version)
+req = AbstractRequest.parseRequest(key, version, struct)
+  }
+  try {
+RequestConvertToJson.request(req, false)
+  } catch {
+case _ : IllegalStateException => unhandledKeys += key.toString
+  }
+})
+assertEquals("Unhandled request keys", ArrayBuffer.empty, unhandledKeys)
+  }
+
+  @Test
+  def testAllResponseTypesHandled(): Unit = {
+val unhandledKeys = ArrayBuffer[String]()
+ApiKeys.values().foreach(key => {
+  val version: Short = key.latestVersion()
+  val struct = 
ApiMessageType.fromApiKey(key.id).newResponse().toStruct(version)
+  val res = AbstractResponse.parseResponse(key, struct, version)
+  try {
+RequestConvertToJson.response(res, version, false)
+  } catch {
+case _ : IllegalStateException => unhandledKeys += key.toString
+  }
+})
+assertEquals("Unhandled response keys", ArrayBuffer.empty, unhandledKeys)
+  }
+
+  @Test
+  def testFormatOfOffsetsForLeaderEpochRequestNode(): Unit = {
+val partitionDataMap = new util.HashMap[TopicPartition, PartitionData]
+partitionDataMap.put(new TopicPartition("topic1", 0), new 
PartitionData(Optional.of(0),  1))
+
+val version: Short = ApiKeys.OFFSET_FOR_LEADER_EPOCH.latestVersion
+val request = 
OffsetsForLeaderEpochRequest.Builder.forConsumer(partitionDataMap).build(version)
+val actualNode = RequestConvertToJson.request(request, true)
+
+val requestData = 
OffsetForLeaderEpochRequestDataJsonConverter.read(actualNode, version)
+val expectedNode = 
OffsetForLeaderEpochRequestDataJsonConverter.write(requestData, version, true)
+
+assertEquals(expectedNode, actualNode)
+  }
+
+  @Test
+  def testFormatOfProduceRequestNode(): Unit = {
+val produceDataMap = new util.HashMap[TopicPartition, MemoryRecords]
+
+val version: Short = ApiKeys.PRODUCE.latestVersion
+val serializeRecords: Boolean = false;
+val request = ProduceRequest.Builder.forMagic(2, 0.toShort, 0, 
produceDataMap, "").build()
+val actualNode = RequestConvertToJson.request(request, 

[GitHub] [kafka] anatasiavela commented on a change in pull request #9526: KAFKA-10525: Emit JSONs with new auto-generated schema

2020-11-10 Thread GitBox


anatasiavela commented on a change in pull request #9526:
URL: https://github.com/apache/kafka/pull/9526#discussion_r520807921



##
File path: 
generator/src/main/java/org/apache/kafka/message/JsonConverterGenerator.java
##
@@ -375,13 +385,36 @@ private void generateVariableLengthTargetToJson(Target target, Versions versions
 target.sourceVariable())));
 } else {
 headerGenerator.addImport(MessageGenerator.ARRAYS_CLASS);
+headerGenerator.addImport(MessageGenerator.INT_NODE_CLASS);
+buffer.printf("if (_serializeRecords) {%n");

Review comment:
   Yes, this change is needed because ProduceRequest uses the `bytes` type.
   It may be worth waiting until the produce request has been migrated to the
   automated protocol, so that we don't have to change this.









[GitHub] [kafka] anatasiavela commented on a change in pull request #9526: KAFKA-10525: Emit JSONs with new auto-generated schema

2020-11-05 Thread GitBox


anatasiavela commented on a change in pull request #9526:
URL: https://github.com/apache/kafka/pull/9526#discussion_r518363490



##
File path: core/src/main/scala/kafka/network/RequestConvertToJson.scala
##
@@ -0,0 +1,360 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.network
+
+import java.util
+
+import com.fasterxml.jackson.databind.JsonNode
+import com.fasterxml.jackson.databind.node.{ArrayNode, BinaryNode, DoubleNode, 
IntNode, JsonNodeFactory, LongNode, NullNode, ObjectNode, ShortNode, TextNode}
+import kafka.network.RequestChannel.{Response, Session}
+import org.apache.kafka.common.message._
+import org.apache.kafka.common.network.ClientInformation
+import org.apache.kafka.common.protocol.Errors
+import org.apache.kafka.common.record.RecordBatch
+import org.apache.kafka.common.requests._
+import org.apache.kafka.common.utils.CollectionUtils
+
+import scala.jdk.CollectionConverters._
+
+object RequestConvertToJson {
+  def request(request: AbstractRequest, verbose: Boolean): JsonNode = {
+request match {
+  case req: AddOffsetsToTxnRequest => 
AddOffsetsToTxnRequestDataJsonConverter.write(req.data, request.version)
+  case req: AddPartitionsToTxnRequest => 
AddPartitionsToTxnRequestDataJsonConverter.write(req.data, request.version)
+  case req: AlterClientQuotasRequest => 
AlterClientQuotasRequestDataJsonConverter.write(req.data, request.version)
+  case req: AlterConfigsRequest => 
AlterConfigsRequestDataJsonConverter.write(req.data, request.version)
+  case req: AlterIsrRequest => 
AlterIsrRequestDataJsonConverter.write(req.data, request.version)
+  case req: AlterPartitionReassignmentsRequest => 
AlterPartitionReassignmentsRequestDataJsonConverter.write(req.data, 
request.version)
+  case req: AlterReplicaLogDirsRequest => 
AlterReplicaLogDirsRequestDataJsonConverter.write(req.data, request.version)
+  case res: AlterUserScramCredentialsRequest => 
AlterUserScramCredentialsRequestDataJsonConverter.write(res.data, 
request.version)
+  case req: ApiVersionsRequest => 
ApiVersionsRequestDataJsonConverter.write(req.data, request.version)
+  case req: BeginQuorumEpochRequest => 
BeginQuorumEpochRequestDataJsonConverter.write(req.data, request.version)
+  case req: ControlledShutdownRequest => 
ControlledShutdownRequestDataJsonConverter.write(req.data, request.version)
+  case req: CreateAclsRequest => 
CreateAclsRequestDataJsonConverter.write(req.data, request.version)
+  case req: CreateDelegationTokenRequest => 
CreateDelegationTokenRequestDataJsonConverter.write(req.data, request.version)
+  case req: CreatePartitionsRequest => 
CreatePartitionsRequestDataJsonConverter.write(req.data, request.version)
+  case req: CreateTopicsRequest => 
CreateTopicsRequestDataJsonConverter.write(req.data, request.version)
+  case req: DeleteAclsRequest => 
DeleteAclsRequestDataJsonConverter.write(req.data, request.version)
+  case req: DeleteGroupsRequest => 
DeleteGroupsRequestDataJsonConverter.write(req.data, request.version)
+  case req: DeleteRecordsRequest => 
DeleteRecordsRequestDataJsonConverter.write(req.data, request.version)
+  case req: DeleteTopicsRequest => 
DeleteTopicsRequestDataJsonConverter.write(req.data, request.version)
+  case req: DescribeAclsRequest => 
DescribeAclsRequestDataJsonConverter.write(req.data, request.version)
+  case req: DescribeClientQuotasRequest => 
DescribeClientQuotasRequestDataJsonConverter.write(req.data, request.version)
+  case req: DescribeConfigsRequest => 
DescribeConfigsRequestDataJsonConverter.write(req.data, request.version)
+  case req: DescribeDelegationTokenRequest => 
DescribeDelegationTokenRequestDataJsonConverter.write(req.data, request.version)
+  case req: DescribeGroupsRequest => 
DescribeGroupsRequestDataJsonConverter.write(req.data, request.version)
+  case req: DescribeLogDirsRequest => 
DescribeLogDirsRequestDataJsonConverter.write(req.data, request.version)
+  case req: DescribeQuorumRequest => 
DescribeQuorumRequestDataJsonConverter.write(req.data, request.version)
+  case res: DescribeUserScramCredentialsRequest => 

[GitHub] [kafka] anatasiavela commented on a change in pull request #9526: KAFKA-10525: Emit JSONs with new auto-generated schema

2020-11-05 Thread GitBox


anatasiavela commented on a change in pull request #9526:
URL: https://github.com/apache/kafka/pull/9526#discussion_r518359188



##
File path: core/src/main/scala/kafka/network/RequestConvertToJson.scala
##
@@ -0,0 +1,360 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.network
+
+import java.util
+
+import com.fasterxml.jackson.databind.JsonNode
+import com.fasterxml.jackson.databind.node.{ArrayNode, BinaryNode, DoubleNode, 
IntNode, JsonNodeFactory, LongNode, NullNode, ObjectNode, ShortNode, TextNode}
+import kafka.network.RequestChannel.{Response, Session}
+import org.apache.kafka.common.message._
+import org.apache.kafka.common.network.ClientInformation
+import org.apache.kafka.common.protocol.Errors
+import org.apache.kafka.common.record.RecordBatch
+import org.apache.kafka.common.requests._
+import org.apache.kafka.common.utils.CollectionUtils
+
+import scala.jdk.CollectionConverters._
+
+object RequestConvertToJson {
+  def request(request: AbstractRequest, verbose: Boolean): JsonNode = {
+request match {
+  case req: AddOffsetsToTxnRequest => 
AddOffsetsToTxnRequestDataJsonConverter.write(req.data, request.version, 
verbose)
+  case req: AddPartitionsToTxnRequest => 
AddPartitionsToTxnRequestDataJsonConverter.write(req.data, request.version, 
verbose)
+  case req: AlterClientQuotasRequest => 
AlterClientQuotasRequestDataJsonConverter.write(req.data, request.version, 
verbose)
+  case req: AlterConfigsRequest => 
AlterConfigsRequestDataJsonConverter.write(req.data, request.version, verbose)
+  case req: AlterIsrRequest => 
AlterIsrRequestDataJsonConverter.write(req.data, request.version, verbose)
+  case req: AlterPartitionReassignmentsRequest => 
AlterPartitionReassignmentsRequestDataJsonConverter.write(req.data, 
request.version, verbose)
+  case req: AlterReplicaLogDirsRequest => 
AlterReplicaLogDirsRequestDataJsonConverter.write(req.data, request.version, 
verbose)
+  case res: AlterUserScramCredentialsRequest => 
AlterUserScramCredentialsRequestDataJsonConverter.write(res.data, 
request.version, verbose)
+  case req: ApiVersionsRequest => 
ApiVersionsRequestDataJsonConverter.write(req.data, request.version, verbose)
+  case req: BeginQuorumEpochRequest => 
BeginQuorumEpochRequestDataJsonConverter.write(req.data, request.version, 
verbose)
+  case req: ControlledShutdownRequest => 
ControlledShutdownRequestDataJsonConverter.write(req.data, request.version, 
verbose)
+  case req: CreateAclsRequest => 
CreateAclsRequestDataJsonConverter.write(req.data, request.version, verbose)
+  case req: CreateDelegationTokenRequest => 
CreateDelegationTokenRequestDataJsonConverter.write(req.data, request.version, 
verbose)
+  case req: CreatePartitionsRequest => 
CreatePartitionsRequestDataJsonConverter.write(req.data, request.version, 
verbose)
+  case req: CreateTopicsRequest => 
CreateTopicsRequestDataJsonConverter.write(req.data, request.version, verbose)
+  case req: DeleteAclsRequest => 
DeleteAclsRequestDataJsonConverter.write(req.data, request.version, verbose)
+  case req: DeleteGroupsRequest => 
DeleteGroupsRequestDataJsonConverter.write(req.data, request.version, verbose)
+  case req: DeleteRecordsRequest => 
DeleteRecordsRequestDataJsonConverter.write(req.data, request.version, verbose)
+  case req: DeleteTopicsRequest => 
DeleteTopicsRequestDataJsonConverter.write(req.data, request.version, verbose)
+  case req: DescribeAclsRequest => 
DescribeAclsRequestDataJsonConverter.write(req.data, request.version, verbose)
+  case req: DescribeClientQuotasRequest => 
DescribeClientQuotasRequestDataJsonConverter.write(req.data, request.version, 
verbose)
+  case req: DescribeConfigsRequest => 
DescribeConfigsRequestDataJsonConverter.write(req.data, request.version, 
verbose)
+  case req: DescribeDelegationTokenRequest => 
DescribeDelegationTokenRequestDataJsonConverter.write(req.data, 
request.version, verbose)
+  case req: DescribeGroupsRequest => 
DescribeGroupsRequestDataJsonConverter.write(req.data, request.version, verbose)
+  case req: DescribeLogDirsRequest => 

[GitHub] [kafka] anatasiavela commented on a change in pull request #9526: KAFKA-10525: Emit JSONs with new auto-generated schema

2020-11-04 Thread GitBox


anatasiavela commented on a change in pull request #9526:
URL: https://github.com/apache/kafka/pull/9526#discussion_r517699783



##
File path: 
generator/src/main/java/org/apache/kafka/message/JsonConverterGenerator.java
##
@@ -380,8 +380,9 @@ private void generateVariableLengthTargetToJson(Target target, Versions versions
 target.sourceVariable(), target.sourceVariable())));
 }
 } else if (target.field().type().isRecords()) {
-headerGenerator.addImport(MessageGenerator.BINARY_NODE_CLASS);
-buffer.printf("%s;%n", target.assignmentStatement("new BinaryNode(new byte[]{})"));
+headerGenerator.addImport(MessageGenerator.INT_NODE_CLASS);
+buffer.printf("%s;%n", target.assignmentStatement(
+String.format("new IntNode(%s.sizeInBytes())", target.sourceVariable())));

Review comment:
   Never mind, we can't deserialize `recordSet` from an IntNode, because
   `FetchResponseData` expects `recordSet` to be of type `BaseRecords`, not `int`.
   Instead, I left deserialization unchanged and added the verbose flag to
   serialization.









[GitHub] [kafka] anatasiavela commented on a change in pull request #9526: KAFKA-10525: Emit JSONs with new auto-generated schema

2020-11-04 Thread GitBox


anatasiavela commented on a change in pull request #9526:
URL: https://github.com/apache/kafka/pull/9526#discussion_r517651742



##
File path: core/src/main/scala/kafka/network/RequestConvertToJson.scala
##
@@ -0,0 +1,360 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.network
+
+import java.util
+
+import com.fasterxml.jackson.databind.JsonNode
+import com.fasterxml.jackson.databind.node.{ArrayNode, BinaryNode, DoubleNode, 
IntNode, JsonNodeFactory, LongNode, NullNode, ObjectNode, ShortNode, TextNode}
+import kafka.network.RequestChannel.{Response, Session}
+import org.apache.kafka.common.message._
+import org.apache.kafka.common.network.ClientInformation
+import org.apache.kafka.common.protocol.Errors
+import org.apache.kafka.common.record.RecordBatch
+import org.apache.kafka.common.requests._
+import org.apache.kafka.common.utils.CollectionUtils
+
+import scala.jdk.CollectionConverters._
+
+object RequestConvertToJson {
+  def request(request: AbstractRequest, verbose: Boolean): JsonNode = {

Review comment:
   I just read through the comment below, so I'll keep the `verbose` flag for
   now and remove it if that is what we decide there.









[GitHub] [kafka] anatasiavela commented on a change in pull request #9526: KAFKA-10525: Emit JSONs with new auto-generated schema

2020-11-04 Thread GitBox


anatasiavela commented on a change in pull request #9526:
URL: https://github.com/apache/kafka/pull/9526#discussion_r517665383



##
File path: 
generator/src/main/java/org/apache/kafka/message/JsonConverterGenerator.java
##
@@ -380,8 +380,9 @@ private void generateVariableLengthTargetToJson(Target target, Versions versions
 target.sourceVariable(), target.sourceVariable())));
 }
 } else if (target.field().type().isRecords()) {
-headerGenerator.addImport(MessageGenerator.BINARY_NODE_CLASS);
-buffer.printf("%s;%n", target.assignmentStatement("new BinaryNode(new byte[]{})"));
+headerGenerator.addImport(MessageGenerator.INT_NODE_CLASS);
+buffer.printf("%s;%n", target.assignmentStatement(
+String.format("new IntNode(%s.sizeInBytes())", target.sourceVariable())));

Review comment:
   @lbradstreet The currently generated JSON does not print the recordSet
   either. However, because we serialize a BinaryNode with an empty array, it
   is still deserialized as a BinaryNode, so nothing breaks.
   ```
   buffer.printf("%s;%n", target.assignmentStatement(
       String.format("MemoryRecords.readableRecords(ByteBuffer.wrap(MessageUtil.jsonNodeToBinary(%s, \"%s\")))",
           target.sourceVariable(), target.humanReadableName())));
   ```
   So I believe the concern is that I proposed changing the serialization to
   an IntNode while the deserialization still expects a BinaryNode, which I
   should have changed as well.
   One alternative is to change the deserialization to expect an IntNode, but
   I'm in favor of adding a verbose flag.
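
The mismatch described in this comment can be sketched without Jackson. In the sketch below, a `Map` stands in for the JSON object, `byte[]` for a BinaryNode, and `Integer` for an IntNode; `RecordsJsonSketch` and its methods are hypothetical, and which branch the `verbose` flag selects is an assumption rather than something the thread confirms.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Stand-in for a generated converter (hypothetical, not Kafka code): a Map
// plays the JSON object, byte[] a binary node, Integer an int node.
final class RecordsJsonSketch {

    // Write path. Assumption: with verbose set, an empty binary placeholder
    // is written; otherwise only the size in bytes is written.
    static Map<String, Object> write(byte[] records, boolean verbose) {
        Map<String, Object> node = new LinkedHashMap<>();
        if (verbose) {
            node.put("recordSet", new byte[0]);
        } else {
            node.put("recordSet", records.length);
        }
        return node;
    }

    // Read path, left unchanged: it accepts only a binary value.
    static byte[] read(Map<String, Object> node) {
        Object value = node.get("recordSet");
        if (!(value instanceof byte[])) {
            throw new IllegalArgumentException("recordSet: expected binary value");
        }
        return (byte[]) value;
    }
}
```

With these stand-ins, output written with the flag set can still be read back, while size-only output is rejected by the unchanged read path. That is the trade-off between adding a flag on the write side and rewriting deserialization to accept an integer.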
   









[GitHub] [kafka] anatasiavela commented on a change in pull request #9526: KAFKA-10525: Emit JSONs with new auto-generated schema

2020-11-04 Thread GitBox


anatasiavela commented on a change in pull request #9526:
URL: https://github.com/apache/kafka/pull/9526#discussion_r517665383



##
File path: 
generator/src/main/java/org/apache/kafka/message/JsonConverterGenerator.java
##
@@ -380,8 +380,9 @@ private void generateVariableLengthTargetToJson(Target target, Versions versions
 target.sourceVariable(), target.sourceVariable())));
 }
 } else if (target.field().type().isRecords()) {
-headerGenerator.addImport(MessageGenerator.BINARY_NODE_CLASS);
-buffer.printf("%s;%n", target.assignmentStatement("new BinaryNode(new byte[]{})"));
+headerGenerator.addImport(MessageGenerator.INT_NODE_CLASS);
+buffer.printf("%s;%n", target.assignmentStatement(
+String.format("new IntNode(%s.sizeInBytes())", target.sourceVariable())));

Review comment:
   @lbradstreet The current generated JSON does not print the recordSet. 
When we were serializing a BinaryNode with an empty array, it was still being 
deserialized as a BinaryNode, so it doesn't break anything.
   ```
   buffer.printf("%s;%n", target.assignmentStatement(
       String.format("MemoryRecords.readableRecords(ByteBuffer.wrap(MessageUtil.jsonNodeToBinary(%s, \"%s\")))",
           target.sourceVariable(), target.humanReadableName())));
   ```
   So I believe the concern is that I had proposed changing the serialization 
to an IntNode, while the deserialization still expects a BinaryNode, which I 
should have changed as well.
   Alternatively, I could fix this by changing the deserialization to 
expect an IntNode, but I'm in favor of adding a verbose flag.
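
To make the mismatch concrete, here is a minimal sketch of the verbose-flag idea. It assumes that a `verbose` flag selects between writing the raw bytes and writing only the size. The `Node`, `IntNode` and `BinaryNode` types below are hypothetical stand-ins for the Jackson classes, and `write`/`read` are illustrative names, not the generated converter methods.

```java
import java.nio.ByteBuffer;

// Hypothetical sketch: stand-in node types, not Jackson's IntNode/BinaryNode.
final class RecordsJsonSketch {
    interface Node {}

    static final class IntNode implements Node {
        final int value;
        IntNode(int value) { this.value = value; }
    }

    static final class BinaryNode implements Node {
        final byte[] bytes;
        BinaryNode(byte[] bytes) { this.bytes = bytes; }
    }

    // Serialization: with verbose set, keep the bytes; otherwise emit only the size.
    static Node write(ByteBuffer records, boolean verbose) {
        if (verbose) {
            byte[] copy = new byte[records.remaining()];
            records.duplicate().get(copy); // duplicate() leaves the caller's position untouched
            return new BinaryNode(copy);
        }
        return new IntNode(records.remaining());
    }

    // Deserialization: still expects binary data, so a size-only node is rejected.
    static ByteBuffer read(Node node) {
        if (node instanceof BinaryNode) {
            return ByteBuffer.wrap(((BinaryNode) node).bytes);
        }
        throw new IllegalArgumentException(
            "expected a binary node but got " + node.getClass().getSimpleName());
    }
}
```

In this sketch the non-verbose output is for logging only and does not round-trip, which is the concern raised above. Making `read` accept an `IntNode` would be the alternative fix.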
   









[GitHub] [kafka] anatasiavela commented on a change in pull request #9526: KAFKA-10525: Emit JSONs with new auto-generated schema

2020-11-04 Thread GitBox


anatasiavela commented on a change in pull request #9526:
URL: https://github.com/apache/kafka/pull/9526#discussion_r517651742



##
File path: core/src/main/scala/kafka/network/RequestConvertToJson.scala
##
@@ -0,0 +1,360 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.network
+
+import java.util
+
+import com.fasterxml.jackson.databind.JsonNode
+import com.fasterxml.jackson.databind.node.{ArrayNode, BinaryNode, DoubleNode, 
IntNode, JsonNodeFactory, LongNode, NullNode, ObjectNode, ShortNode, TextNode}
+import kafka.network.RequestChannel.{Response, Session}
+import org.apache.kafka.common.message._
+import org.apache.kafka.common.network.ClientInformation
+import org.apache.kafka.common.protocol.Errors
+import org.apache.kafka.common.record.RecordBatch
+import org.apache.kafka.common.requests._
+import org.apache.kafka.common.utils.CollectionUtils
+
+import scala.jdk.CollectionConverters._
+
+object RequestConvertToJson {
+  def request(request: AbstractRequest, verbose: Boolean): JsonNode = {

Review comment:
   Just read through the comment below, so I'll keep it for now.









[GitHub] [kafka] anatasiavela commented on a change in pull request #9526: KAFKA-10525: Emit JSONs with new auto-generated schema

2020-11-04 Thread GitBox


anatasiavela commented on a change in pull request #9526:
URL: https://github.com/apache/kafka/pull/9526#discussion_r517649219



##
File path: core/src/main/scala/kafka/network/RequestConvertToJson.scala
##
@@ -0,0 +1,360 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.network
+
+import java.util
+
+import com.fasterxml.jackson.databind.JsonNode
+import com.fasterxml.jackson.databind.node.{ArrayNode, BinaryNode, DoubleNode, 
IntNode, JsonNodeFactory, LongNode, NullNode, ObjectNode, ShortNode, TextNode}
+import kafka.network.RequestChannel.{Response, Session}
+import org.apache.kafka.common.message._
+import org.apache.kafka.common.network.ClientInformation
+import org.apache.kafka.common.protocol.Errors
+import org.apache.kafka.common.record.RecordBatch
+import org.apache.kafka.common.requests._
+import org.apache.kafka.common.utils.CollectionUtils
+
+import scala.jdk.CollectionConverters._
+
+object RequestConvertToJson {
+  def request(request: AbstractRequest, verbose: Boolean): JsonNode = {
+request match {
+  case req: AddOffsetsToTxnRequest => 
AddOffsetsToTxnRequestDataJsonConverter.write(req.data, request.version)
+  case req: AddPartitionsToTxnRequest => 
AddPartitionsToTxnRequestDataJsonConverter.write(req.data, request.version)
+  case req: AlterClientQuotasRequest => 
AlterClientQuotasRequestDataJsonConverter.write(req.data, request.version)
+  case req: AlterConfigsRequest => 
AlterConfigsRequestDataJsonConverter.write(req.data, request.version)
+  case req: AlterIsrRequest => 
AlterIsrRequestDataJsonConverter.write(req.data, request.version)
+  case req: AlterPartitionReassignmentsRequest => 
AlterPartitionReassignmentsRequestDataJsonConverter.write(req.data, 
request.version)
+  case req: AlterReplicaLogDirsRequest => 
AlterReplicaLogDirsRequestDataJsonConverter.write(req.data, request.version)
+  case res: AlterUserScramCredentialsRequest => 
AlterUserScramCredentialsRequestDataJsonConverter.write(res.data, 
request.version)
+  case req: ApiVersionsRequest => 
ApiVersionsRequestDataJsonConverter.write(req.data, request.version)
+  case req: BeginQuorumEpochRequest => 
BeginQuorumEpochRequestDataJsonConverter.write(req.data, request.version)
+  case req: ControlledShutdownRequest => 
ControlledShutdownRequestDataJsonConverter.write(req.data, request.version)
+  case req: CreateAclsRequest => 
CreateAclsRequestDataJsonConverter.write(req.data, request.version)
+  case req: CreateDelegationTokenRequest => 
CreateDelegationTokenRequestDataJsonConverter.write(req.data, request.version)
+  case req: CreatePartitionsRequest => 
CreatePartitionsRequestDataJsonConverter.write(req.data, request.version)
+  case req: CreateTopicsRequest => 
CreateTopicsRequestDataJsonConverter.write(req.data, request.version)
+  case req: DeleteAclsRequest => 
DeleteAclsRequestDataJsonConverter.write(req.data, request.version)
+  case req: DeleteGroupsRequest => 
DeleteGroupsRequestDataJsonConverter.write(req.data, request.version)
+  case req: DeleteRecordsRequest => 
DeleteRecordsRequestDataJsonConverter.write(req.data, request.version)
+  case req: DeleteTopicsRequest => 
DeleteTopicsRequestDataJsonConverter.write(req.data, request.version)
+  case req: DescribeAclsRequest => 
DescribeAclsRequestDataJsonConverter.write(req.data, request.version)
+  case req: DescribeClientQuotasRequest => 
DescribeClientQuotasRequestDataJsonConverter.write(req.data, request.version)
+  case req: DescribeConfigsRequest => 
DescribeConfigsRequestDataJsonConverter.write(req.data, request.version)
+  case req: DescribeDelegationTokenRequest => 
DescribeDelegationTokenRequestDataJsonConverter.write(req.data, request.version)
+  case req: DescribeGroupsRequest => 
DescribeGroupsRequestDataJsonConverter.write(req.data, request.version)
+  case req: DescribeLogDirsRequest => 
DescribeLogDirsRequestDataJsonConverter.write(req.data, request.version)
+  case req: DescribeQuorumRequest => 
DescribeQuorumRequestDataJsonConverter.write(req.data, request.version)
+  case res: DescribeUserScramCredentialsRequest => 

[GitHub] [kafka] anatasiavela commented on a change in pull request #9526: KAFKA-10525: Emit JSONs with new auto-generated schema

2020-11-04 Thread GitBox


anatasiavela commented on a change in pull request #9526:
URL: https://github.com/apache/kafka/pull/9526#discussion_r517645718



##
File path: core/src/main/scala/kafka/network/RequestConvertToJson.scala
##
@@ -0,0 +1,360 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.network
+
+import java.util
+
+import com.fasterxml.jackson.databind.JsonNode
+import com.fasterxml.jackson.databind.node.{ArrayNode, BinaryNode, DoubleNode, 
IntNode, JsonNodeFactory, LongNode, NullNode, ObjectNode, ShortNode, TextNode}
+import kafka.network.RequestChannel.{Response, Session}
+import org.apache.kafka.common.message._
+import org.apache.kafka.common.network.ClientInformation
+import org.apache.kafka.common.protocol.Errors
+import org.apache.kafka.common.record.RecordBatch
+import org.apache.kafka.common.requests._
+import org.apache.kafka.common.utils.CollectionUtils
+
+import scala.jdk.CollectionConverters._
+
+object RequestConvertToJson {
+  def request(request: AbstractRequest, verbose: Boolean): JsonNode = {

Review comment:
   Yes, I kept it because `requestDescMetrics` originally had 
`detailsEnabled`, but I'll remove it since none of the JsonConverters 
differentiate between verbose and non-verbose.









[GitHub] [kafka] anatasiavela commented on a change in pull request #9526: KAFKA-10525: Emit JSONs with new auto-generated schema

2020-11-03 Thread GitBox


anatasiavela commented on a change in pull request #9526:
URL: https://github.com/apache/kafka/pull/9526#discussion_r516288558



##
File path: core/src/main/scala/kafka/network/RequestConvertToJson.scala
##
@@ -0,0 +1,343 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.network
+
+import com.fasterxml.jackson.databind.JsonNode
+import com.fasterxml.jackson.databind.node.{ArrayNode, DoubleNode, IntNode, 
JsonNodeFactory, LongNode, NullNode, ObjectNode, ShortNode, TextNode}
+import kafka.network.RequestChannel.{Response, Session}
+import org.apache.kafka.common.message._
+import org.apache.kafka.common.protocol.Errors
+import org.apache.kafka.common.record.RecordBatch
+import org.apache.kafka.common.requests._
+import org.apache.kafka.common.utils.CollectionUtils
+
+import scala.jdk.CollectionConverters._
+
+object RequestConvertToJson {
+  def request(request: AbstractRequest, verbose: Boolean): JsonNode = {
+request match {
+  case req: AddOffsetsToTxnRequest => 
AddOffsetsToTxnRequestDataJsonConverter.write(req.data(), request.version())
+  case req: AddPartitionsToTxnRequest => 
AddPartitionsToTxnRequestDataJsonConverter.write(req.data(), request.version())
+  case req: AlterClientQuotasRequest => 
AlterClientQuotasRequestDataJsonConverter.write(req.data(), request.version())
+  case req: AlterConfigsRequest => 
AlterConfigsRequestDataJsonConverter.write(req.data(), request.version())
+  case req: AlterIsrRequest => 
AlterIsrRequestDataJsonConverter.write(req.data(), request.version())
+  case req: AlterPartitionReassignmentsRequest => 
AlterPartitionReassignmentsRequestDataJsonConverter.write(req.data(), 
request.version())
+  case req: AlterReplicaLogDirsRequest => 
AlterReplicaLogDirsRequestDataJsonConverter.write(req.data(), request.version())
+  case res: AlterUserScramCredentialsRequest => 
AlterUserScramCredentialsRequestDataJsonConverter.write(res.data(), 
request.version())
+  case req: ApiVersionsRequest => 
ApiVersionsRequestDataJsonConverter.write(req.data(), request.version())
+  case req: BeginQuorumEpochRequest => 
BeginQuorumEpochRequestDataJsonConverter.write(req.data, request.version())
+  case req: ControlledShutdownRequest => 
ControlledShutdownRequestDataJsonConverter.write(req.data(), request.version())
+  case req: CreateAclsRequest => 
CreateAclsRequestDataJsonConverter.write(req.data(), request.version())
+  case req: CreateDelegationTokenRequest => 
CreateDelegationTokenRequestDataJsonConverter.write(req.data(), 
request.version())
+  case req: CreatePartitionsRequest => 
CreatePartitionsRequestDataJsonConverter.write(req.data(), request.version())
+  case req: CreateTopicsRequest => 
CreateTopicsRequestDataJsonConverter.write(req.data(), request.version())
+  case req: DeleteAclsRequest => 
DeleteAclsRequestDataJsonConverter.write(req.data(), request.version())
+  case req: DeleteGroupsRequest => 
DeleteGroupsRequestDataJsonConverter.write(req.data(), request.version())
+  case req: DeleteRecordsRequest => 
DeleteRecordsRequestDataJsonConverter.write(req.data(), request.version())
+  case req: DeleteTopicsRequest => 
DeleteTopicsRequestDataJsonConverter.write(req.data(), request.version())
+  case req: DescribeAclsRequest => 
DescribeAclsRequestDataJsonConverter.write(req.data(), request.version())
+  case req: DescribeClientQuotasRequest => 
DescribeClientQuotasRequestDataJsonConverter.write(req.data(), 
request.version())
+  case req: DescribeConfigsRequest => 
DescribeConfigsRequestDataJsonConverter.write(req.data(), request.version())
+  case req: DescribeDelegationTokenRequest => 
DescribeDelegationTokenRequestDataJsonConverter.write(req.data(), 
request.version())
+  case req: DescribeGroupsRequest => 
DescribeGroupsRequestDataJsonConverter.write(req.data(), request.version())
+  case req: DescribeLogDirsRequest => 
DescribeLogDirsRequestDataJsonConverter.write(req.data(), request.version())
+  case req: DescribeQuorumRequest => 
DescribeQuorumRequestDataJsonConverter.write(req.data, request.version())
+  case res: DescribeUserScramCredentialsRequest => 

[GitHub] [kafka] anatasiavela commented on a change in pull request #9526: KAFKA-10525: Emit JSONs with new auto-generated schema

2020-10-29 Thread GitBox


anatasiavela commented on a change in pull request #9526:
URL: https://github.com/apache/kafka/pull/9526#discussion_r514882473



##
File path: core/src/main/scala/kafka/network/RequestConvertToJson.scala
##
@@ -0,0 +1,343 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.network
+
+import com.fasterxml.jackson.databind.JsonNode
+import com.fasterxml.jackson.databind.node.{ArrayNode, DoubleNode, IntNode, 
JsonNodeFactory, LongNode, NullNode, ObjectNode, ShortNode, TextNode}
+import kafka.network.RequestChannel.{Response, Session}
+import org.apache.kafka.common.message._
+import org.apache.kafka.common.protocol.Errors
+import org.apache.kafka.common.record.RecordBatch
+import org.apache.kafka.common.requests._
+import org.apache.kafka.common.utils.CollectionUtils
+
+import scala.jdk.CollectionConverters._
+
+object RequestConvertToJson {
+  def request(request: AbstractRequest, verbose: Boolean): JsonNode = {
+request match {
+  case req: AddOffsetsToTxnRequest => 
AddOffsetsToTxnRequestDataJsonConverter.write(req.data(), request.version())
+  case req: AddPartitionsToTxnRequest => 
AddPartitionsToTxnRequestDataJsonConverter.write(req.data(), request.version())
+  case req: AlterClientQuotasRequest => 
AlterClientQuotasRequestDataJsonConverter.write(req.data(), request.version())
+  case req: AlterConfigsRequest => 
AlterConfigsRequestDataJsonConverter.write(req.data(), request.version())
+  case req: AlterIsrRequest => 
AlterIsrRequestDataJsonConverter.write(req.data(), request.version())
+  case req: AlterPartitionReassignmentsRequest => 
AlterPartitionReassignmentsRequestDataJsonConverter.write(req.data(), 
request.version())
+  case req: AlterReplicaLogDirsRequest => 
AlterReplicaLogDirsRequestDataJsonConverter.write(req.data(), request.version())
+  case res: AlterUserScramCredentialsRequest => 
AlterUserScramCredentialsRequestDataJsonConverter.write(res.data(), 
request.version())
+  case req: ApiVersionsRequest => 
ApiVersionsRequestDataJsonConverter.write(req.data(), request.version())
+  case req: BeginQuorumEpochRequest => 
BeginQuorumEpochRequestDataJsonConverter.write(req.data, request.version())
+  case req: ControlledShutdownRequest => 
ControlledShutdownRequestDataJsonConverter.write(req.data(), request.version())
+  case req: CreateAclsRequest => 
CreateAclsRequestDataJsonConverter.write(req.data(), request.version())
+  case req: CreateDelegationTokenRequest => 
CreateDelegationTokenRequestDataJsonConverter.write(req.data(), 
request.version())
+  case req: CreatePartitionsRequest => 
CreatePartitionsRequestDataJsonConverter.write(req.data(), request.version())
+  case req: CreateTopicsRequest => 
CreateTopicsRequestDataJsonConverter.write(req.data(), request.version())
+  case req: DeleteAclsRequest => 
DeleteAclsRequestDataJsonConverter.write(req.data(), request.version())
+  case req: DeleteGroupsRequest => 
DeleteGroupsRequestDataJsonConverter.write(req.data(), request.version())
+  case req: DeleteRecordsRequest => 
DeleteRecordsRequestDataJsonConverter.write(req.data(), request.version())
+  case req: DeleteTopicsRequest => 
DeleteTopicsRequestDataJsonConverter.write(req.data(), request.version())
+  case req: DescribeAclsRequest => 
DescribeAclsRequestDataJsonConverter.write(req.data(), request.version())
+  case req: DescribeClientQuotasRequest => 
DescribeClientQuotasRequestDataJsonConverter.write(req.data(), 
request.version())
+  case req: DescribeConfigsRequest => 
DescribeConfigsRequestDataJsonConverter.write(req.data(), request.version())
+  case req: DescribeDelegationTokenRequest => 
DescribeDelegationTokenRequestDataJsonConverter.write(req.data(), 
request.version())
+  case req: DescribeGroupsRequest => 
DescribeGroupsRequestDataJsonConverter.write(req.data(), request.version())
+  case req: DescribeLogDirsRequest => 
DescribeLogDirsRequestDataJsonConverter.write(req.data(), request.version())
+  case req: DescribeQuorumRequest => 
DescribeQuorumRequestDataJsonConverter.write(req.data, request.version())
+  case res: DescribeUserScramCredentialsRequest => 

[GitHub] [kafka] anatasiavela commented on a change in pull request #9526: KAFKA-10525: Emit JSONs with new auto-generated schema

2020-10-29 Thread GitBox


anatasiavela commented on a change in pull request #9526:
URL: https://github.com/apache/kafka/pull/9526#discussion_r514872477



##
File path: core/src/test/scala/unit/kafka/network/RequestConvertToJsonTest.scala
##
@@ -0,0 +1,188 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.network
+
+import java.util.HashMap
+
+import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.message._
+import org.junit.Test
+import org.apache.kafka.common.protocol.ApiKeys
+import org.apache.kafka.common.record.MemoryRecords
+import org.apache.kafka.common.requests._
+
+import scala.collection.mutable.ArrayBuffer
+
+class RequestConvertToJsonTest {
+
+  def createRequestsFromApiKey(apiKey: ApiKeys, version: Short): 
AbstractRequest = apiKey match {
+case ApiKeys.PRODUCE => ProduceRequest.Builder.forCurrentMagic(0.toShort, 
1, new HashMap[TopicPartition, MemoryRecords]()).build()
+case ApiKeys.FETCH => new FetchRequest(new FetchRequestData(), version)
+case ApiKeys.LIST_OFFSETS => new ListOffsetRequest(new 
ListOffsetRequestData().toStruct(version), version)
+case ApiKeys.METADATA => new MetadataRequest(new MetadataRequestData(), 
version)
+case ApiKeys.OFFSET_COMMIT => new OffsetCommitRequest(new 
OffsetCommitRequestData(), version)
+case ApiKeys.OFFSET_FETCH => new OffsetFetchRequest(new 
OffsetFetchRequestData().toStruct(version), version)
+case ApiKeys.FIND_COORDINATOR => new FindCoordinatorRequest(new 
FindCoordinatorRequestData().toStruct(version), version)
+case ApiKeys.JOIN_GROUP => new JoinGroupRequest(new 
JoinGroupRequestData(), version)
+case ApiKeys.HEARTBEAT => new HeartbeatRequest(new 
HeartbeatRequestData().toStruct(version), version)
+case ApiKeys.LEAVE_GROUP => new LeaveGroupRequest(new 
LeaveGroupRequestData().toStruct(version), version)

Review comment:
   Actually, I found a workaround for this mapping and removed it 
altogether, so there'd be one less API key mapping to maintain.









[GitHub] [kafka] anatasiavela commented on a change in pull request #9526: KAFKA-10525: Emit JSONs with new auto-generated schema

2020-10-29 Thread GitBox


anatasiavela commented on a change in pull request #9526:
URL: https://github.com/apache/kafka/pull/9526#discussion_r514619891



##
File path: core/src/test/scala/unit/kafka/network/RequestConvertToJsonTest.scala
##
@@ -0,0 +1,188 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.network
+
+import java.util.HashMap
+
+import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.message._
+import org.junit.Test
+import org.apache.kafka.common.protocol.ApiKeys
+import org.apache.kafka.common.record.MemoryRecords
+import org.apache.kafka.common.requests._
+
+import scala.collection.mutable.ArrayBuffer
+
+class RequestConvertToJsonTest {
+
+  def createRequestsFromApiKey(apiKey: ApiKeys, version: Short): 
AbstractRequest = apiKey match {
+case ApiKeys.PRODUCE => ProduceRequest.Builder.forCurrentMagic(0.toShort, 
1, new HashMap[TopicPartition, MemoryRecords]()).build()
+case ApiKeys.FETCH => new FetchRequest(new FetchRequestData(), version)
+case ApiKeys.LIST_OFFSETS => new ListOffsetRequest(new 
ListOffsetRequestData().toStruct(version), version)
+case ApiKeys.METADATA => new MetadataRequest(new MetadataRequestData(), 
version)
+case ApiKeys.OFFSET_COMMIT => new OffsetCommitRequest(new 
OffsetCommitRequestData(), version)
+case ApiKeys.OFFSET_FETCH => new OffsetFetchRequest(new 
OffsetFetchRequestData().toStruct(version), version)
+case ApiKeys.FIND_COORDINATOR => new FindCoordinatorRequest(new 
FindCoordinatorRequestData().toStruct(version), version)
+case ApiKeys.JOIN_GROUP => new JoinGroupRequest(new 
JoinGroupRequestData(), version)
+case ApiKeys.HEARTBEAT => new HeartbeatRequest(new 
HeartbeatRequestData().toStruct(version), version)
+case ApiKeys.LEAVE_GROUP => new LeaveGroupRequest(new 
LeaveGroupRequestData().toStruct(version), version)

Review comment:
   I guess this is related to the inconsistency issue, but I tried not to 
make too many changes. The constructors with the `data` parameters were private 
in some cases, so I used `toStruct`. But for consistency, I'll change them all 
to use `toStruct`.









[GitHub] [kafka] anatasiavela commented on a change in pull request #9526: KAFKA-10525: Emit JSONs with new auto-generated schema

2020-10-29 Thread GitBox


anatasiavela commented on a change in pull request #9526:
URL: https://github.com/apache/kafka/pull/9526#discussion_r514619891



##
File path: core/src/test/scala/unit/kafka/network/RequestConvertToJsonTest.scala
##
@@ -0,0 +1,188 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.network
+
+import java.util.HashMap
+
+import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.message._
+import org.junit.Test
+import org.apache.kafka.common.protocol.ApiKeys
+import org.apache.kafka.common.record.MemoryRecords
+import org.apache.kafka.common.requests._
+
+import scala.collection.mutable.ArrayBuffer
+
+class RequestConvertToJsonTest {
+
+  def createRequestsFromApiKey(apiKey: ApiKeys, version: Short): 
AbstractRequest = apiKey match {
+case ApiKeys.PRODUCE => ProduceRequest.Builder.forCurrentMagic(0.toShort, 
1, new HashMap[TopicPartition, MemoryRecords]()).build()
+case ApiKeys.FETCH => new FetchRequest(new FetchRequestData(), version)
+case ApiKeys.LIST_OFFSETS => new ListOffsetRequest(new 
ListOffsetRequestData().toStruct(version), version)
+case ApiKeys.METADATA => new MetadataRequest(new MetadataRequestData(), 
version)
+case ApiKeys.OFFSET_COMMIT => new OffsetCommitRequest(new 
OffsetCommitRequestData(), version)
+case ApiKeys.OFFSET_FETCH => new OffsetFetchRequest(new 
OffsetFetchRequestData().toStruct(version), version)
+case ApiKeys.FIND_COORDINATOR => new FindCoordinatorRequest(new 
FindCoordinatorRequestData().toStruct(version), version)
+case ApiKeys.JOIN_GROUP => new JoinGroupRequest(new 
JoinGroupRequestData(), version)
+case ApiKeys.HEARTBEAT => new HeartbeatRequest(new 
HeartbeatRequestData().toStruct(version), version)
+case ApiKeys.LEAVE_GROUP => new LeaveGroupRequest(new 
LeaveGroupRequestData().toStruct(version), version)

Review comment:
   I guess this is related to the inconsistency issue, but I tried not to 
make too many changes. The constructors with the `data` parameters were private 
in some cases, so I used `toStruct`. But for consistency, I'll switch to 
the Builder class instead.









[GitHub] [kafka] anatasiavela commented on a change in pull request #9526: KAFKA-10525: Emit JSONs with new auto-generated schema

2020-10-29 Thread GitBox


anatasiavela commented on a change in pull request #9526:
URL: https://github.com/apache/kafka/pull/9526#discussion_r514861991



##
File path: core/src/main/scala/kafka/network/RequestConvertToJson.scala
##
@@ -0,0 +1,343 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.network
+
+import com.fasterxml.jackson.databind.JsonNode
+import com.fasterxml.jackson.databind.node.{ArrayNode, DoubleNode, IntNode, 
JsonNodeFactory, LongNode, NullNode, ObjectNode, ShortNode, TextNode}
+import kafka.network.RequestChannel.{Response, Session}
+import org.apache.kafka.common.message._
+import org.apache.kafka.common.protocol.Errors
+import org.apache.kafka.common.record.RecordBatch
+import org.apache.kafka.common.requests._
+import org.apache.kafka.common.utils.CollectionUtils
+
+import scala.jdk.CollectionConverters._
+
+object RequestConvertToJson {
+  def request(request: AbstractRequest, verbose: Boolean): JsonNode = {
+request match {
+  case req: AddOffsetsToTxnRequest => 
AddOffsetsToTxnRequestDataJsonConverter.write(req.data(), request.version())
+  case req: AddPartitionsToTxnRequest => 
AddPartitionsToTxnRequestDataJsonConverter.write(req.data(), request.version())
+  case req: AlterClientQuotasRequest => 
AlterClientQuotasRequestDataJsonConverter.write(req.data(), request.version())
+  case req: AlterConfigsRequest => 
AlterConfigsRequestDataJsonConverter.write(req.data(), request.version())
+  case req: AlterIsrRequest => 
AlterIsrRequestDataJsonConverter.write(req.data(), request.version())
+  case req: AlterPartitionReassignmentsRequest => 
AlterPartitionReassignmentsRequestDataJsonConverter.write(req.data(), 
request.version())
+  case req: AlterReplicaLogDirsRequest => 
AlterReplicaLogDirsRequestDataJsonConverter.write(req.data(), request.version())
+  case res: AlterUserScramCredentialsRequest => 
AlterUserScramCredentialsRequestDataJsonConverter.write(res.data(), 
request.version())
+  case req: ApiVersionsRequest => 
ApiVersionsRequestDataJsonConverter.write(req.data(), request.version())
+  case req: BeginQuorumEpochRequest => 
BeginQuorumEpochRequestDataJsonConverter.write(req.data, request.version())
+  case req: ControlledShutdownRequest => 
ControlledShutdownRequestDataJsonConverter.write(req.data(), request.version())
+  case req: CreateAclsRequest => 
CreateAclsRequestDataJsonConverter.write(req.data(), request.version())
+  case req: CreateDelegationTokenRequest => 
CreateDelegationTokenRequestDataJsonConverter.write(req.data(), 
request.version())
+  case req: CreatePartitionsRequest => 
CreatePartitionsRequestDataJsonConverter.write(req.data(), request.version())
+  case req: CreateTopicsRequest => 
CreateTopicsRequestDataJsonConverter.write(req.data(), request.version())
+  case req: DeleteAclsRequest => 
DeleteAclsRequestDataJsonConverter.write(req.data(), request.version())
+  case req: DeleteGroupsRequest => 
DeleteGroupsRequestDataJsonConverter.write(req.data(), request.version())
+  case req: DeleteRecordsRequest => 
DeleteRecordsRequestDataJsonConverter.write(req.data(), request.version())
+  case req: DeleteTopicsRequest => 
DeleteTopicsRequestDataJsonConverter.write(req.data(), request.version())
+  case req: DescribeAclsRequest => 
DescribeAclsRequestDataJsonConverter.write(req.data(), request.version())
+  case req: DescribeClientQuotasRequest => 
DescribeClientQuotasRequestDataJsonConverter.write(req.data(), 
request.version())
+  case req: DescribeConfigsRequest => 
DescribeConfigsRequestDataJsonConverter.write(req.data(), request.version())
+  case req: DescribeDelegationTokenRequest => 
DescribeDelegationTokenRequestDataJsonConverter.write(req.data(), 
request.version())
+  case req: DescribeGroupsRequest => 
DescribeGroupsRequestDataJsonConverter.write(req.data(), request.version())
+  case req: DescribeLogDirsRequest => 
DescribeLogDirsRequestDataJsonConverter.write(req.data(), request.version())
+  case req: DescribeQuorumRequest => 
DescribeQuorumRequestDataJsonConverter.write(req.data, request.version())
+  case res: DescribeUserScramCredentialsRequest => 

[GitHub] [kafka] anatasiavela commented on a change in pull request #9526: KAFKA-10525: Emit JSONs with new auto-generated schema

2020-10-29 Thread GitBox


anatasiavela commented on a change in pull request #9526:
URL: https://github.com/apache/kafka/pull/9526#discussion_r514861389



##
File path: core/src/main/scala/kafka/network/RequestConvertToJson.scala
##

[GitHub] [kafka] anatasiavela commented on a change in pull request #9526: KAFKA-10525: Emit JSONs with new auto-generated schema

2020-10-29 Thread GitBox


anatasiavela commented on a change in pull request #9526:
URL: https://github.com/apache/kafka/pull/9526#discussion_r514636997



##
File path: core/src/main/scala/kafka/network/RequestConvertToJson.scala
##

[GitHub] [kafka] anatasiavela commented on a change in pull request #9526: KAFKA-10525: Emit JSONs with new auto-generated schema

2020-10-29 Thread GitBox


anatasiavela commented on a change in pull request #9526:
URL: https://github.com/apache/kafka/pull/9526#discussion_r514636692



##
File path: core/src/main/scala/kafka/network/RequestChannel.scala
##
@@ -162,7 +161,7 @@ object RequestChannel extends Logging {
   }
 }
 
-trace(s"Processor $processor received request: ${requestDesc(true)}")
+trace(s"Processor $processor received request: 
${RequestConvertToJson.requestDesc(header, loggableRequest, true).toString}")

Review comment:
   The `trace` macro does check `isTraceEnabled`, and it does so before the 
message string is evaluated, so the string would not be computed unnecessarily. 
That said, I'm also fine with gating it explicitly.
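
The lazy-evaluation point above can be illustrated with a minimal sketch. It is an assumption-laden stand-in, not Kafka's `Logging` trait or the actual logging macro: `SketchLogger`, `expensiveDescription`, and the `evaluations` counter are hypothetical names, and a by-name parameter is used in place of a macro to get the same "check the level first, build the string second" behaviour.

```scala
object LazyTraceSketch {
  // Hypothetical logger used only for illustration; not Kafka's Logging trait.
  final class SketchLogger(traceEnabled: Boolean) {
    var emitted: List[String] = Nil

    def isTraceEnabled: Boolean = traceEnabled

    // `msg` is a by-name parameter: its expression is evaluated only when
    // it is referenced, i.e. only after the level check has passed.
    def trace(msg: => String): Unit =
      if (isTraceEnabled) emitted = msg :: emitted
  }

  // Counts how often the (pretend) expensive description is built.
  var evaluations = 0

  // Hypothetical stand-in for an expensive call such as building a JSON description.
  def expensiveDescription(): String = {
    evaluations += 1
    "request-json"
  }

  def main(args: Array[String]): Unit = {
    val off = new SketchLogger(traceEnabled = false)
    off.trace(s"Processor 0 received request: ${expensiveDescription()}")
    println(evaluations) // 0: the message was never built

    val on = new SketchLogger(traceEnabled = true)
    on.trace(s"Processor 0 received request: ${expensiveDescription()}")
    println(evaluations) // 1: built once, because trace is enabled
  }
}
```

Explicit gating, as suggested in the review, would wrap the call in `if (logger.isTraceEnabled) { ... }`; in this sketch that is redundant for correctness but makes the intent visible at the call site.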









[GitHub] [kafka] anatasiavela commented on a change in pull request #9526: KAFKA-10525: Emit JSONs with new auto-generated schema

2020-10-29 Thread GitBox


anatasiavela commented on a change in pull request #9526:
URL: https://github.com/apache/kafka/pull/9526#discussion_r514621561



##
File path: core/src/main/scala/kafka/network/RequestConvertToJson.scala
##

[GitHub] [kafka] anatasiavela commented on a change in pull request #9526: KAFKA-10525: Emit JSONs with new auto-generated schema

2020-10-29 Thread GitBox


anatasiavela commented on a change in pull request #9526:
URL: https://github.com/apache/kafka/pull/9526#discussion_r514619891



##
File path: core/src/test/scala/unit/kafka/network/RequestConvertToJsonTest.scala
##
@@ -0,0 +1,188 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.network
+
+import java.util.HashMap
+
+import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.message._
+import org.junit.Test
+import org.apache.kafka.common.protocol.ApiKeys
+import org.apache.kafka.common.record.MemoryRecords
+import org.apache.kafka.common.requests._
+
+import scala.collection.mutable.ArrayBuffer
+
+class RequestConvertToJsonTest {
+
+  def createRequestsFromApiKey(apiKey: ApiKeys, version: Short): 
AbstractRequest = apiKey match {
+case ApiKeys.PRODUCE => ProduceRequest.Builder.forCurrentMagic(0.toShort, 
1, new HashMap[TopicPartition, MemoryRecords]()).build()
+case ApiKeys.FETCH => new FetchRequest(new FetchRequestData(), version)
+case ApiKeys.LIST_OFFSETS => new ListOffsetRequest(new 
ListOffsetRequestData().toStruct(version), version)
+case ApiKeys.METADATA => new MetadataRequest(new MetadataRequestData(), 
version)
+case ApiKeys.OFFSET_COMMIT => new OffsetCommitRequest(new 
OffsetCommitRequestData(), version)
+case ApiKeys.OFFSET_FETCH => new OffsetFetchRequest(new 
OffsetFetchRequestData().toStruct(version), version)
+case ApiKeys.FIND_COORDINATOR => new FindCoordinatorRequest(new 
FindCoordinatorRequestData().toStruct(version), version)
+case ApiKeys.JOIN_GROUP => new JoinGroupRequest(new 
JoinGroupRequestData(), version)
+case ApiKeys.HEARTBEAT => new HeartbeatRequest(new 
HeartbeatRequestData().toStruct(version), version)
+case ApiKeys.LEAVE_GROUP => new LeaveGroupRequest(new 
LeaveGroupRequestData().toStruct(version), version)

Review comment:
   I think this is related to the consistency issue; I was trying not to 
make too many changes. In some cases the constructors that take a `data` 
parameter were private, so I used `toStruct` instead. For consistency, I'll 
switch to one approach throughout rather than alternating between the two.


[GitHub] [kafka] anatasiavela commented on a change in pull request #9526: KAFKA-10525: Emit JSONs with new auto-generated schema

2020-10-29 Thread GitBox


anatasiavela commented on a change in pull request #9526:
URL: https://github.com/apache/kafka/pull/9526#discussion_r514617626



##
File path: core/src/main/scala/kafka/network/RequestConvertToJson.scala
##

[GitHub] [kafka] anatasiavela commented on a change in pull request #9526: KAFKA-10525: Emit JSONs with new auto-generated schema

2020-10-29 Thread GitBox


anatasiavela commented on a change in pull request #9526:
URL: https://github.com/apache/kafka/pull/9526#discussion_r514614216



##
File path: core/src/main/scala/kafka/network/RequestConvertToJson.scala
##
@@ -0,0 +1,343 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.network
+
+import com.fasterxml.jackson.databind.JsonNode
+import com.fasterxml.jackson.databind.node.{ArrayNode, DoubleNode, IntNode, JsonNodeFactory, LongNode, NullNode, ObjectNode, ShortNode, TextNode}
+import kafka.network.RequestChannel.{Response, Session}
+import org.apache.kafka.common.message._
+import org.apache.kafka.common.protocol.Errors
+import org.apache.kafka.common.record.RecordBatch
+import org.apache.kafka.common.requests._
+import org.apache.kafka.common.utils.CollectionUtils
+
+import scala.jdk.CollectionConverters._
+
+object RequestConvertToJson {
+  def request(request: AbstractRequest, verbose: Boolean): JsonNode = {
+    request match {
+      case req: AddOffsetsToTxnRequest => AddOffsetsToTxnRequestDataJsonConverter.write(req.data(), request.version())
+      case req: AddPartitionsToTxnRequest => AddPartitionsToTxnRequestDataJsonConverter.write(req.data(), request.version())
+      case req: AlterClientQuotasRequest => AlterClientQuotasRequestDataJsonConverter.write(req.data(), request.version())
+      case req: AlterConfigsRequest => AlterConfigsRequestDataJsonConverter.write(req.data(), request.version())
+      case req: AlterIsrRequest => AlterIsrRequestDataJsonConverter.write(req.data(), request.version())
+      case req: AlterPartitionReassignmentsRequest => AlterPartitionReassignmentsRequestDataJsonConverter.write(req.data(), request.version())
+      case req: AlterReplicaLogDirsRequest => AlterReplicaLogDirsRequestDataJsonConverter.write(req.data(), request.version())
+      case res: AlterUserScramCredentialsRequest => AlterUserScramCredentialsRequestDataJsonConverter.write(res.data(), request.version())
+      case req: ApiVersionsRequest => ApiVersionsRequestDataJsonConverter.write(req.data(), request.version())
+      case req: BeginQuorumEpochRequest => BeginQuorumEpochRequestDataJsonConverter.write(req.data, request.version())
+      case req: ControlledShutdownRequest => ControlledShutdownRequestDataJsonConverter.write(req.data(), request.version())
+      case req: CreateAclsRequest => CreateAclsRequestDataJsonConverter.write(req.data(), request.version())
+      case req: CreateDelegationTokenRequest => CreateDelegationTokenRequestDataJsonConverter.write(req.data(), request.version())
+      case req: CreatePartitionsRequest => CreatePartitionsRequestDataJsonConverter.write(req.data(), request.version())
+      case req: CreateTopicsRequest => CreateTopicsRequestDataJsonConverter.write(req.data(), request.version())
+      case req: DeleteAclsRequest => DeleteAclsRequestDataJsonConverter.write(req.data(), request.version())
+      case req: DeleteGroupsRequest => DeleteGroupsRequestDataJsonConverter.write(req.data(), request.version())
+      case req: DeleteRecordsRequest => DeleteRecordsRequestDataJsonConverter.write(req.data(), request.version())
+      case req: DeleteTopicsRequest => DeleteTopicsRequestDataJsonConverter.write(req.data(), request.version())
+      case req: DescribeAclsRequest => DescribeAclsRequestDataJsonConverter.write(req.data(), request.version())
+      case req: DescribeClientQuotasRequest => DescribeClientQuotasRequestDataJsonConverter.write(req.data(), request.version())
+      case req: DescribeConfigsRequest => DescribeConfigsRequestDataJsonConverter.write(req.data(), request.version())
+      case req: DescribeDelegationTokenRequest => DescribeDelegationTokenRequestDataJsonConverter.write(req.data(), request.version())
+      case req: DescribeGroupsRequest => DescribeGroupsRequestDataJsonConverter.write(req.data(), request.version())
+      case req: DescribeLogDirsRequest => DescribeLogDirsRequestDataJsonConverter.write(req.data(), request.version())
+      case req: DescribeQuorumRequest => DescribeQuorumRequestDataJsonConverter.write(req.data, request.version())
+      case res: DescribeUserScramCredentialsRequest =>

[GitHub] [kafka] anatasiavela commented on a change in pull request #9526: KAFKA-10525: Emit JSONs with new auto-generated schema

2020-10-29 Thread GitBox


anatasiavela commented on a change in pull request #9526:
URL: https://github.com/apache/kafka/pull/9526#discussion_r514610499



##
File path: core/src/main/scala/kafka/network/RequestConvertToJson.scala
##
@@ -0,0 +1,343 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.network
+
+import com.fasterxml.jackson.databind.JsonNode
+import com.fasterxml.jackson.databind.node.{ArrayNode, DoubleNode, IntNode, JsonNodeFactory, LongNode, NullNode, ObjectNode, ShortNode, TextNode}
+import kafka.network.RequestChannel.{Response, Session}
+import org.apache.kafka.common.message._
+import org.apache.kafka.common.protocol.Errors
+import org.apache.kafka.common.record.RecordBatch
+import org.apache.kafka.common.requests._
+import org.apache.kafka.common.utils.CollectionUtils
+
+import scala.jdk.CollectionConverters._
+
+object RequestConvertToJson {
+  def request(request: AbstractRequest, verbose: Boolean): JsonNode = {
+    request match {
+      case req: AddOffsetsToTxnRequest => AddOffsetsToTxnRequestDataJsonConverter.write(req.data(), request.version())
+      case req: AddPartitionsToTxnRequest => AddPartitionsToTxnRequestDataJsonConverter.write(req.data(), request.version())
+      case req: AlterClientQuotasRequest => AlterClientQuotasRequestDataJsonConverter.write(req.data(), request.version())
+      case req: AlterConfigsRequest => AlterConfigsRequestDataJsonConverter.write(req.data(), request.version())
+      case req: AlterIsrRequest => AlterIsrRequestDataJsonConverter.write(req.data(), request.version())
+      case req: AlterPartitionReassignmentsRequest => AlterPartitionReassignmentsRequestDataJsonConverter.write(req.data(), request.version())
+      case req: AlterReplicaLogDirsRequest => AlterReplicaLogDirsRequestDataJsonConverter.write(req.data(), request.version())
+      case res: AlterUserScramCredentialsRequest => AlterUserScramCredentialsRequestDataJsonConverter.write(res.data(), request.version())
+      case req: ApiVersionsRequest => ApiVersionsRequestDataJsonConverter.write(req.data(), request.version())
+      case req: BeginQuorumEpochRequest => BeginQuorumEpochRequestDataJsonConverter.write(req.data, request.version())
+      case req: ControlledShutdownRequest => ControlledShutdownRequestDataJsonConverter.write(req.data(), request.version())
+      case req: CreateAclsRequest => CreateAclsRequestDataJsonConverter.write(req.data(), request.version())
+      case req: CreateDelegationTokenRequest => CreateDelegationTokenRequestDataJsonConverter.write(req.data(), request.version())
+      case req: CreatePartitionsRequest => CreatePartitionsRequestDataJsonConverter.write(req.data(), request.version())
+      case req: CreateTopicsRequest => CreateTopicsRequestDataJsonConverter.write(req.data(), request.version())
+      case req: DeleteAclsRequest => DeleteAclsRequestDataJsonConverter.write(req.data(), request.version())
+      case req: DeleteGroupsRequest => DeleteGroupsRequestDataJsonConverter.write(req.data(), request.version())
+      case req: DeleteRecordsRequest => DeleteRecordsRequestDataJsonConverter.write(req.data(), request.version())
+      case req: DeleteTopicsRequest => DeleteTopicsRequestDataJsonConverter.write(req.data(), request.version())
+      case req: DescribeAclsRequest => DescribeAclsRequestDataJsonConverter.write(req.data(), request.version())
+      case req: DescribeClientQuotasRequest => DescribeClientQuotasRequestDataJsonConverter.write(req.data(), request.version())
+      case req: DescribeConfigsRequest => DescribeConfigsRequestDataJsonConverter.write(req.data(), request.version())
+      case req: DescribeDelegationTokenRequest => DescribeDelegationTokenRequestDataJsonConverter.write(req.data(), request.version())
+      case req: DescribeGroupsRequest => DescribeGroupsRequestDataJsonConverter.write(req.data(), request.version())
+      case req: DescribeLogDirsRequest => DescribeLogDirsRequestDataJsonConverter.write(req.data(), request.version())
+      case req: DescribeQuorumRequest => DescribeQuorumRequestDataJsonConverter.write(req.data, request.version())
+      case res: DescribeUserScramCredentialsRequest =>

[GitHub] [kafka] anatasiavela commented on a change in pull request #9526: KAFKA-10525: Emit JSONs with new auto-generated schema

2020-10-29 Thread GitBox


anatasiavela commented on a change in pull request #9526:
URL: https://github.com/apache/kafka/pull/9526#discussion_r514508755



##
File path: clients/src/main/java/org/apache/kafka/common/requests/AddOffsetsToTxnRequest.java
##
@@ -57,6 +57,10 @@ public AddOffsetsToTxnRequest(Struct struct, short version) {
         this.data = new AddOffsetsToTxnRequestData(struct, version);
     }
 
+    public AddOffsetsToTxnRequestData data() {
+        return data;
+    }

Review comment:
   Yeah, I definitely agree with wanting consistency. I'll stick to 1) in the PR.




