junrao commented on code in PR #19964:
URL: https://github.com/apache/kafka/pull/19964#discussion_r2146568617

##########
clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java:
##########
@@ -609,7 +607,22 @@ private void handleProduceResponse(ClientResponse response, Map<TopicPartition,
                                 .collect(Collectors.toList()),
                             p.errorMessage(),
                             p.currentLeader());
-                    ProducerBatch batch = batches.get(tp);
+                    ProducerBatch batch = null;
+                    // Version 13 drop topic name and add support to topic id.
+                    // We need to find batch based on topic id and partition index only as
+                    // topic name in the response might be empty.
+                    List<ProducerBatch> matchedBatchesForTopicId = batches.entrySet().stream()
+                        .filter(entry -> entry.getKey().same(new TopicIdPartition(r.topicId(), p.index(), r.name())))
+                        .map(Map.Entry::getValue)
+                        .collect(Collectors.toList());
+
+                    if (matchedBatchesForTopicId.size() > 1) {
+                        matchedBatchesForTopicId.forEach(matchedBatch ->
+                            failBatch(matchedBatch, new RuntimeException("More than one batch with same topic id and partition."), false));

Review Comment:
   If this is unexpected, we want to throw IllegalStateException as in other places in this file.

##########
clients/src/main/java/org/apache/kafka/common/TopicIdPartition.java:
##########
@@ -78,6 +78,18 @@ public TopicPartition topicPartition() {
         return topicPartition;
     }
+    /**
+     * @return true if topic has same topicId and partition index as topic names some time might be empty.
+     */
+    public boolean same(TopicIdPartition tpId) {
+        if (tpId.topic().isEmpty()) {
+            return topicId.equals(tpId.topicId) &&
+                    topicPartition.partition() == tpId.partition();
+        } else {
+            return this.equals(tpId);

Review Comment:
   If topic name is not empty, it means that the topicId is zero. In this case, we just want to match based on topic name, right?

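One possible reading of the matching rule raised in the comment on `same(...)`, as a minimal sketch rather than the PR's actual code: when the other side carries no topic id, compare topic names; otherwise compare topic ids; the partition index must agree either way. `TopicIdPartitionSketch`, `ZERO_ID`, and `matches` are hypothetical names, and `java.util.UUID` stands in for Kafka's `Uuid` (with `ZERO_ID` playing the role of `Uuid.ZERO_UUID`). Branching on the zero id instead of on the empty name is an assumption made for this illustration.

```java
import java.util.Objects;
import java.util.UUID;

// Hypothetical stand-in for org.apache.kafka.common.TopicIdPartition; not the real class.
final class TopicIdPartitionSketch {
    // Stand-in for Kafka's Uuid.ZERO_UUID, meaning "no topic id available".
    static final UUID ZERO_ID = new UUID(0L, 0L);

    final UUID topicId;
    final int partition;
    final String topic;

    TopicIdPartitionSketch(UUID topicId, int partition, String topic) {
        this.topicId = Objects.requireNonNull(topicId);
        this.partition = partition;
        this.topic = Objects.requireNonNull(topic);
    }

    // Match on topic name when the other side has a zero topic id,
    // otherwise on topic id. The partition index must agree in both cases.
    boolean matches(TopicIdPartitionSketch other) {
        if (partition != other.partition) {
            return false;
        }
        if (ZERO_ID.equals(other.topicId)) {
            return topic.equals(other.topic);
        }
        return topicId.equals(other.topicId);
    }
}
```

With this shape, a response entry that has a topic id but an empty name still matches the batch key, and an entry without a topic id falls back to the name comparison.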
##########
clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java:
##########
@@ -609,7 +607,22 @@ private void handleProduceResponse(ClientResponse response, Map<TopicPartition,
                                 .collect(Collectors.toList()),
                             p.errorMessage(),
                             p.currentLeader());
-                    ProducerBatch batch = batches.get(tp);
+                    ProducerBatch batch = null;
+                    // Version 13 drop topic name and add support to topic id.
+                    // We need to find batch based on topic id and partition index only as
+                    // topic name in the response might be empty.
+                    List<ProducerBatch> matchedBatchesForTopicId = batches.entrySet().stream()
+                        .filter(entry -> entry.getKey().same(new TopicIdPartition(r.topicId(), p.index(), r.name())))

Review Comment:
   Could we just find the first match?
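For illustration only, "find the first match" could look roughly like the sketch below, which stops at the first entry whose key satisfies a predicate instead of collecting every match into a list. `FirstMatchSketch` and `findFirstMatch` are hypothetical names, not part of the PR or of Kafka, and the sketch assumes map values are non-null (`Stream.findFirst` throws a `NullPointerException` if the selected element is null).

```java
import java.util.Map;
import java.util.Optional;
import java.util.function.Predicate;

// Hypothetical helper; it only illustrates the stream shape, not the Sender code.
final class FirstMatchSketch {
    // Returns the value of the first entry whose key satisfies the predicate,
    // or an empty Optional when no key matches.
    static <K, V> Optional<V> findFirstMatch(Map<K, V> batches, Predicate<K> matches) {
        return batches.entrySet().stream()
            .filter(entry -> matches.test(entry.getKey()))
            .map(Map.Entry::getValue)
            .findFirst();
    }
}
```

Because `findFirst` short-circuits, the "more than one match" branch disappears; whether silently taking the first match is acceptable depends on whether duplicates can really occur here.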