chia7712 commented on a change in pull request #9401:
URL: https://github.com/apache/kafka/pull/9401#discussion_r526029843



##########
File path: clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
##########
@@ -560,13 +562,24 @@ private void handleProduceResponse(ClientResponse response, Map<TopicPartition,
             log.trace("Received produce response from node {} with correlation id {}", response.destination(), correlationId);
             // if we have a response, parse it
             if (response.hasResponse()) {
+                // Sender should exercise PartitionProduceResponse rather than ProduceResponse.PartitionResponse
+                // https://issues.apache.org/jira/browse/KAFKA-10696
                 ProduceResponse produceResponse = (ProduceResponse) response.responseBody();
-                for (Map.Entry<TopicPartition, ProduceResponse.PartitionResponse> entry : produceResponse.responses().entrySet()) {
-                    TopicPartition tp = entry.getKey();
-                    ProduceResponse.PartitionResponse partResp = entry.getValue();
+                produceResponse.data().responses().forEach(r -> r.partitionResponses().forEach(p -> {
+                    TopicPartition tp = new TopicPartition(r.name(), p.index());
+                    ProduceResponse.PartitionResponse partResp = new ProduceResponse.PartitionResponse(
+                            Errors.forCode(p.errorCode()),
+                            p.baseOffset(),
+                            p.logAppendTimeMs(),
+                            p.logStartOffset(),
+                            p.recordErrors()
+                                .stream()
+                                .map(e -> new ProduceResponse.RecordError(e.batchIndex(), e.batchIndexErrorMessage()))
+                                .collect(Collectors.toList()),

Review comment:
       The reason we got rid of stream APIs elsewhere is that they produce an extra collection (groupBy). In this case, however, we have to create a new collection to carry the non-auto-generated data even if we drop the stream APIs (and https://issues.apache.org/jira/browse/KAFKA-10696 will eliminate this conversion entirely). Hence, it should be fine to keep the stream APIs here. A loop-based sketch of the same conversion follows for comparison.
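
       For reference, a minimal loop-based sketch of the record-error conversion (the `toRecordErrors` helper name is hypothetical, not part of this PR). It still has to allocate a fresh list for the converted errors, which is why dropping the stream buys nothing here:

```java
import java.util.ArrayList;
import java.util.List;

import org.apache.kafka.common.message.ProduceResponseData;
import org.apache.kafka.common.requests.ProduceResponse;

// Hypothetical loop-based equivalent of the stream above. The new list
// carrying the non-auto-generated RecordError objects is unavoidable
// either way, so the extra-collection cost matches the stream version.
static List<ProduceResponse.RecordError> toRecordErrors(ProduceResponseData.PartitionProduceResponse p) {
    List<ProduceResponse.RecordError> recordErrors = new ArrayList<>(p.recordErrors().size());
    for (ProduceResponseData.BatchIndexAndErrorMessage e : p.recordErrors()) {
        recordErrors.add(new ProduceResponse.RecordError(e.batchIndex(), e.batchIndexErrorMessage()));
    }
    return recordErrors;
}
```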

##########
File path: clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java
##########
@@ -203,119 +77,88 @@ public ProduceResponse(Map<TopicPartition, PartitionResponse> responses) {
      * @param responses Produced data grouped by topic-partition
      * @param throttleTimeMs Time in milliseconds the response was throttled
      */
+    @Deprecated
     public ProduceResponse(Map<TopicPartition, PartitionResponse> responses, int throttleTimeMs) {
-        this.responses = responses;
-        this.throttleTimeMs = throttleTimeMs;
+        this(toData(responses, throttleTimeMs));
     }
 
-    /**
-     * Constructor from a {@link Struct}.
-     */
-    public ProduceResponse(Struct struct) {
-        responses = new HashMap<>();
-        for (Object topicResponse : struct.getArray(RESPONSES_KEY_NAME)) {
-            Struct topicRespStruct = (Struct) topicResponse;
-            String topic = topicRespStruct.get(TOPIC_NAME);
-
-            for (Object partResponse : topicRespStruct.getArray(PARTITION_RESPONSES_KEY_NAME)) {
-                Struct partRespStruct = (Struct) partResponse;
-                int partition = partRespStruct.get(PARTITION_ID);
-                Errors error = Errors.forCode(partRespStruct.get(ERROR_CODE));
-                long offset = partRespStruct.getLong(BASE_OFFSET_KEY_NAME);
-                long logAppendTime = partRespStruct.hasField(LOG_APPEND_TIME_KEY_NAME) ?
-                        partRespStruct.getLong(LOG_APPEND_TIME_KEY_NAME) : RecordBatch.NO_TIMESTAMP;
-                long logStartOffset = partRespStruct.getOrElse(LOG_START_OFFSET_FIELD, INVALID_OFFSET);
-
-                List<RecordError> recordErrors = Collections.emptyList();
-                if (partRespStruct.hasField(RECORD_ERRORS_KEY_NAME)) {
-                    Object[] recordErrorsArray = partRespStruct.getArray(RECORD_ERRORS_KEY_NAME);
-                    if (recordErrorsArray.length > 0) {
-                        recordErrors = new ArrayList<>(recordErrorsArray.length);
-                        for (Object indexAndMessage : recordErrorsArray) {
-                            Struct indexAndMessageStruct = (Struct) indexAndMessage;
-                            recordErrors.add(new RecordError(
-                                    indexAndMessageStruct.getInt(BATCH_INDEX_KEY_NAME),
-                                    indexAndMessageStruct.get(BATCH_INDEX_ERROR_MESSAGE_FIELD)
-                            ));
-                        }
-                    }
-                }
+    @Override
+    protected Send toSend(String destination, ResponseHeader header, short apiVersion) {
+        return SendBuilder.buildResponseSend(destination, header, this.data, apiVersion);
+    }
 
-                String errorMessage = partRespStruct.getOrElse(ERROR_MESSAGE_FIELD, null);
-                TopicPartition tp = new TopicPartition(topic, partition);
-                responses.put(tp, new PartitionResponse(error, offset, logAppendTime, logStartOffset, recordErrors, errorMessage));
+    private static ProduceResponseData toData(Map<TopicPartition, PartitionResponse> responses, int throttleTimeMs) {
+        ProduceResponseData data = new ProduceResponseData().setThrottleTimeMs(throttleTimeMs);
+        responses.forEach((tp, response) -> {
+            ProduceResponseData.TopicProduceResponse tpr = data.responses().find(tp.topic());
+            if (tpr == null) {
+                tpr = new ProduceResponseData.TopicProduceResponse().setName(tp.topic());
+                data.responses().add(tpr);
             }
-        }
-        this.throttleTimeMs = struct.getOrElse(THROTTLE_TIME_MS, DEFAULT_THROTTLE_TIME);
+            tpr.partitionResponses()
+                .add(new ProduceResponseData.PartitionProduceResponse()
+                    .setIndex(tp.partition())
+                    .setBaseOffset(response.baseOffset)
+                    .setLogStartOffset(response.logStartOffset)
+                    .setLogAppendTimeMs(response.logAppendTime)
+                    .setErrorMessage(response.errorMessage)
+                    .setErrorCode(response.error.code())
+                    .setRecordErrors(response.recordErrors
+                        .stream()
+                        .map(e -> new ProduceResponseData.BatchIndexAndErrorMessage()
+                            .setBatchIndex(e.batchIndex)
+                            .setBatchIndexErrorMessage(e.message))
+                        .collect(Collectors.toList())));
Review comment:
       ditto
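
       To make the "ditto" concrete: callers can skip the deprecated Map constructor and this toData conversion by populating ProduceResponseData directly. A minimal usage sketch, assuming the ProduceResponseData-based constructor this PR adds; the topic name, offsets, and the `buildResponse` helper are made up for illustration:

```java
import org.apache.kafka.common.message.ProduceResponseData;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.ProduceResponse;

// Hypothetical usage sketch: build the auto-generated ProduceResponseData
// directly instead of routing Map<TopicPartition, PartitionResponse>
// through toData, the conversion KAFKA-10696 aims to remove.
static ProduceResponse buildResponse() {
    ProduceResponseData data = new ProduceResponseData().setThrottleTimeMs(0);
    ProduceResponseData.TopicProduceResponse tpr =
        new ProduceResponseData.TopicProduceResponse().setName("my-topic");
    tpr.partitionResponses().add(new ProduceResponseData.PartitionProduceResponse()
        .setIndex(0)
        .setErrorCode(Errors.NONE.code())
        .setBaseOffset(42L));
    data.responses().add(tpr);
    return new ProduceResponse(data);
}
```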



