dajac commented on code in PR #22460:
URL: https://github.com/apache/kafka/pull/22460#discussion_r3380300119
##########
clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java:
##########
@@ -774,13 +778,44 @@ public void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offs
if (!offsets.isEmpty()) {
long start = time.nanoseconds();
+ var topics =
offsets.keySet().stream().map(TopicPartition::topic).collect(Collectors.toSet());
+ var waitMs = awaitTopicMetadata(topics);
+ var remainingMs = Math.max(0L, maxBlockTimeMs - waitMs);
TransactionalRequestResult result =
transactionManager.sendOffsetsToTransaction(offsets, groupMetadata);
sender.wakeup();
- result.await(maxBlockTimeMs, TimeUnit.MILLISECONDS,
SEND_OFFSETS_TIMEOUT_MSG);
+ result.await(remainingMs, TimeUnit.MILLISECONDS,
SEND_OFFSETS_TIMEOUT_MSG);
producerMetrics.recordSendOffsets(time.nanoseconds() - start);
}
}
+ /**
+ * Ensure every topic is present in the producer's metadata cache.
+ * {@link ProducerMetadata#add(String, long)} refreshes the expiry for known
+ * topics and triggers a fetch for new ones; if any topic is new, blocks the
+ * user thread on {@link ProducerMetadata#awaitUpdate(int, long)} up to
+ * {@code max.block.ms}, mirroring {@link #partitionsFor(String)}. Returns
+ * the elapsed wait time in milliseconds so the caller can subtract it from
+ * its own {@code max.block.ms} budget.
+ */
+ private long awaitTopicMetadata(Set<String> topics) {
+ long nowMs = time.milliseconds();
+ boolean hasNewTopics = false;
+ for (String topic : topics) {
+ hasNewTopics |= metadata.add(topic, nowMs);
+ }
+ if (!hasNewTopics) return 0L;
+ int version = metadata.requestUpdate(true);
+ // Wake the sender so it picks up the metadata request immediately
+ // instead of waiting for its next selector poll to elapse.
+ sender.wakeup();
+ try {
+ metadata.awaitUpdate(version, maxBlockTimeMs);
+ } catch (InterruptedException e) {
+ throw new InterruptException(e);
+ }
+ return time.milliseconds() - nowMs;
Review Comment:
done.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]