lianetm commented on code in PR #22460:
URL: https://github.com/apache/kafka/pull/22460#discussion_r3373624833
##########
clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java:
##########
@@ -774,13 +776,44 @@ public void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offs
if (!offsets.isEmpty()) {
long start = time.nanoseconds();
+ var topics = offsets.keySet().stream().map(TopicPartition::topic).collect(Collectors.toSet());
+ var waitMs = awaitTopicMetadata(topics);
+ var remainingMs = Math.max(0L, maxBlockTimeMs - waitMs);
TransactionalRequestResult result = transactionManager.sendOffsetsToTransaction(offsets, groupMetadata);
sender.wakeup();
- result.await(maxBlockTimeMs, TimeUnit.MILLISECONDS, SEND_OFFSETS_TIMEOUT_MSG);
+ result.await(remainingMs, TimeUnit.MILLISECONDS, SEND_OFFSETS_TIMEOUT_MSG);
producerMetrics.recordSendOffsets(time.nanoseconds() - start);
}
}
+ /**
+ * Ensure every topic is present in the producer's metadata cache.
+ * {@link ProducerMetadata#add(String, long)} refreshes the expiry for known
+ * topics and triggers a fetch for new ones; if any topic is new, blocks the
+ * user thread on {@link ProducerMetadata#awaitUpdate(int, long)} up to
+ * {@code max.block.ms}, mirroring {@link #partitionsFor(String)}. Returns
+ * the elapsed wait time in milliseconds so the caller can subtract it from
+ * its own {@code max.block.ms} budget.
+ */
+ private long awaitTopicMetadata(Set<String> topics) {
Review Comment:
This makes a single attempt to resolve the topics instead of retrying until
all of them are resolved, which is what the producer does for `send` and
`partitionsFor`:
https://github.com/apache/kafka/blob/47fbf15f7be59a462cdb0fa5e120c4d5048bc5de/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java#L1267
Is this intentional (best effort)?
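To illustrate the retry-until-resolved behavior I have in mind, here is a minimal, self-contained sketch. It is not the producer's actual code: `TopicCache`, `WaitTimeoutException`, and `awaitAllTopics` are hypothetical names standing in for the metadata cache, the timeout error, and the helper, and the clock is injected only so the loop can be exercised without real waiting.

```java
import java.util.Set;
import java.util.function.LongSupplier;

class MetadataWaitSketch {
    /** Hypothetical stand-in for the producer's metadata cache; not a Kafka type. */
    interface TopicCache {
        boolean isResolved(String topic);

        /** Blocks until the next metadata update arrives or maxWaitMs elapses. */
        void awaitNextUpdate(long maxWaitMs);
    }

    /** Hypothetical unchecked timeout, so the sketch has no Kafka dependency. */
    static class WaitTimeoutException extends RuntimeException {
        WaitTimeoutException(String message) {
            super(message);
        }
    }

    /**
     * Re-checks the cache after every update until all topics are resolved
     * or the budget is spent. Returns the elapsed wait in milliseconds.
     */
    static long awaitAllTopics(TopicCache cache, Set<String> topics,
                               long maxBlockMs, LongSupplier clockMs) {
        long startMs = clockMs.getAsLong();
        long deadlineMs = startMs + maxBlockMs;
        while (true) {
            boolean allResolved = topics.stream().allMatch(cache::isResolved);
            long nowMs = clockMs.getAsLong();
            if (allResolved) {
                return nowMs - startMs;
            }
            long remainingMs = deadlineMs - nowMs;
            if (remainingMs <= 0) {
                throw new WaitTimeoutException(
                    "Topics not resolved after " + maxBlockMs + " ms");
            }
            // One update may resolve only some topics, so loop and check again.
            cache.awaitNextUpdate(remainingMs);
        }
    }
}
```

The difference from a single `awaitUpdate` call is the loop: a metadata update that resolves only some of the topics does not end the wait, and each iteration waits only for whatever is left of the original budget.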
##########
clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java:
##########
@@ -774,13 +776,44 @@ public void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offs
if (!offsets.isEmpty()) {
long start = time.nanoseconds();
+ var topics = offsets.keySet().stream().map(TopicPartition::topic).collect(Collectors.toSet());
+ var waitMs = awaitTopicMetadata(topics);
+ var remainingMs = Math.max(0L, maxBlockTimeMs - waitMs);
TransactionalRequestResult result = transactionManager.sendOffsetsToTransaction(offsets, groupMetadata);
sender.wakeup();
- result.await(maxBlockTimeMs, TimeUnit.MILLISECONDS, SEND_OFFSETS_TIMEOUT_MSG);
+ result.await(remainingMs, TimeUnit.MILLISECONDS, SEND_OFFSETS_TIMEOUT_MSG);
producerMetrics.recordSendOffsets(time.nanoseconds() - start);
}
}
+ /**
+ * Ensure every topic is present in the producer's metadata cache.
+ * {@link ProducerMetadata#add(String, long)} refreshes the expiry for known
+ * topics and triggers a fetch for new ones; if any topic is new, blocks the
+ * user thread on {@link ProducerMetadata#awaitUpdate(int, long)} up to
+ * {@code max.block.ms}, mirroring {@link #partitionsFor(String)}. Returns
+ * the elapsed wait time in milliseconds so the caller can subtract it from
+ * its own {@code max.block.ms} budget.
+ */
+ private long awaitTopicMetadata(Set<String> topics) {
+ long nowMs = time.milliseconds();
+ boolean hasNewTopics = false;
+ for (String topic : topics) {
+ hasNewTopics |= metadata.add(topic, nowMs);
+ }
+ if (!hasNewTopics) return 0L;
+ int version = metadata.requestUpdate(true);
+ // Wake the sender so it picks up the metadata request immediately
+ // instead of waiting for its next selector poll to elapse.
+ sender.wakeup();
+ try {
+ metadata.awaitUpdate(version, maxBlockTimeMs);
Review Comment:
This introduces a metadata wait that did not previously exist in the
`sendOffsetsToTransaction` API, so a few thoughts on errors:
1. Timeout: if we time out waiting on metadata, the exception propagates up to
the `sendOffsetsToTransaction` call. Should we update the Javadoc? It lists
TimeoutException, but only in relation to "the time taken for sending the
request", which is a different path, issue, and config. The allowed
max.block.ms is now shared between metadata resolution and sending the request.
2. This can throw fatal errors, e.g., the new BootstrapException, that are not
currently in the contract. I think it makes sense to propagate those; we just
need to update KIP-909, which is still in development.
3. This await does not propagate the auth and invalid-topic errors, but they
could be identified here since they are stored in the metadata object (we
usually have to call `maybeThrowExceptionForTopic` manually). With the current
shape we just swallow them. Is that the intention? I expect the auth errors
would surface anyway from the sendOffsets request. For an invalid topic, we
would swallow the InvalidTopicException that we get from metadata here and then
end up with an unknown-topic error when handling the sendOffsets request.
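To make points 1 and 3 concrete, here is a minimal, self-contained sketch of what surfacing the per-topic errors and sharing the budget could look like. It is an assumption-laden illustration, not the producer's code: `TopicErrors`, `throwAnyTopicError`, and `remainingBudgetMs` are hypothetical names, with `TopicErrors` standing in for whatever the metadata object records from the last response.

```java
import java.util.Optional;
import java.util.Set;

class TopicErrorSketch {
    /** Hypothetical view of per-topic errors recorded from a metadata response. */
    interface TopicErrors {
        Optional<RuntimeException> errorFor(String topic);
    }

    /**
     * Rethrows a recorded error for any of the requested topics instead of
     * silently dropping it. Which error is thrown when several topics have
     * one depends on the set's iteration order.
     */
    static void throwAnyTopicError(TopicErrors errors, Set<String> topics) {
        for (String topic : topics) {
            Optional<RuntimeException> error = errors.errorFor(topic);
            if (error.isPresent()) {
                throw error.get();
            }
        }
    }

    /** Time left for the request itself once the metadata wait is subtracted. */
    static long remainingBudgetMs(long maxBlockMs, long metadataWaitMs) {
        return Math.max(0L, maxBlockMs - metadataWaitMs);
    }
}
```

With a check like this after the await, an invalid topic would fail with the metadata error itself rather than with a later unknown-topic error from the sendOffsets request.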