chia7712 commented on a change in pull request #9401: URL: https://github.com/apache/kafka/pull/9401#discussion_r526029843
########## File path: clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java ########## @@ -560,13 +562,24 @@ private void handleProduceResponse(ClientResponse response, Map<TopicPartition, log.trace("Received produce response from node {} with correlation id {}", response.destination(), correlationId); // if we have a response, parse it if (response.hasResponse()) { + // Sender should exercise PartitionProduceResponse rather than ProduceResponse.PartitionResponse + // https://issues.apache.org/jira/browse/KAFKA-10696 ProduceResponse produceResponse = (ProduceResponse) response.responseBody(); - for (Map.Entry<TopicPartition, ProduceResponse.PartitionResponse> entry : produceResponse.responses().entrySet()) { - TopicPartition tp = entry.getKey(); - ProduceResponse.PartitionResponse partResp = entry.getValue(); + produceResponse.data().responses().forEach(r -> r.partitionResponses().forEach(p -> { + TopicPartition tp = new TopicPartition(r.name(), p.index()); + ProduceResponse.PartitionResponse partResp = new ProduceResponse.PartitionResponse( + Errors.forCode(p.errorCode()), + p.baseOffset(), + p.logAppendTimeMs(), + p.logStartOffset(), + p.recordErrors() + .stream() + .map(e -> new ProduceResponse.RecordError(e.batchIndex(), e.batchIndexErrorMessage())) + .collect(Collectors.toList()), Review comment: The reason we got rid of the stream APIs is that they produce an extra collection (groupBy). In this case, however, we have to create a new collection to carry the non-auto-generated data even if we get rid of the stream APIs (and https://issues.apache.org/jira/browse/KAFKA-10696 will eliminate this conversion). Hence, it should be fine to keep the stream APIs here. 
########## File path: clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java ########## @@ -203,119 +77,88 @@ public ProduceResponse(Map<TopicPartition, PartitionResponse> responses) { * @param responses Produced data grouped by topic-partition * @param throttleTimeMs Time in milliseconds the response was throttled */ + @Deprecated public ProduceResponse(Map<TopicPartition, PartitionResponse> responses, int throttleTimeMs) { - this.responses = responses; - this.throttleTimeMs = throttleTimeMs; + this(toData(responses, throttleTimeMs)); } - /** - * Constructor from a {@link Struct}. - */ - public ProduceResponse(Struct struct) { - responses = new HashMap<>(); - for (Object topicResponse : struct.getArray(RESPONSES_KEY_NAME)) { - Struct topicRespStruct = (Struct) topicResponse; - String topic = topicRespStruct.get(TOPIC_NAME); - - for (Object partResponse : topicRespStruct.getArray(PARTITION_RESPONSES_KEY_NAME)) { - Struct partRespStruct = (Struct) partResponse; - int partition = partRespStruct.get(PARTITION_ID); - Errors error = Errors.forCode(partRespStruct.get(ERROR_CODE)); - long offset = partRespStruct.getLong(BASE_OFFSET_KEY_NAME); - long logAppendTime = partRespStruct.hasField(LOG_APPEND_TIME_KEY_NAME) ? 
- partRespStruct.getLong(LOG_APPEND_TIME_KEY_NAME) : RecordBatch.NO_TIMESTAMP; - long logStartOffset = partRespStruct.getOrElse(LOG_START_OFFSET_FIELD, INVALID_OFFSET); - - List<RecordError> recordErrors = Collections.emptyList(); - if (partRespStruct.hasField(RECORD_ERRORS_KEY_NAME)) { - Object[] recordErrorsArray = partRespStruct.getArray(RECORD_ERRORS_KEY_NAME); - if (recordErrorsArray.length > 0) { - recordErrors = new ArrayList<>(recordErrorsArray.length); - for (Object indexAndMessage : recordErrorsArray) { - Struct indexAndMessageStruct = (Struct) indexAndMessage; - recordErrors.add(new RecordError( - indexAndMessageStruct.getInt(BATCH_INDEX_KEY_NAME), - indexAndMessageStruct.get(BATCH_INDEX_ERROR_MESSAGE_FIELD) - )); - } - } - } + @Override + protected Send toSend(String destination, ResponseHeader header, short apiVersion) { + return SendBuilder.buildResponseSend(destination, header, this.data, apiVersion); + } - String errorMessage = partRespStruct.getOrElse(ERROR_MESSAGE_FIELD, null); - TopicPartition tp = new TopicPartition(topic, partition); - responses.put(tp, new PartitionResponse(error, offset, logAppendTime, logStartOffset, recordErrors, errorMessage)); + private static ProduceResponseData toData(Map<TopicPartition, PartitionResponse> responses, int throttleTimeMs) { + ProduceResponseData data = new ProduceResponseData().setThrottleTimeMs(throttleTimeMs); + responses.forEach((tp, response) -> { + ProduceResponseData.TopicProduceResponse tpr = data.responses().find(tp.topic()); + if (tpr == null) { + tpr = new ProduceResponseData.TopicProduceResponse().setName(tp.topic()); + data.responses().add(tpr); } - } - this.throttleTimeMs = struct.getOrElse(THROTTLE_TIME_MS, DEFAULT_THROTTLE_TIME); + tpr.partitionResponses() + .add(new ProduceResponseData.PartitionProduceResponse() + .setIndex(tp.partition()) + .setBaseOffset(response.baseOffset) + .setLogStartOffset(response.logStartOffset) + .setLogAppendTimeMs(response.logAppendTime) + 
.setErrorMessage(response.errorMessage) + .setErrorCode(response.error.code()) + .setRecordErrors(response.recordErrors + .stream() + .map(e -> new ProduceResponseData.BatchIndexAndErrorMessage() + .setBatchIndex(e.batchIndex) + .setBatchIndexErrorMessage(e.message)) + .collect(Collectors.toList()))); Review comment: Ditto: as with the stream usage in Sender.java, a new collection has to be created for this conversion anyway, so it should be fine to keep the stream APIs here. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org