lianetm commented on code in PR #22460:
URL: https://github.com/apache/kafka/pull/22460#discussion_r3373624833


##########
clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java:
##########
@@ -774,13 +776,44 @@ public void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offs
 
         if (!offsets.isEmpty()) {
             long start = time.nanoseconds();
+            var topics = offsets.keySet().stream().map(TopicPartition::topic).collect(Collectors.toSet());
+            var waitMs = awaitTopicMetadata(topics);
+            var remainingMs = Math.max(0L, maxBlockTimeMs - waitMs);
             TransactionalRequestResult result = transactionManager.sendOffsetsToTransaction(offsets, groupMetadata);
             sender.wakeup();
-            result.await(maxBlockTimeMs, TimeUnit.MILLISECONDS, SEND_OFFSETS_TIMEOUT_MSG);
+            result.await(remainingMs, TimeUnit.MILLISECONDS, SEND_OFFSETS_TIMEOUT_MSG);
             producerMetrics.recordSendOffsets(time.nanoseconds() - start);
         }
     }
 
+    /**
+     * Ensure every topic is present in the producer's metadata cache.
+     * {@link ProducerMetadata#add(String, long)} refreshes the expiry for known
+     * topics and triggers a fetch for new ones; if any topic is new, blocks the
+     * user thread on {@link ProducerMetadata#awaitUpdate(int, long)} up to
+     * {@code max.block.ms}, mirroring {@link #partitionsFor(String)}. Returns
+     * the elapsed wait time in milliseconds so the caller can subtract it from
+     * its own {@code max.block.ms} budget.
+     */
+    private long awaitTopicMetadata(Set<String> topics) {

Review Comment:
   This makes a single attempt to resolve the topics, instead of retrying until all topics are resolved (which is what the producer does for `send` and `partitionsFor`):
   
https://github.com/apache/kafka/blob/47fbf15f7be59a462cdb0fa5e120c4d5048bc5de/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java#L1267
   
   Is this intentional (i.e., best effort)?
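   For comparison, a minimal sketch of the retry-until-resolved shape, modeled loosely on the loop linked above. It assumes a hypothetical `TopicMetadataView` interface in place of `ProducerMetadata` and a hypothetical unchecked `MetadataTimeoutException` standing in for Kafka's `org.apache.kafka.common.errors.TimeoutException`; none of these names are Kafka APIs, and `FakeMetadata` is a toy used only to exercise the loop.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.function.LongSupplier;

// Hypothetical stand-in for the producer's metadata cache; NOT a Kafka API.
interface TopicMetadataView {
    boolean isResolved(String topic);
    int requestUpdate();                           // version to wait past
    void awaitUpdate(int version, long timeoutMs); // blocks for at most timeoutMs
}

// Unchecked, like Kafka's org.apache.kafka.common.errors.TimeoutException (hypothetical class).
final class MetadataTimeoutException extends RuntimeException {
    MetadataTimeoutException(String message) { super(message); }
}

final class TopicMetadataWaiter {
    // Keeps requesting updates until every topic is resolved or maxBlockMs elapses.
    // Returns the elapsed time so the caller can charge it against its own budget.
    static long awaitAllTopics(TopicMetadataView metadata, Set<String> topics,
                               long maxBlockMs, LongSupplier clockMs) {
        long startMs = clockMs.getAsLong();
        Set<String> missing = new HashSet<>();
        for (String topic : topics) {
            if (!metadata.isResolved(topic)) missing.add(topic);
        }
        while (!missing.isEmpty()) {
            long remainingMs = maxBlockMs - (clockMs.getAsLong() - startMs);
            if (remainingMs <= 0) {
                throw new MetadataTimeoutException("Topics " + missing
                        + " not present in metadata after " + maxBlockMs + " ms.");
            }
            int version = metadata.requestUpdate();
            metadata.awaitUpdate(version, remainingMs);
            missing.removeIf(metadata::isResolved); // drop topics resolved by this update
        }
        return clockMs.getAsLong() - startMs;
    }
}

// Toy in-memory implementation for illustration: each update resolves one more
// pending topic and advances a fake clock by 10 ms.
final class FakeMetadata implements TopicMetadataView {
    private final Set<String> resolved = new HashSet<>();
    private final Deque<String> pending;
    private int version = 0;
    long nowMs = 0;

    FakeMetadata(List<String> resolveInOrder) { pending = new ArrayDeque<>(resolveInOrder); }

    @Override public boolean isResolved(String topic) { return resolved.contains(topic); }
    @Override public int requestUpdate() { return version; }
    @Override public void awaitUpdate(int v, long timeoutMs) {
        nowMs += 10;
        version++;
        if (!pending.isEmpty()) resolved.add(pending.poll());
    }
}
```

   The design point is that an update which does not yet cover every topic leads to another request within the remaining budget, and only an exhausted budget ends the wait, with an exception rather than a silent return.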



##########
clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java:
##########
@@ -774,13 +776,44 @@ public void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offs
 
         if (!offsets.isEmpty()) {
             long start = time.nanoseconds();
+            var topics = offsets.keySet().stream().map(TopicPartition::topic).collect(Collectors.toSet());
+            var waitMs = awaitTopicMetadata(topics);
+            var remainingMs = Math.max(0L, maxBlockTimeMs - waitMs);
             TransactionalRequestResult result = transactionManager.sendOffsetsToTransaction(offsets, groupMetadata);
             sender.wakeup();
-            result.await(maxBlockTimeMs, TimeUnit.MILLISECONDS, SEND_OFFSETS_TIMEOUT_MSG);
+            result.await(remainingMs, TimeUnit.MILLISECONDS, SEND_OFFSETS_TIMEOUT_MSG);
             producerMetrics.recordSendOffsets(time.nanoseconds() - start);
         }
     }
 
+    /**
+     * Ensure every topic is present in the producer's metadata cache.
+     * {@link ProducerMetadata#add(String, long)} refreshes the expiry for known
+     * topics and triggers a fetch for new ones; if any topic is new, blocks the
+     * user thread on {@link ProducerMetadata#awaitUpdate(int, long)} up to
+     * {@code max.block.ms}, mirroring {@link #partitionsFor(String)}. Returns
+     * the elapsed wait time in milliseconds so the caller can subtract it from
+     * its own {@code max.block.ms} budget.
+     */
+    private long awaitTopicMetadata(Set<String> topics) {
+        long nowMs = time.milliseconds();
+        boolean hasNewTopics = false;
+        for (String topic : topics) {
+            hasNewTopics |= metadata.add(topic, nowMs);
+        }
+        if (!hasNewTopics) return 0L;
+        int version = metadata.requestUpdate(true);
+        // Wake the sender so it picks up the metadata request immediately
+        // instead of waiting for its next selector poll to elapse.
+        sender.wakeup();
+        try {
+            metadata.awaitUpdate(version, maxBlockTimeMs);

Review Comment:
   This introduces a wait on metadata that didn't exist before in the `sendOffsetsToTransaction` API, so some thoughts around errors:
   1. Timeout: if we time out waiting on metadata, the `TimeoutException` will propagate up to the `sendOffsetsToTransaction` caller. Should we update the Javadoc? The docs currently list `TimeoutException` only in relation to "the time taken for sending the request", which is a different path, issue, and config. The `max.block.ms` budget is now shared between metadata resolution and the send request.
   2. This can also throw fatal errors, e.g., the new `BootstrapException`, which are not currently part of the contract. I think it makes sense to propagate those; we just need to update KIP-909, which is still in development.
   3. This await does not propagate authorization and invalid-topic errors, but they are stored in the metadata object and could be identified here (we usually have to call `maybeThrowExceptionForTopic` explicitly). With the current shape we just swallow them; is that the intention? I expect the authorization errors would surface anyway from the sendOffsets request. For an invalid topic, however, we would swallow the `InvalidTopicException` we get from metadata here, and then end up with an unknown-topic error when handling the sendOffsets request.
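   To make points 1 and 3 concrete, a minimal sketch of a single `max.block.ms` deadline shared by both blocking phases, plus an explicit per-topic error check after the metadata wait. `BlockBudget`, `TopicErrorCheck`, `BlockTimeoutException`, and `InvalidTopicError` are all hypothetical names; the `topicErrors` map merely plays the role that `maybeThrowExceptionForTopic` plays on the real metadata object, and is not how Kafka stores those errors.

```java
import java.util.Map;
import java.util.Set;

// Hypothetical unchecked exceptions standing in for Kafka's TimeoutException / InvalidTopicException.
final class BlockTimeoutException extends RuntimeException {
    BlockTimeoutException(String message) { super(message); }
}

final class InvalidTopicError extends RuntimeException {
    InvalidTopicError(String message) { super(message); }
}

// One max.block.ms budget shared by every blocking phase of a call
// (metadata resolution first, then the send-offsets request).
final class BlockBudget {
    private final long maxBlockMs;
    private final long deadlineMs;

    BlockBudget(long nowMs, long maxBlockMs) {
        this.maxBlockMs = maxBlockMs;
        long deadline = nowMs + maxBlockMs;
        this.deadlineMs = deadline < nowMs ? Long.MAX_VALUE : deadline; // saturate on overflow
    }

    long remainingMs(long nowMs) {
        return Math.max(0L, deadlineMs - nowMs);
    }

    // Fails with a message naming the phase that found the shared budget exhausted.
    void checkNotExpired(long nowMs, String phase) {
        if (remainingMs(nowMs) == 0) {
            throw new BlockTimeoutException(phase + " did not complete within max.block.ms="
                    + maxBlockMs + " (budget shared with earlier phases)");
        }
    }
}

final class TopicErrorCheck {
    // Rethrows an error recorded for any of the topics during the metadata wait,
    // instead of letting it surface later as a less specific error.
    static void maybeThrowForTopics(Map<String, RuntimeException> topicErrors, Set<String> topics) {
        for (String topic : topics) {
            RuntimeException error = topicErrors.get(topic);
            if (error != null) throw error;
        }
    }
}
```

   In this shape the budget would be created once on entry, and `remainingMs` would feed first the metadata wait and then `result.await`, so a slow metadata phase shortens the request phase instead of extending the call past `max.block.ms`; naming the phase in the message is one way to make the shared budget visible to callers.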
   



-- 
This is an automated message from the Apache Git Service.