dajac commented on code in PR #22460:
URL: https://github.com/apache/kafka/pull/22460#discussion_r3375651010
##########
clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java:
##########
@@ -774,13 +776,44 @@ public void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offs
if (!offsets.isEmpty()) {
long start = time.nanoseconds();
+ var topics = offsets.keySet().stream().map(TopicPartition::topic).collect(Collectors.toSet());
+ var waitMs = awaitTopicMetadata(topics);
+ var remainingMs = Math.max(0L, maxBlockTimeMs - waitMs);
TransactionalRequestResult result = transactionManager.sendOffsetsToTransaction(offsets, groupMetadata);
sender.wakeup();
- result.await(maxBlockTimeMs, TimeUnit.MILLISECONDS, SEND_OFFSETS_TIMEOUT_MSG);
+ result.await(remainingMs, TimeUnit.MILLISECONDS, SEND_OFFSETS_TIMEOUT_MSG);
producerMetrics.recordSendOffsets(time.nanoseconds() - start);
}
}
+ /**
+ * Ensure every topic is present in the producer's metadata cache.
+ * {@link ProducerMetadata#add(String, long)} refreshes the expiry for known
+ * topics and triggers a fetch for new ones; if any topic is new, blocks the
+ * user thread on {@link ProducerMetadata#awaitUpdate(int, long)} up to
+ * {@code max.block.ms}, mirroring {@link #partitionsFor(String)}. Returns
+ * the elapsed wait time in milliseconds so the caller can subtract it from
+ * its own {@code max.block.ms} budget.
+ */
+ private long awaitTopicMetadata(Set<String> topics) {
Review Comment:
Yes. This is best-effort resolution. If we can get the topic ids, we use
them; otherwise, it falls back to using topic names.
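
To make the fallback concrete, here is a rough sketch of what "best effort" could look like, not the code in this PR. The class `BestEffortTopicIds`, its methods, and the plain `Map` standing in for the metadata cache are all hypothetical, and `java.util.UUID` is used as a stand-in for a topic id type. Only `remainingMs` mirrors something in the diff (the `Math.max(0L, maxBlockTimeMs - waitMs)` line).

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.TreeSet;
import java.util.UUID;

public class BestEffortTopicIds {

    // Hypothetical: look up each topic in a name -> id cache.
    // A topic with no cached id maps to Optional.empty() instead of failing.
    static Map<String, Optional<UUID>> resolve(Set<String> topics, Map<String, UUID> cache) {
        Map<String, Optional<UUID>> resolved = new LinkedHashMap<>();
        for (String topic : new TreeSet<>(topics)) { // sorted for stable iteration order
            resolved.put(topic, Optional.ofNullable(cache.get(topic)));
        }
        return resolved;
    }

    // Hypothetical: use the id when it was resolved, otherwise fall back to the name.
    static String describe(String topic, Optional<UUID> id) {
        return id.map(u -> "id:" + u).orElse("name:" + topic);
    }

    // Same arithmetic as the diff: time already spent waiting on metadata is
    // subtracted from the max.block.ms budget, clamped at zero.
    static long remainingMs(long maxBlockMs, long waitedMs) {
        return Math.max(0L, maxBlockMs - waitedMs);
    }

    public static void main(String[] args) {
        Map<String, UUID> cache = Map.of("orders", new UUID(1L, 2L));
        resolve(Set.of("orders", "payments"), cache)
            .forEach((topic, id) -> System.out.println(describe(topic, id)));
        System.out.println(remainingMs(1000L, 250L));
    }
}
```

In this sketch a missing id is not an error: the caller simply keeps addressing that topic by name, which is the behaviour described above.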