OmniaGM commented on code in PR #15968:
URL: https://github.com/apache/kafka/pull/15968#discussion_r1617533592


##########
clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java:
##########
@@ -610,7 +611,9 @@ private void handleProduceResponse(ClientResponse response, Map<TopicPartition,
                 // This will be set by completeBatch.
                 Map<TopicPartition, Metadata.LeaderIdAndEpoch> partitionsWithUpdatedLeaderInfo = new HashMap<>();
                 produceResponse.data().responses().forEach(r -> r.partitionResponses().forEach(p -> {
-                    TopicPartition tp = new TopicPartition(r.name(), p.index());
+                    // Version 12 drop topic name and add support to topic id. However, metadata can be used to map topic id to topic name.
+                    String topicName = (r.name() == null || r.name().isEmpty()) ? metadata.topicNames().get(r.topicId()) : r.name();

Review Comment:
   - If the topic has been recreated and the topic id is out of date, the client will 
get `UNKNOWN_TOPIC_ID`, and the topic id will be updated on the retry.
   - If the topic has been reassigned to another broker, the client will get 
`NOT_LEADER_OR_FOLLOWER` and can then retry against the right broker.
   - I am not sure which upgrade/downgrade case you are referring to here. Do you mean 
the client and broker IBP combination? If so, these are covered in 
`ProduceRequestTest` and `RequestResponseTest`.
   
   I added two test cases to cover the first two scenarios, and the producer seems to 
recover on its own on retry.
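
   For reference, the name-or-id fallback in the diff above can be sketched in isolation. This is only an illustration, not the code in the PR: `TopicNameResolver` and `resolve` are hypothetical names, `java.util.UUID` stands in for Kafka's own id type, and a plain `Map` stands in for `metadata.topicNames()`. Returning an empty `Optional` when the id is not in the cached mapping is an assumption made here to keep the missing-mapping case visible.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;

// Hypothetical helper, not part of Kafka: mirrors the ternary in the diff.
final class TopicNameResolver {
    private TopicNameResolver() { }

    // Prefer the name carried in the response; fall back to the id-to-name
    // mapping when the name is null or empty. Empty result means the cached
    // mapping has no entry for this id (for example, it is out of date).
    static Optional<String> resolve(String nameFromResponse,
                                    UUID topicId,
                                    Map<UUID, String> topicNamesById) {
        if (nameFromResponse != null && !nameFromResponse.isEmpty()) {
            return Optional.of(nameFromResponse);
        }
        if (topicId == null) {
            return Optional.empty();
        }
        return Optional.ofNullable(topicNamesById.get(topicId));
    }

    public static void main(String[] args) {
        UUID ordersId = UUID.randomUUID();
        Map<UUID, String> cached = new HashMap<>();
        cached.put(ordersId, "orders");

        // Response without a name: resolved through the cached mapping.
        System.out.println(resolve("", ordersId, cached));
        // Response with a name: the name is used as-is.
        System.out.println(resolve("payments", ordersId, cached));
        // Id missing from the cached mapping: nothing to resolve.
        System.out.println(resolve(null, UUID.randomUUID(), cached));
    }
}
```

   The caller decides what an empty result means. The recovery paths described above (`UNKNOWN_TOPIC_ID` and `NOT_LEADER_OR_FOLLOWER` followed by a retry) are broker-driven and are not modelled in this sketch.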



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

