dajac commented on code in PR #22460:
URL: https://github.com/apache/kafka/pull/22460#discussion_r3375646636
##########
clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java:
##########
@@ -774,13 +776,44 @@ public void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offs
if (!offsets.isEmpty()) {
long start = time.nanoseconds();
+ var topics = offsets.keySet().stream().map(TopicPartition::topic).collect(Collectors.toSet());
+ var waitMs = awaitTopicMetadata(topics);
+ var remainingMs = Math.max(0L, maxBlockTimeMs - waitMs);
TransactionalRequestResult result = transactionManager.sendOffsetsToTransaction(offsets, groupMetadata);
sender.wakeup();
- result.await(maxBlockTimeMs, TimeUnit.MILLISECONDS, SEND_OFFSETS_TIMEOUT_MSG);
+ result.await(remainingMs, TimeUnit.MILLISECONDS, SEND_OFFSETS_TIMEOUT_MSG);
producerMetrics.recordSendOffsets(time.nanoseconds() - start);
}
}
+ /**
+ * Ensure every topic is present in the producer's metadata cache.
+ * {@link ProducerMetadata#add(String, long)} refreshes the expiry for known
+ * topics and triggers a fetch for new ones; if any topic is new, blocks the
+ * user thread on {@link ProducerMetadata#awaitUpdate(int, long)} up to
+ * {@code max.block.ms}, mirroring {@link #partitionsFor(String)}. Returns
+ * the elapsed wait time in milliseconds so the caller can subtract it from
+ * its own {@code max.block.ms} budget.
+ */
+ private long awaitTopicMetadata(Set<String> topics) {
+ long nowMs = time.milliseconds();
+ boolean hasNewTopics = false;
+ for (String topic : topics) {
+ hasNewTopics |= metadata.add(topic, nowMs);
+ }
+ if (!hasNewTopics) return 0L;
+ int version = metadata.requestUpdate(true);
+ // Wake the sender so it picks up the metadata request immediately
+ // instead of waiting for its next selector poll to elapse.
+ sender.wakeup();
+ try {
+ metadata.awaitUpdate(version, maxBlockTimeMs);
Review Comment:
1. Makes sense. I will update the javadoc.
2. We will address this in https://github.com/apache/kafka/pull/21080.
3. I discussed this with Lianet offline, and we will keep it as it is for now
so that we don't introduce new errors (e.g. InvalidTopicException). Any errors
will be caught by the sendOffsets call anyway.
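
For reference, the budget handling in the hunk above (time the first blocking
step, then give the second step whatever is left of max.block.ms, never less
than zero) can be sketched on its own. This is only an illustration under
stated assumptions: BlockBudget, remainingMs and runAndGetRemainingMs are
hypothetical names, not part of KafkaProducer or this PR.

```java
import java.util.concurrent.TimeUnit;

/** Hypothetical helper for illustration only; not part of KafkaProducer. */
final class BlockBudget {
    private BlockBudget() {}

    /** Budget left after an earlier blocking step took waitedMs; clamped at zero. */
    static long remainingMs(long maxBlockMs, long waitedMs) {
        return Math.max(0L, maxBlockMs - waitedMs);
    }

    /**
     * Runs the first blocking step, measures how long it took, and returns
     * the budget that a second blocking step may still use.
     */
    static long runAndGetRemainingMs(long maxBlockMs, Runnable firstStep) {
        long startNs = System.nanoTime();
        firstStep.run();
        long waitedMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNs);
        return remainingMs(maxBlockMs, waitedMs);
    }
}
```

With a 1000 ms budget and a 300 ms first step, the second step gets 700 ms; if
the first step overruns the budget, the second step gets 0 ms rather than a
negative timeout.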