hachikuji commented on a change in pull request #9401:
URL: https://github.com/apache/kafka/pull/9401#discussion_r514467652



##########
File path: clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java
##########
@@ -210,65 +142,42 @@ public String toString() {
         }
     }
 
+    /**
+     * We have to copy acks, timeout, transactionalId and partitionSizes from data since data may be reset to eliminate
+     * the reference to ByteBuffer, but that metadata is still useful.
+     */
     private final short acks;
     private final int timeout;
     private final String transactionalId;
-
-    private final Map<TopicPartition, Integer> partitionSizes;
-
+    // visible for testing
+    final Map<TopicPartition, Integer> partitionSizes;
+    private boolean hasTransactionalRecords = false;
+    private boolean hasIdempotentRecords = false;
     // This is set to null by `clearPartitionRecords` to prevent unnecessary memory retention when a produce request is
     // put in the purgatory (due to client throttling, it can take a while before the response is sent).
     // Care should be taken in methods that use this field.
-    private volatile Map<TopicPartition, MemoryRecords> partitionRecords;
-    private boolean hasTransactionalRecords = false;
-    private boolean hasIdempotentRecords = false;
-
-    private ProduceRequest(short version, short acks, int timeout, Map<TopicPartition, MemoryRecords> partitionRecords, String transactionalId) {
-        super(ApiKeys.PRODUCE, version);
-        this.acks = acks;
-        this.timeout = timeout;
-
-        this.transactionalId = transactionalId;
-        this.partitionRecords = partitionRecords;
-        this.partitionSizes = createPartitionSizes(partitionRecords);
+    private volatile ProduceRequestData data;
 
-        for (MemoryRecords records : partitionRecords.values()) {
-            setFlags(records);
-        }
-    }
-
-    private static Map<TopicPartition, Integer> createPartitionSizes(Map<TopicPartition, MemoryRecords> partitionRecords) {
-        Map<TopicPartition, Integer> result = new HashMap<>(partitionRecords.size());
-        for (Map.Entry<TopicPartition, MemoryRecords> entry : partitionRecords.entrySet())
-            result.put(entry.getKey(), entry.getValue().sizeInBytes());
-        return result;
-    }
-
-    public ProduceRequest(Struct struct, short version) {
+    public ProduceRequest(ProduceRequestData produceRequestData, short version) {
         super(ApiKeys.PRODUCE, version);
-        partitionRecords = new HashMap<>();
-        for (Object topicDataObj : struct.getArray(TOPIC_DATA_KEY_NAME)) {
-            Struct topicData = (Struct) topicDataObj;
-            String topic = topicData.get(TOPIC_NAME);
-            for (Object partitionResponseObj : topicData.getArray(PARTITION_DATA_KEY_NAME)) {
-                Struct partitionResponse = (Struct) partitionResponseObj;
-                int partition = partitionResponse.get(PARTITION_ID);
-                MemoryRecords records = (MemoryRecords) partitionResponse.getRecords(RECORD_SET_KEY_NAME);
-                setFlags(records);
-                partitionRecords.put(new TopicPartition(topic, partition), records);
-            }
-        }
-        partitionSizes = createPartitionSizes(partitionRecords);
-        acks = struct.getShort(ACKS_KEY_NAME);
-        timeout = struct.getInt(TIMEOUT_KEY_NAME);
-        transactionalId = struct.getOrElse(NULLABLE_TRANSACTIONAL_ID, null);
-    }
-
-    private void setFlags(MemoryRecords records) {
-        Iterator<MutableRecordBatch> iterator = records.batches().iterator();
-        MutableRecordBatch entry = iterator.next();
-        hasIdempotentRecords = hasIdempotentRecords || entry.hasProducerId();
-        hasTransactionalRecords = hasTransactionalRecords || entry.isTransactional();
+        this.data = produceRequestData;
+        this.data.topicData().forEach(topicProduceData -> topicProduceData.partitions()
+            .forEach(partitionProduceData -> {
+                MemoryRecords records = MemoryRecords.readableRecords(partitionProduceData.records());
+                Iterator<MutableRecordBatch> iterator = records.batches().iterator();
+                MutableRecordBatch entry = iterator.next();
+                hasIdempotentRecords = hasIdempotentRecords || entry.hasProducerId();

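For context on the hunk above: the pattern is to derive the lightweight metadata (acks, timeout, transactionalId, partitionSizes and the two record flags) while `data` is still populated, so that `clearPartitionRecords` can later null out the volatile `data` field and release the underlying ByteBuffers. A simplified, generic sketch of that idea, using made-up names (`CachedPayloadMetadata`, `clearPayloads`) rather than the PR's own types:

import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;

// Hypothetical illustration, not the PR's code: cache cheap metadata up front
// so the heavyweight payload reference can be dropped later (e.g. while the
// request sits in purgatory) without losing the derived information.
public final class CachedPayloadMetadata {
    private final Map<String, Integer> sizes = new HashMap<>();
    private volatile Map<String, ByteBuffer> payloads;

    public CachedPayloadMetadata(Map<String, ByteBuffer> payloads) {
        this.payloads = payloads;
        // Derive everything needed later while the payload is still referenced.
        payloads.forEach((key, buffer) -> sizes.put(key, buffer.remaining()));
    }

    // Plays the role of clearPartitionRecords(): release the large buffers
    // but keep the previously derived metadata usable.
    public void clearPayloads() {
        this.payloads = null;
    }

    public int sizeOf(String key) {
        return sizes.get(key);
    }
}
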
Review comment:
       Nevermind, I guess we have to do it here because the server needs to 
validate the request received from the client.
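
The check being discussed amounts to peeking at the first record batch of each partition's record set. A minimal standalone sketch of that logic (the `RecordFlagProbe` class and its method names are illustrative and not part of the PR):

import java.nio.ByteBuffer;
import java.util.Iterator;

import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.record.MutableRecordBatch;

// Hypothetical helper, not part of the PR: shows how the constructor above
// derives the idempotent/transactional flags from the first batch of a
// partition's record set.
public final class RecordFlagProbe {

    public static boolean firstBatchHasProducerId(ByteBuffer recordsBuffer) {
        MemoryRecords records = MemoryRecords.readableRecords(recordsBuffer);
        Iterator<MutableRecordBatch> it = records.batches().iterator();
        // Guard against an empty record set; the constructor in the hunk above
        // calls next() directly on the first batch.
        return it.hasNext() && it.next().hasProducerId();
    }

    public static boolean firstBatchIsTransactional(ByteBuffer recordsBuffer) {
        MemoryRecords records = MemoryRecords.readableRecords(recordsBuffer);
        Iterator<MutableRecordBatch> it = records.batches().iterator();
        return it.hasNext() && it.next().isTransactional();
    }
}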




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

