junrao commented on PR #15968: URL: https://github.com/apache/kafka/pull/15968#issuecomment-2960096710
@lucasbru: Thanks for reporting this issue. Does the test delete and recreate the same topic?

@OmniaGM: If a topic is recreated with a different topic id while processing a produce response, the following code may not handle it well. `topicName` will be empty if the topic id has changed in the metadata and `batch` will be null.

```
// Version 13 drop topic name and add support to topic id. However, metadata can be used to map topic id to topic name.
String topicName = metadata.topicNames().getOrDefault(r.topicId(), r.name());
TopicPartition tp = new TopicPartition(topicName, p.index());
ProduceResponse.PartitionResponse partResp = new ProduceResponse.PartitionResponse(
        Errors.forCode(p.errorCode()),
        p.baseOffset(),
        p.logAppendTimeMs(),
        p.logStartOffset(),
        p.recordErrors()
            .stream()
            .map(e -> new ProduceResponse.RecordError(e.batchIndex(), e.batchIndexErrorMessage()))
            .collect(Collectors.toList()),
        p.errorMessage(),
        p.currentLeader());
ProducerBatch batch = batches.get(tp);
```

I am wondering if we should include `topicId` in `recordsByPartition` in `sendProduceRequest()`. This way, we don't need to check `metadata` for `topicId` since it could change over time.
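To make the concern concrete, here is a rough, self-contained sketch rather than the actual `Sender` code. Everything in it is a simplifying assumption: `java.util.UUID` stands in for Kafka's topic id type, a plain `"topic-partition"` string stands in for `TopicPartition`, and the class and method names are hypothetical. It contrasts resolving the topic name from live metadata with resolving it from an id-to-name mapping captured when the request was built.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

class TopicIdSnapshotSketch {

    // Mirrors the quoted snippet: look the id up in *live* metadata and
    // fall back to the name carried in the response (empty in this scenario).
    static String resolveFromMetadata(Map<UUID, String> liveTopicNames,
                                      UUID topicId, String nameInResponse, int partition) {
        String topicName = liveTopicNames.getOrDefault(topicId, nameInResponse);
        return topicName + "-" + partition;
    }

    // Hypothetical alternative: use the id -> name mapping that was captured
    // when the produce request was built, so later metadata changes do not matter.
    static String resolveFromSnapshot(Map<UUID, String> namesAtSendTime,
                                      UUID topicId, int partition) {
        return namesAtSendTime.get(topicId) + "-" + partition;
    }

    static String demo() {
        UUID oldId = UUID.randomUUID();
        UUID newId = UUID.randomUUID();

        Map<UUID, String> liveTopicNames = new HashMap<>();
        liveTopicNames.put(oldId, "orders");

        // Request is built: remember the in-flight batch and the mapping used for it.
        Map<UUID, String> namesAtSendTime = new HashMap<>(liveTopicNames);
        Map<String, String> batches = new HashMap<>();
        batches.put("orders-0", "batch-0");

        // Topic is deleted and recreated with a new id before the response is handled.
        liveTopicNames.remove(oldId);
        liveTopicNames.put(newId, "orders");

        // The response refers to the old id and carries no topic name.
        String viaMetadata = batches.get(resolveFromMetadata(liveTopicNames, oldId, "", 0));
        String viaSnapshot = batches.get(resolveFromSnapshot(namesAtSendTime, oldId, 0));
        return viaMetadata + "," + viaSnapshot;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints "null,batch-0"
    }
}
```

In this toy setup the metadata-based lookup misses the batch once the topic id changes, while the lookup against the mapping captured at send time still finds it. Whether that mapping should live in `recordsByPartition` or elsewhere is left open here.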