OmniaGM commented on code in PR #15968: URL: https://github.com/apache/kafka/pull/15968#discussion_r1617533592
########## clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java: ########## @@ -610,7 +611,9 @@ private void handleProduceResponse(ClientResponse response, Map<TopicPartition, // This will be set by completeBatch. Map<TopicPartition, Metadata.LeaderIdAndEpoch> partitionsWithUpdatedLeaderInfo = new HashMap<>(); produceResponse.data().responses().forEach(r -> r.partitionResponses().forEach(p -> { - TopicPartition tp = new TopicPartition(r.name(), p.index()); + // Version 12 drop topic name and add support to topic id. However, metadata can be used to map topic id to topic name. + String topicName = (r.name() == null || r.name().isEmpty()) ? metadata.topicNames().get(r.topicId()) : r.name(); Review Comment: - If topic has been recreated and topic id is out of date, the client will get `UNKNOWN_TOPIC_ID` and on the retry the topic id will be updated - If topic has been reassigned to another broker then the client will get `NOT_LEADER_OR_FOLLOWER` and then the client can retry with the right broker. - Am not sure what `upgrade case/downgrade` you refer too here Do you mean the client and broker IBP combination? If yes then some of these are covered in `ProduceRequestTest` and `RequestResponseTest` I added two [test cases](https://github.com/apache/kafka/pull/15968/commits/35dba4bd550dd269a03f5b1ce1d649346a481512) to cover the first two and the producer seem to self recover on retry. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org