hachikuji commented on a change in pull request #9401:
URL: https://github.com/apache/kafka/pull/9401#discussion_r514594596



##########
File path: 
clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java
##########
@@ -210,65 +142,42 @@ public String toString() {
         }
     }
 
+    /**
+     * We have to copy acks, timeout, transactionalId and partitionSizes from 
data since data maybe reset to eliminate
+     * the reference to ByteBuffer but those metadata are still useful.
+     */
     private final short acks;
     private final int timeout;
     private final String transactionalId;
-
-    private final Map<TopicPartition, Integer> partitionSizes;
-
+    // visible for testing
+    final Map<TopicPartition, Integer> partitionSizes;
+    private boolean hasTransactionalRecords = false;
+    private boolean hasIdempotentRecords = false;
     // This is set to null by `clearPartitionRecords` to prevent unnecessary 
memory retention when a produce request is
     // put in the purgatory (due to client throttling, it can take a while 
before the response is sent).
     // Care should be taken in methods that use this field.
-    private volatile Map<TopicPartition, MemoryRecords> partitionRecords;
-    private boolean hasTransactionalRecords = false;
-    private boolean hasIdempotentRecords = false;
-
-    private ProduceRequest(short version, short acks, int timeout, 
Map<TopicPartition, MemoryRecords> partitionRecords, String transactionalId) {
-        super(ApiKeys.PRODUCE, version);
-        this.acks = acks;
-        this.timeout = timeout;
-
-        this.transactionalId = transactionalId;
-        this.partitionRecords = partitionRecords;
-        this.partitionSizes = createPartitionSizes(partitionRecords);
+    private volatile ProduceRequestData data;
 
-        for (MemoryRecords records : partitionRecords.values()) {
-            setFlags(records);
-        }
-    }
-
-    private static Map<TopicPartition, Integer> 
createPartitionSizes(Map<TopicPartition, MemoryRecords> partitionRecords) {
-        Map<TopicPartition, Integer> result = new 
HashMap<>(partitionRecords.size());
-        for (Map.Entry<TopicPartition, MemoryRecords> entry : 
partitionRecords.entrySet())
-            result.put(entry.getKey(), entry.getValue().sizeInBytes());
-        return result;
-    }
-
-    public ProduceRequest(Struct struct, short version) {
+    public ProduceRequest(ProduceRequestData produceRequestData, short 
version) {
         super(ApiKeys.PRODUCE, version);
-        partitionRecords = new HashMap<>();
-        for (Object topicDataObj : struct.getArray(TOPIC_DATA_KEY_NAME)) {
-            Struct topicData = (Struct) topicDataObj;
-            String topic = topicData.get(TOPIC_NAME);
-            for (Object partitionResponseObj : 
topicData.getArray(PARTITION_DATA_KEY_NAME)) {
-                Struct partitionResponse = (Struct) partitionResponseObj;
-                int partition = partitionResponse.get(PARTITION_ID);
-                MemoryRecords records = (MemoryRecords) 
partitionResponse.getRecords(RECORD_SET_KEY_NAME);
-                setFlags(records);
-                partitionRecords.put(new TopicPartition(topic, partition), 
records);
-            }
-        }
-        partitionSizes = createPartitionSizes(partitionRecords);
-        acks = struct.getShort(ACKS_KEY_NAME);
-        timeout = struct.getInt(TIMEOUT_KEY_NAME);
-        transactionalId = struct.getOrElse(NULLABLE_TRANSACTIONAL_ID, null);
-    }
-
-    private void setFlags(MemoryRecords records) {
-        Iterator<MutableRecordBatch> iterator = records.batches().iterator();
-        MutableRecordBatch entry = iterator.next();
-        hasIdempotentRecords = hasIdempotentRecords || entry.hasProducerId();
-        hasTransactionalRecords = hasTransactionalRecords || 
entry.isTransactional();
+        this.data = produceRequestData;
+        this.data.topicData().forEach(topicProduceData -> 
topicProduceData.partitions()
+            .forEach(partitionProduceData -> {
+                MemoryRecords records = 
MemoryRecords.readableRecords(partitionProduceData.records());
+                Iterator<MutableRecordBatch> iterator = 
records.batches().iterator();
+                MutableRecordBatch entry = iterator.next();
+                hasIdempotentRecords = hasIdempotentRecords || 
entry.hasProducerId();

Review comment:
       On the other hand, we might want to move this logic into a helper in 
`KafkaApis` so that these objects are dedicated only to serialization logic. 
Eventually we'll want to get rid of `ProduceRequest` and just use 
`ProduceRequestData`.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to