dajac commented on code in PR #22460:
URL: https://github.com/apache/kafka/pull/22460#discussion_r3380298607
##########
clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java:
##########
@@ -774,13 +778,44 @@ public void sendOffsetsToTransaction(Map<TopicPartition,
OffsetAndMetadata> offs
if (!offsets.isEmpty()) {
long start = time.nanoseconds();
+ var topics =
offsets.keySet().stream().map(TopicPartition::topic).collect(Collectors.toSet());
+ var waitMs = awaitTopicMetadata(topics);
+ var remainingMs = Math.max(0L, maxBlockTimeMs - waitMs);
TransactionalRequestResult result =
transactionManager.sendOffsetsToTransaction(offsets, groupMetadata);
sender.wakeup();
- result.await(maxBlockTimeMs, TimeUnit.MILLISECONDS,
SEND_OFFSETS_TIMEOUT_MSG);
+ result.await(remainingMs, TimeUnit.MILLISECONDS,
SEND_OFFSETS_TIMEOUT_MSG);
producerMetrics.recordSendOffsets(time.nanoseconds() - start);
}
}
+ /**
+ * Ensure every topic is present in the producer's metadata cache.
Review Comment:
Makes sense. Updated the Javadoc.
##########
clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java:
##########
@@ -774,13 +778,44 @@ public void sendOffsetsToTransaction(Map<TopicPartition,
OffsetAndMetadata> offs
if (!offsets.isEmpty()) {
long start = time.nanoseconds();
+ var topics =
offsets.keySet().stream().map(TopicPartition::topic).collect(Collectors.toSet());
+ var waitMs = awaitTopicMetadata(topics);
+ var remainingMs = Math.max(0L, maxBlockTimeMs - waitMs);
TransactionalRequestResult result =
transactionManager.sendOffsetsToTransaction(offsets, groupMetadata);
sender.wakeup();
- result.await(maxBlockTimeMs, TimeUnit.MILLISECONDS,
SEND_OFFSETS_TIMEOUT_MSG);
+ result.await(remainingMs, TimeUnit.MILLISECONDS,
SEND_OFFSETS_TIMEOUT_MSG);
producerMetrics.recordSendOffsets(time.nanoseconds() - start);
}
}
+ /**
+ * Ensure every topic is present in the producer's metadata cache.
+ * {@link ProducerMetadata#add(String, long)} refreshes the expiry for known
+ * topics and triggers a fetch for new ones; if any topic is new, blocks the
+ * user thread on {@link ProducerMetadata#awaitUpdate(int, long)} up to
+ * {@code max.block.ms}, mirroring {@link #partitionsFor(String)}. Returns
+ * the elapsed wait time in milliseconds so the caller can subtract it from
+ * its own {@code max.block.ms} budget.
+ */
+ private long awaitTopicMetadata(Set<String> topics) {
+ long nowMs = time.milliseconds();
+ boolean hasNewTopics = false;
+ for (String topic : topics) {
+ hasNewTopics |= metadata.add(topic, nowMs);
+ }
+ if (!hasNewTopics) return 0L;
+ int version = metadata.requestUpdate(true);
Review Comment:
Good catch. Updated it.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]