[GitHub] [kafka] hachikuji commented on a change in pull request #9401: KAFKA-9628 Replace Produce request/response with automated protocol

2020-11-18 Thread GitBox


hachikuji commented on a change in pull request #9401:
URL: https://github.com/apache/kafka/pull/9401#discussion_r526309208



##
File path: clients/src/main/resources/common/message/ProduceRequest.json
##
@@ -33,21 +33,21 @@
   "validVersions": "0-8",
   "flexibleVersions": "none",
   "fields": [
-{ "name": "TransactionalId", "type": "string", "versions": "3+", "nullableVersions": "0+", "entityType": "transactionalId",
+{ "name": "TransactionalId", "type": "string", "versions": "3+", "nullableVersions": "3+", "default": "null", "ignorable": true, "entityType": "transactionalId",

Review comment:
   The previous code probably relied on the range checking of the message 
format to imply support here. My point is that the request is doomed to fail if 
it holds transactional data and we drop the transactionalId. So we may as well 
fail fast.
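To make the fail-fast point concrete: what matters is what the serializer does when `TransactionalId` holds a non-null value but the request is written at a version below 3, where the field does not exist. The sketch below assumes the generated code silently drops an ignorable field in that case and throws for a non-ignorable one. The class and method names are hypothetical, and `UnsupportedVersionException` here is a local stand-in rather than Kafka's class.

```java
/** Local stand-in for Kafka's UnsupportedVersionException (hypothetical). */
class UnsupportedVersionException extends RuntimeException {
    UnsupportedVersionException(String message) {
        super(message);
    }
}

/**
 * Hypothetical model of writing a field declared with "versions": "3+"
 * and a default of null.
 */
class TransactionalIdField {
    static final short FIRST_SUPPORTED_VERSION = 3;

    /**
     * Returns true if the field is written at this version, false if it is
     * dropped. Throws when dropping would silently lose a non-default value.
     */
    static boolean shouldWrite(short version, String transactionalId, boolean ignorable) {
        if (version >= FIRST_SUPPORTED_VERSION) {
            return true;
        }
        // Dropping the default value (null) is always safe.
        if (transactionalId == null) {
            return false;
        }
        if (ignorable) {
            // Ignorable: the value is discarded without complaint on old versions.
            return false;
        }
        // Not ignorable: fail fast instead of sending a request the broker cannot honor.
        throw new UnsupportedVersionException(
                "Attempted to write a non-default transactionalId at version " + version);
    }
}
```

With `ignorable` left off, a transactional produce written at an old version fails on the client instead of reaching the broker without its transactionalId.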





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] hachikuji commented on a change in pull request #9401: KAFKA-9628 Replace Produce request/response with automated protocol

2020-11-18 Thread GitBox


hachikuji commented on a change in pull request #9401:
URL: https://github.com/apache/kafka/pull/9401#discussion_r526260184



##
File path: clients/src/main/resources/common/message/ProduceRequest.json
##
@@ -33,21 +33,21 @@
   "validVersions": "0-8",
   "flexibleVersions": "none",
   "fields": [
-{ "name": "TransactionalId", "type": "string", "versions": "3+", "nullableVersions": "0+", "entityType": "transactionalId",
+{ "name": "TransactionalId", "type": "string", "versions": "3+", "nullableVersions": "3+", "default": "null", "ignorable": true, "entityType": "transactionalId",

Review comment:
   I don't think it should be ignorable. Transactional requests require 
this in order to authorize.
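Why dropping the field cannot work on the broker side can be sketched as follows. This is a hypothetical model, loosely following how a broker gates transactional produce requests: any request containing transactional batches must name a transactionalId that the principal may write to. The `Set` of permitted IDs stands in for a real authorizer, and the enum is a local type whose constant merely mirrors the name of Kafka's error code.

```java
import java.util.Set;

/** Hypothetical local result type; the constant name mirrors Kafka's error code. */
enum ProduceAuthResult { OK, TRANSACTIONAL_ID_AUTHORIZATION_FAILED }

/** Hypothetical broker-side check; the Set stands in for a real authorizer. */
class TransactionalProduceAuthorizer {
    private final Set<String> writableTransactionalIds;

    TransactionalProduceAuthorizer(Set<String> writableTransactionalIds) {
        this.writableTransactionalIds = writableTransactionalIds;
    }

    ProduceAuthResult check(boolean hasTransactionalRecords, String transactionalId) {
        if (!hasTransactionalRecords) {
            // Non-transactional data does not need a transactionalId.
            return ProduceAuthResult.OK;
        }
        // If the id was dropped in transit there is nothing to authorize
        // against, so the request can only fail.
        if (transactionalId == null || !writableTransactionalIds.contains(transactionalId)) {
            return ProduceAuthResult.TRANSACTIONAL_ID_AUTHORIZATION_FAILED;
        }
        return ProduceAuthResult.OK;
    }
}
```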









[GitHub] [kafka] hachikuji commented on a change in pull request #9401: KAFKA-9628 Replace Produce request/response with automated protocol

2020-11-18 Thread GitBox


hachikuji commented on a change in pull request #9401:
URL: https://github.com/apache/kafka/pull/9401#discussion_r526252595



##
File path: clients/src/main/resources/common/message/ProduceRequest.json
##
@@ -33,21 +33,21 @@
   "validVersions": "0-8",
   "flexibleVersions": "none",
   "fields": [
-{ "name": "TransactionalId", "type": "string", "versions": "3+", "nullableVersions": "0+", "entityType": "transactionalId",
+{ "name": "TransactionalId", "type": "string", "versions": "3+", "nullableVersions": "3+", "default": "null", "entityType": "transactionalId",

Review comment:
   Hmm, I don't think it should be ignorable. The request would just fail if 
we drop it.









[GitHub] [kafka] hachikuji commented on a change in pull request #9401: KAFKA-9628 Replace Produce request/response with automated protocol

2020-11-17 Thread GitBox


hachikuji commented on a change in pull request #9401:
URL: https://github.com/apache/kafka/pull/9401#discussion_r525586697



##
File path: clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
##
@@ -560,13 +562,24 @@ private void handleProduceResponse(ClientResponse response, Map
https://issues.apache.org/jira/browse/KAFKA-10696

Review comment:
   nit: since we have the jira for tracking, can we remove the TODO? A few 
more of these in the PR.

##
File path: clients/src/test/java/org/apache/kafka/common/requests/ProduceResponseTest.java
##
@@ -100,16 +113,92 @@ public void produceResponseRecordErrorsTest() {
 ProduceResponse response = new ProduceResponse(responseData);
 Struct struct = response.toStruct(ver);
 assertEquals("Should use schema version " + ver, ApiKeys.PRODUCE.responseSchema(ver), struct.schema());
-ProduceResponse.PartitionResponse deserialized = new ProduceResponse(struct).responses().get(tp);
+ProduceResponse.PartitionResponse deserialized = new ProduceResponse(new ProduceResponseData(struct, ver)).responses().get(tp);
 if (ver >= 8) {
 assertEquals(1, deserialized.recordErrors.size());
 assertEquals(3, deserialized.recordErrors.get(0).batchIndex);
 assertEquals("Record error", deserialized.recordErrors.get(0).message);
 assertEquals("Produce failed", deserialized.errorMessage);
 } else {
 assertEquals(0, deserialized.recordErrors.size());
-assertEquals(null, deserialized.errorMessage);
+assertNull(deserialized.errorMessage);
 }
 }
 }
+
+/**
+ * the schema in this test is from previous code and the automatic protocol should be compatible to previous schema.
+ */
+@Test
+public void testCompatibility() {

Review comment:
   I think this test might be overkill. We haven't done anything like this 
for the other converted APIs. It's a little similar to 
`MessageTest.testRequestSchemas`, which was useful for verifying the generated 
schemas when the message generator was being written. Soon `testRequestSchemas` 
will be redundant, so I guess we have to decide if we just trust the generator 
and our compatibility system tests or if we want some other canonical 
representation. Thoughts?









[GitHub] [kafka] hachikuji commented on a change in pull request #9401: KAFKA-9628 Replace Produce request/response with automated protocol

2020-11-10 Thread GitBox


hachikuji commented on a change in pull request #9401:
URL: https://github.com/apache/kafka/pull/9401#discussion_r520740954



##
File path: clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java
##
@@ -204,118 +75,78 @@ public ProduceResponse(Map responses) {
  * @param throttleTimeMs Time in milliseconds the response was throttled
  */
 public ProduceResponse(Map responses, int throttleTimeMs) {
-this.responses = responses;
-this.throttleTimeMs = throttleTimeMs;
+this(new ProduceResponseData()
+.setResponses(responses.entrySet()
+.stream()
+.collect(Collectors.groupingBy(e -> e.getKey().topic()))
+.entrySet()
+.stream()
+.map(topicData -> new ProduceResponseData.TopicProduceResponse()

Review comment:
   Oh, I was just emphasizing that it is a matter of taste. It's up to you 
if you agree or not.









[GitHub] [kafka] hachikuji commented on a change in pull request #9401: KAFKA-9628 Replace Produce request/response with automated protocol

2020-11-06 Thread GitBox


hachikuji commented on a change in pull request #9401:
URL: https://github.com/apache/kafka/pull/9401#discussion_r519079153



##
File path: clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java
##
@@ -194,7 +107,27 @@ private ProduceRequest build(short version, boolean validate) {
 ProduceRequest.validateRecords(version, records);
 }
 }
-return new ProduceRequest(version, acks, timeout, partitionRecords, transactionalId);
+
+List tpd = partitionRecords

Review comment:
   Right. I thought we might consider computing it just before the data 
gets cleared.









[GitHub] [kafka] hachikuji commented on a change in pull request #9401: KAFKA-9628 Replace Produce request/response with automated protocol

2020-11-06 Thread GitBox


hachikuji commented on a change in pull request #9401:
URL: https://github.com/apache/kafka/pull/9401#discussion_r518989756



##
File path: clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java
##
@@ -194,7 +107,27 @@ private ProduceRequest build(short version, boolean validate) {
 ProduceRequest.validateRecords(version, records);
 }
 }
-return new ProduceRequest(version, acks, timeout, partitionRecords, transactionalId);
+
+List tpd = partitionRecords

Review comment:
   We could probably also compute `partitionSizes` lazily. I think the 
broker is the only one that uses it.
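A minimal sketch of the lazy approach, assuming a simplified request that maps partition names to raw `ByteBuffer` record data (the class and its methods are hypothetical, not the real `ProduceRequest` API). Sizes are computed on first access, and `clearPartitionRecords()` forces the computation just before the data is released, so the sizes survive the clear.

```java
import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical simplified request: partition name -> raw record bytes. */
class LazySizesRequest {
    // Set to null by clearPartitionRecords() to release the record buffers.
    private volatile Map<String, ByteBuffer> partitionRecords;
    // Null until first computed; cached afterwards.
    private Map<String, Integer> partitionSizes;

    LazySizesRequest(Map<String, ByteBuffer> partitionRecords) {
        this.partitionRecords = partitionRecords;
    }

    /** Computes per-partition sizes on the first call and caches them. */
    synchronized Map<String, Integer> partitionSizes() {
        if (partitionSizes == null) {
            Map<String, Integer> sizes = new HashMap<>();
            partitionRecords.forEach((partition, buffer) -> sizes.put(partition, buffer.remaining()));
            partitionSizes = Collections.unmodifiableMap(sizes);
        }
        return partitionSizes;
    }

    /** Pins the sizes first, then drops the record data to limit memory retention. */
    synchronized void clearPartitionRecords() {
        partitionSizes();
        partitionRecords = null;
    }

    boolean hasRecords() {
        return partitionRecords != null;
    }
}
```

`synchronized` is a conservative choice for the sketch; if only one thread ever touches the request, the locking could be dropped.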









[GitHub] [kafka] hachikuji commented on a change in pull request #9401: KAFKA-9628 Replace Produce request/response with automated protocol

2020-11-06 Thread GitBox


hachikuji commented on a change in pull request #9401:
URL: https://github.com/apache/kafka/pull/9401#discussion_r518984064



##
File path: clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java
##
@@ -204,118 +75,78 @@ public ProduceResponse(Map responses) {
  * @param throttleTimeMs Time in milliseconds the response was throttled
  */
 public ProduceResponse(Map responses, int throttleTimeMs) {
-this.responses = responses;
-this.throttleTimeMs = throttleTimeMs;
+this(new ProduceResponseData()
+.setResponses(responses.entrySet()
+.stream()
+.collect(Collectors.groupingBy(e -> e.getKey().topic()))
+.entrySet()
+.stream()
+.map(topicData -> new ProduceResponseData.TopicProduceResponse()

Review comment:
   Not required, but this would be easier to follow if we had some 
helpers.
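One way the flat-map-to-nested conversion could be split into helpers: a per-partition converter plus a grouping step. The types `Tp`, `PartitionResult`, and `TopicResult` are hypothetical stand-ins for `TopicPartition` and the generated `ProduceResponseData` classes, and only `baseOffset` is carried to keep it short.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;

/** Hypothetical stand-in for TopicPartition. */
final class Tp {
    final String topic;
    final int partition;

    Tp(String topic, int partition) {
        this.topic = topic;
        this.partition = partition;
    }

    @Override
    public boolean equals(Object o) {
        if (!(o instanceof Tp)) return false;
        Tp other = (Tp) o;
        return partition == other.partition && topic.equals(other.topic);
    }

    @Override
    public int hashCode() {
        return Objects.hash(topic, partition);
    }
}

/** Hypothetical stand-in for the generated partition-level response. */
final class PartitionResult {
    final int partition;
    final long baseOffset;

    PartitionResult(int partition, long baseOffset) {
        this.partition = partition;
        this.baseOffset = baseOffset;
    }
}

/** Hypothetical stand-in for the generated topic-level response. */
final class TopicResult {
    final String topic;
    final List<PartitionResult> partitions = new ArrayList<>();

    TopicResult(String topic) {
        this.topic = topic;
    }
}

final class ProduceResponseHelpers {
    private ProduceResponseHelpers() {}

    /** Converts one flat (partition, value) entry into its partition-level form. */
    static PartitionResult toPartitionResult(Tp tp, long baseOffset) {
        return new PartitionResult(tp.partition, baseOffset);
    }

    /** Groups a flat map by topic, keeping topics in the input map's iteration order. */
    static List<TopicResult> groupByTopic(Map<Tp, Long> baseOffsets) {
        Map<String, TopicResult> byTopic = new LinkedHashMap<>();
        for (Map.Entry<Tp, Long> entry : baseOffsets.entrySet()) {
            Tp tp = entry.getKey();
            byTopic.computeIfAbsent(tp.topic, TopicResult::new)
                   .partitions.add(toPartitionResult(tp, entry.getValue()));
        }
        return new ArrayList<>(byTopic.values());
    }
}
```

A plain loop with a `LinkedHashMap` also keeps topic order deterministic; `Collectors.groupingBy` with a single classifier collects into a `HashMap`, so its iteration order is unspecified.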

##
File path: clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
##
@@ -560,13 +561,23 @@ private void handleProduceResponse(ClientResponse response, Map responses) {
  * @param throttleTimeMs Time in milliseconds the response was throttled
  */
 public ProduceResponse(Map responses, int throttleTimeMs) {
-this.responses = responses;
-this.throttleTimeMs = throttleTimeMs;
+this(new ProduceResponseData()
+.setResponses(responses.entrySet()
+.stream()
+.collect(Collectors.groupingBy(e -> e.getKey().topic()))
+.entrySet()
+.stream()
+.map(topicData -> new ProduceResponseData.TopicProduceResponse()
+.setTopic(topicData.getKey())
+.setPartitionResponses(topicData.getValue()
+.stream()
+.map(p -> new ProduceResponseData.PartitionProduceResponse()
+.setPartition(p.getKey().partition())
+.setBaseOffset(p.getValue().baseOffset)
+.setLogStartOffset(p.getValue().logStartOffset)
+.setLogAppendTime(p.getValue().logAppendTime)
+.setErrorMessage(p.getValue().errorMessage)
+.setErrorCode(p.getValue().error.code())
+.setRecordErrors(p.getValue().recordErrors
+.stream()
+.map(e -> new ProduceResponseData.BatchIndexAndErrorMessage()
+.setBatchIndex(e.batchIndex)
+.setBatchIndexErrorMessage(e.message))
+.collect(Collectors.toList(
+.collect(Collectors.toList(
+.collect(Collectors.toList()))
+.setThrottleTimeMs(throttleTimeMs));
 }
 
 /**
- * Constructor from a {@link Struct}.
+ * Visible for testing.
  */
-public ProduceResponse(Struct struct) {
-responses = new HashMap<>();
-for (Object topicResponse : struct.getArray(RESPONSES_KEY_NAME)) {
-Struct topicRespStruct = (Struct) topicResponse;
-String topic = topicRespStruct.get(TOPIC_NAME);
-
-for (Object partResponse : topicRespStruct.getArray(PARTITION_RESPONSES_KEY_NAME)) {
-Struct partRespStruct = (Struct) partResponse;
-int partition = partRespStruct.get(PARTITION_ID);
-Errors error = Errors.forCode(partRespStruct.get(ERROR_CODE));
-long offset = partRespStruct.getLong(BASE_OFFSET_KEY_NAME);
-long logAppendTime = partRespStruct.hasField(LOG_APPEND_TIME_KEY_NAME) ?
-partRespStruct.getLong(LOG_APPEND_TIME_KEY_NAME) : RecordBatch.NO_TIMESTAMP;
-long logStartOffset = partRespStruct.getOrElse(LOG_START_OFFSET_FIELD, INVALID_OFFSET);
-
-List recordErrors = Collections.emptyList();
-if (partRespStruct.hasField(RECORD_ERRORS_KEY_NAME)) {
-Object[] recordErrorsArray = partRespStruct.getArray(RECORD_ERRORS_KEY_NAME);
-if (recordErrorsArray.length > 0) {
-recordErrors = new ArrayList<>(recordErrorsArray.length);
-for (Object indexAndMessage : recordErrorsArray) {
-Struct indexAndMessageStruct = (Struct) indexAndMessage;
-recordErrors.add(new RecordError(
-indexAndMessageStruct.getInt(BATCH_INDEX_KEY_NAME),
-indexAndMessageStruct.get(BATCH_INDEX_ERROR_MESSAGE_FIELD)
-));
-}
-}
-}
-
-String errorMessage = partResp

[GitHub] [kafka] hachikuji commented on a change in pull request #9401: KAFKA-9628 Replace Produce request/response with automated protocol

2020-10-29 Thread GitBox


hachikuji commented on a change in pull request #9401:
URL: https://github.com/apache/kafka/pull/9401#discussion_r514594596



##
File path: clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java
##
@@ -210,65 +142,42 @@ public String toString() {
 }
 }
 
+/**
+ * We have to copy acks, timeout, transactionalId and partitionSizes from data since data maybe reset to eliminate
+ * the reference to ByteBuffer but those metadata are still useful.
+ */
 private final short acks;
 private final int timeout;
 private final String transactionalId;
-
-private final Map partitionSizes;
-
+// visible for testing
+final Map partitionSizes;
+private boolean hasTransactionalRecords = false;
+private boolean hasIdempotentRecords = false;
 // This is set to null by `clearPartitionRecords` to prevent unnecessary memory retention when a produce request is
 // put in the purgatory (due to client throttling, it can take a while before the response is sent).
 // Care should be taken in methods that use this field.
-private volatile Map partitionRecords;
-private boolean hasTransactionalRecords = false;
-private boolean hasIdempotentRecords = false;
-
-private ProduceRequest(short version, short acks, int timeout, Map partitionRecords, String transactionalId) {
-super(ApiKeys.PRODUCE, version);
-this.acks = acks;
-this.timeout = timeout;
-
-this.transactionalId = transactionalId;
-this.partitionRecords = partitionRecords;
-this.partitionSizes = createPartitionSizes(partitionRecords);
+private volatile ProduceRequestData data;
 
-for (MemoryRecords records : partitionRecords.values()) {
-setFlags(records);
-}
-}
-
-private static Map createPartitionSizes(Map partitionRecords) {
-Map result = new HashMap<>(partitionRecords.size());
-for (Map.Entry entry : partitionRecords.entrySet())
-result.put(entry.getKey(), entry.getValue().sizeInBytes());
-return result;
-}
-
-public ProduceRequest(Struct struct, short version) {
+public ProduceRequest(ProduceRequestData produceRequestData, short version) {
 super(ApiKeys.PRODUCE, version);
-partitionRecords = new HashMap<>();
-for (Object topicDataObj : struct.getArray(TOPIC_DATA_KEY_NAME)) {
-Struct topicData = (Struct) topicDataObj;
-String topic = topicData.get(TOPIC_NAME);
-for (Object partitionResponseObj : topicData.getArray(PARTITION_DATA_KEY_NAME)) {
-Struct partitionResponse = (Struct) partitionResponseObj;
-int partition = partitionResponse.get(PARTITION_ID);
-MemoryRecords records = (MemoryRecords) partitionResponse.getRecords(RECORD_SET_KEY_NAME);
-setFlags(records);
-partitionRecords.put(new TopicPartition(topic, partition), records);
-}
-}
-partitionSizes = createPartitionSizes(partitionRecords);
-acks = struct.getShort(ACKS_KEY_NAME);
-timeout = struct.getInt(TIMEOUT_KEY_NAME);
-transactionalId = struct.getOrElse(NULLABLE_TRANSACTIONAL_ID, null);
-}
-
-private void setFlags(MemoryRecords records) {
-Iterator iterator = records.batches().iterator();
-MutableRecordBatch entry = iterator.next();
-hasIdempotentRecords = hasIdempotentRecords || entry.hasProducerId();
-hasTransactionalRecords = hasTransactionalRecords || entry.isTransactional();
+this.data = produceRequestData;
+this.data.topicData().forEach(topicProduceData -> topicProduceData.partitions()
+.forEach(partitionProduceData -> {
+MemoryRecords records = MemoryRecords.readableRecords(partitionProduceData.records());
+Iterator iterator = records.batches().iterator();
+MutableRecordBatch entry = iterator.next();
+hasIdempotentRecords = hasIdempotentRecords || entry.hasProducerId();

Review comment:
   On the other hand, we might want to move this logic into a helper in 
`KafkaApis` so that these objects are dedicated only to serialization logic. 
Eventually we'll want to get rid of `ProduceRequest` and just use 
`ProduceRequestData`.
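If the flag computation moved out of `ProduceRequest` into a broker-side helper, it might look roughly like this. It mirrors the constructor in the diff by inspecting only the first batch of each partition, but `BatchHeader`, `SimpleBatch`, `ProduceFlags`, and `ProduceFlagsHelper` are hypothetical types, not Kafka's `MutableRecordBatch` API.

```java
import java.util.Iterator;
import java.util.List;

/** Hypothetical minimal view of a record batch header. */
interface BatchHeader {
    boolean hasProducerId();
    boolean isTransactional();
}

/** Hypothetical simple implementation, for illustration and tests. */
final class SimpleBatch implements BatchHeader {
    private final boolean producerId;
    private final boolean transactional;

    SimpleBatch(boolean producerId, boolean transactional) {
        this.producerId = producerId;
        this.transactional = transactional;
    }

    @Override
    public boolean hasProducerId() { return producerId; }

    @Override
    public boolean isTransactional() { return transactional; }
}

/** Flags derived from a produce request's record batches. */
final class ProduceFlags {
    final boolean hasIdempotentRecords;
    final boolean hasTransactionalRecords;

    ProduceFlags(boolean hasIdempotentRecords, boolean hasTransactionalRecords) {
        this.hasIdempotentRecords = hasIdempotentRecords;
        this.hasTransactionalRecords = hasTransactionalRecords;
    }
}

final class ProduceFlagsHelper {
    private ProduceFlagsHelper() {}

    /** Inspects the first batch of each partition and ORs the flags together. */
    static ProduceFlags scan(List<List<BatchHeader>> batchesPerPartition) {
        boolean idempotent = false;
        boolean transactional = false;
        for (List<BatchHeader> batches : batchesPerPartition) {
            Iterator<BatchHeader> it = batches.iterator();
            if (!it.hasNext()) {
                continue; // empty partition: skip instead of throwing NoSuchElementException
            }
            BatchHeader first = it.next();
            idempotent |= first.hasProducerId();
            transactional |= first.isTransactional();
        }
        return new ProduceFlags(idempotent, transactional);
    }
}
```

Unlike the unconditional `iterator.next()` in the diff, the sketch skips a partition with no batches rather than throwing `NoSuchElementException`; whether such a request should instead be rejected outright is a separate validation decision.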









[GitHub] [kafka] hachikuji commented on a change in pull request #9401: KAFKA-9628 Replace Produce request/response with automated protocol

2020-10-29 Thread GitBox


hachikuji commented on a change in pull request #9401:
URL: https://github.com/apache/kafka/pull/9401#discussion_r514467652



##
File path: clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java
##
@@ -210,65 +142,42 @@ public String toString() {
 }
 }
 
+/**
+ * We have to copy acks, timeout, transactionalId and partitionSizes from data since data maybe reset to eliminate
+ * the reference to ByteBuffer but those metadata are still useful.
+ */
 private final short acks;
 private final int timeout;
 private final String transactionalId;
-
-private final Map partitionSizes;
-
+// visible for testing
+final Map partitionSizes;
+private boolean hasTransactionalRecords = false;
+private boolean hasIdempotentRecords = false;
 // This is set to null by `clearPartitionRecords` to prevent unnecessary memory retention when a produce request is
 // put in the purgatory (due to client throttling, it can take a while before the response is sent).
 // Care should be taken in methods that use this field.
-private volatile Map partitionRecords;
-private boolean hasTransactionalRecords = false;
-private boolean hasIdempotentRecords = false;
-
-private ProduceRequest(short version, short acks, int timeout, Map partitionRecords, String transactionalId) {
-super(ApiKeys.PRODUCE, version);
-this.acks = acks;
-this.timeout = timeout;
-
-this.transactionalId = transactionalId;
-this.partitionRecords = partitionRecords;
-this.partitionSizes = createPartitionSizes(partitionRecords);
+private volatile ProduceRequestData data;
 
-for (MemoryRecords records : partitionRecords.values()) {
-setFlags(records);
-}
-}
-
-private static Map createPartitionSizes(Map partitionRecords) {
-Map result = new HashMap<>(partitionRecords.size());
-for (Map.Entry entry : partitionRecords.entrySet())
-result.put(entry.getKey(), entry.getValue().sizeInBytes());
-return result;
-}
-
-public ProduceRequest(Struct struct, short version) {
+public ProduceRequest(ProduceRequestData produceRequestData, short version) {
 super(ApiKeys.PRODUCE, version);
-partitionRecords = new HashMap<>();
-for (Object topicDataObj : struct.getArray(TOPIC_DATA_KEY_NAME)) {
-Struct topicData = (Struct) topicDataObj;
-String topic = topicData.get(TOPIC_NAME);
-for (Object partitionResponseObj : topicData.getArray(PARTITION_DATA_KEY_NAME)) {
-Struct partitionResponse = (Struct) partitionResponseObj;
-int partition = partitionResponse.get(PARTITION_ID);
-MemoryRecords records = (MemoryRecords) partitionResponse.getRecords(RECORD_SET_KEY_NAME);
-setFlags(records);
-partitionRecords.put(new TopicPartition(topic, partition), records);
-}
-}
-partitionSizes = createPartitionSizes(partitionRecords);
-acks = struct.getShort(ACKS_KEY_NAME);
-timeout = struct.getInt(TIMEOUT_KEY_NAME);
-transactionalId = struct.getOrElse(NULLABLE_TRANSACTIONAL_ID, null);
-}
-
-private void setFlags(MemoryRecords records) {
-Iterator iterator = records.batches().iterator();
-MutableRecordBatch entry = iterator.next();
-hasIdempotentRecords = hasIdempotentRecords || entry.hasProducerId();
-hasTransactionalRecords = hasTransactionalRecords || entry.isTransactional();
+this.data = produceRequestData;
+this.data.topicData().forEach(topicProduceData -> topicProduceData.partitions()
+.forEach(partitionProduceData -> {
+MemoryRecords records = MemoryRecords.readableRecords(partitionProduceData.records());
+Iterator iterator = records.batches().iterator();
+MutableRecordBatch entry = iterator.next();
+hasIdempotentRecords = hasIdempotentRecords || entry.hasProducerId();

Review comment:
   Never mind, I guess we have to do it here because the server needs to 
validate the request received from the client.









[GitHub] [kafka] hachikuji commented on a change in pull request #9401: KAFKA-9628 Replace Produce request/response with automated protocol

2020-10-29 Thread GitBox


hachikuji commented on a change in pull request #9401:
URL: https://github.com/apache/kafka/pull/9401#discussion_r514326022



##
File path: clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java
##
@@ -210,65 +142,42 @@ public String toString() {
 }
 }
 
+/**
+ * We have to copy acks, timeout, transactionalId and partitionSizes from data since data maybe reset to eliminate
+ * the reference to ByteBuffer but those metadata are still useful.
+ */
 private final short acks;
 private final int timeout;
 private final String transactionalId;
-
-private final Map partitionSizes;
-
+// visible for testing
+final Map partitionSizes;
+private boolean hasTransactionalRecords = false;
+private boolean hasIdempotentRecords = false;
 // This is set to null by `clearPartitionRecords` to prevent unnecessary memory retention when a produce request is
 // put in the purgatory (due to client throttling, it can take a while before the response is sent).
 // Care should be taken in methods that use this field.
-private volatile Map partitionRecords;
-private boolean hasTransactionalRecords = false;
-private boolean hasIdempotentRecords = false;
-
-private ProduceRequest(short version, short acks, int timeout, Map partitionRecords, String transactionalId) {
-super(ApiKeys.PRODUCE, version);
-this.acks = acks;
-this.timeout = timeout;
-
-this.transactionalId = transactionalId;
-this.partitionRecords = partitionRecords;
-this.partitionSizes = createPartitionSizes(partitionRecords);
+private volatile ProduceRequestData data;
 
-for (MemoryRecords records : partitionRecords.values()) {
-setFlags(records);
-}
-}
-
-private static Map createPartitionSizes(Map partitionRecords) {
-Map result = new HashMap<>(partitionRecords.size());
-for (Map.Entry entry : partitionRecords.entrySet())
-result.put(entry.getKey(), entry.getValue().sizeInBytes());
-return result;
-}
-
-public ProduceRequest(Struct struct, short version) {
+public ProduceRequest(ProduceRequestData produceRequestData, short version) {
 super(ApiKeys.PRODUCE, version);
-partitionRecords = new HashMap<>();
-for (Object topicDataObj : struct.getArray(TOPIC_DATA_KEY_NAME)) {
-Struct topicData = (Struct) topicDataObj;
-String topic = topicData.get(TOPIC_NAME);
-for (Object partitionResponseObj : topicData.getArray(PARTITION_DATA_KEY_NAME)) {
-Struct partitionResponse = (Struct) partitionResponseObj;
-int partition = partitionResponse.get(PARTITION_ID);
-MemoryRecords records = (MemoryRecords) partitionResponse.getRecords(RECORD_SET_KEY_NAME);
-setFlags(records);
-partitionRecords.put(new TopicPartition(topic, partition), records);
-}
-}
-partitionSizes = createPartitionSizes(partitionRecords);
-acks = struct.getShort(ACKS_KEY_NAME);
-timeout = struct.getInt(TIMEOUT_KEY_NAME);
-transactionalId = struct.getOrElse(NULLABLE_TRANSACTIONAL_ID, null);
-}
-
-private void setFlags(MemoryRecords records) {
-Iterator iterator = records.batches().iterator();
-MutableRecordBatch entry = iterator.next();
-hasIdempotentRecords = hasIdempotentRecords || entry.hasProducerId();
-hasTransactionalRecords = hasTransactionalRecords || entry.isTransactional();
+this.data = produceRequestData;
+this.data.topicData().forEach(topicProduceData -> topicProduceData.partitions()
+.forEach(partitionProduceData -> {
+MemoryRecords records = MemoryRecords.readableRecords(partitionProduceData.records());
+Iterator iterator = records.batches().iterator();
+MutableRecordBatch entry = iterator.next();
+hasIdempotentRecords = hasIdempotentRecords || entry.hasProducerId();

Review comment:
   Would it make sense to move this to the builder where we are already 
doing a pass over the partitions?

##
File path: clients/src/main/resources/common/message/ProduceRequest.json
##
@@ -33,21 +33,21 @@
   "validVersions": "0-8",
   "flexibleVersions": "none",
   "fields": [
-{ "name": "TransactionalId", "type": "string", "versions": "3+", "nullableVersions": "0+", "entityType": "transactionalId",
+{ "name": "TransactionalId", "type": "string", "versions": "3+", "nullableVersions": "3+", "ignorable": true, "entityType": "transactionalId",
   "about": "The transactional ID, or null if the producer is not transactional." },
 { "name": "Acks", "type": "int16", "versions": "0+",
   "about": "The number of acknowledgments the producer requires the leader to have received before considering a re