lianetm commented on code in PR #22460:
URL: https://github.com/apache/kafka/pull/22460#discussion_r3376053235


##########
clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java:
##########
@@ -774,13 +778,44 @@ public void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offs
 
         if (!offsets.isEmpty()) {
             long start = time.nanoseconds();
+            var topics = offsets.keySet().stream().map(TopicPartition::topic).collect(Collectors.toSet());
+            var waitMs = awaitTopicMetadata(topics);
+            var remainingMs = Math.max(0L, maxBlockTimeMs - waitMs);
             TransactionalRequestResult result = transactionManager.sendOffsetsToTransaction(offsets, groupMetadata);
             sender.wakeup();
-            result.await(maxBlockTimeMs, TimeUnit.MILLISECONDS, SEND_OFFSETS_TIMEOUT_MSG);
+            result.await(remainingMs, TimeUnit.MILLISECONDS, SEND_OFFSETS_TIMEOUT_MSG);
             producerMetrics.recordSendOffsets(time.nanoseconds() - start);
         }
     }
 
+    /**
+     * Ensure every topic is present in the producer's metadata cache.
+     * {@link ProducerMetadata#add(String, long)} refreshes the expiry for known
+     * topics and triggers a fetch for new ones; if any topic is new, blocks the
+     * user thread on {@link ProducerMetadata#awaitUpdate(int, long)} up to
+     * {@code max.block.ms}, mirroring {@link #partitionsFor(String)}. Returns
+     * the elapsed wait time in milliseconds so the caller can subtract it from
+     * its own {@code max.block.ms} budget.
+     */
+    private long awaitTopicMetadata(Set<String> topics) {
+        long nowMs = time.milliseconds();
+        boolean hasNewTopics = false;
+        for (String topic : topics) {
+            hasNewTopics |= metadata.add(topic, nowMs);
+        }
+        if (!hasNewTopics) return 0L;
+        int version = metadata.requestUpdate(true);

Review Comment:
   This requests a full update. Is that needed, or should we request a 
partial one for just the new topics (`requestUpdateForNewTopics`)?



##########
clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java:
##########
@@ -774,13 +778,44 @@ public void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offs
 
         if (!offsets.isEmpty()) {
             long start = time.nanoseconds();
+            var topics = offsets.keySet().stream().map(TopicPartition::topic).collect(Collectors.toSet());
+            var waitMs = awaitTopicMetadata(topics);
+            var remainingMs = Math.max(0L, maxBlockTimeMs - waitMs);
             TransactionalRequestResult result = transactionManager.sendOffsetsToTransaction(offsets, groupMetadata);
             sender.wakeup();
-            result.await(maxBlockTimeMs, TimeUnit.MILLISECONDS, SEND_OFFSETS_TIMEOUT_MSG);
+            result.await(remainingMs, TimeUnit.MILLISECONDS, SEND_OFFSETS_TIMEOUT_MSG);
             producerMetrics.recordSendOffsets(time.nanoseconds() - start);
         }
     }
 
+    /**
+     * Ensure every topic is present in the producer's metadata cache.
+     * {@link ProducerMetadata#add(String, long)} refreshes the expiry for known
+     * topics and triggers a fetch for new ones; if any topic is new, blocks the
+     * user thread on {@link ProducerMetadata#awaitUpdate(int, long)} up to
+     * {@code max.block.ms}, mirroring {@link #partitionsFor(String)}. Returns
+     * the elapsed wait time in milliseconds so the caller can subtract it from
+     * its own {@code max.block.ms} budget.
+     */
+    private long awaitTopicMetadata(Set<String> topics) {
+        long nowMs = time.milliseconds();
+        boolean hasNewTopics = false;
+        for (String topic : topics) {
+            hasNewTopics |= metadata.add(topic, nowMs);
+        }
+        if (!hasNewTopics) return 0L;
+        int version = metadata.requestUpdate(true);
+        // Wake the sender so it picks up the metadata request immediately
+        // instead of waiting for its next selector poll to elapse.
+        sender.wakeup();
+        try {
+            metadata.awaitUpdate(version, maxBlockTimeMs);
+        } catch (InterruptedException e) {
+            throw new InterruptException(e);
+        }
+        return time.milliseconds() - nowMs;

Review Comment:
   There is a metric on the equivalent `waitOnMetadata` path for the time the 
producer waits on metadata. Should we record it here too?  
https://github.com/apache/kafka/blob/5111ad253a7ac12061c73ce4ab62a19ca7dc3d4e/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java#L1269
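
   For illustration only, here is a minimal self-contained sketch of timing 
the wait, reporting it, and deducting it from the blocking budget. The names 
`MetadataWaitSketch`, `awaitAndRecord` and `remainingBudgetMs` are 
hypothetical, and the clock, wait and recorder arguments are stand-ins for 
`time.nanoseconds()`, `metadata.awaitUpdate(...)` and whichever metric hook the 
producer would use. None of this is Kafka API, and recording only after a 
successful wait is an assumption.

```java
import java.util.concurrent.TimeUnit;
import java.util.function.LongConsumer;
import java.util.function.LongSupplier;

/** Hypothetical sketch, not Kafka code: time a blocking wait and report it. */
public class MetadataWaitSketch {

    /**
     * Runs the blocking wait, hands the elapsed nanoseconds to the recorder,
     * and returns the elapsed time in milliseconds for budget accounting.
     * All three arguments are illustrative stand-ins (see the lead-in).
     */
    static long awaitAndRecord(LongSupplier nanoClock,
                               Runnable awaitUpdate,
                               LongConsumer recordWaitNs) {
        long startNs = nanoClock.getAsLong();
        awaitUpdate.run();
        long elapsedNs = nanoClock.getAsLong() - startNs;
        // Assumption: the wait is recorded only when it completes normally.
        recordWaitNs.accept(elapsedNs);
        return TimeUnit.NANOSECONDS.toMillis(elapsedNs);
    }

    /** Budget left for the follow-up await, clamped at zero. */
    static long remainingBudgetMs(long maxBlockMs, long waitedMs) {
        return Math.max(0L, maxBlockMs - waitedMs);
    }

    public static void main(String[] args) {
        long[] fakeNowNs = {0L};   // fake clock so the run is deterministic
        long[] recordedNs = {-1L}; // captures what the recorder received
        long waitedMs = awaitAndRecord(
            () -> fakeNowNs[0],
            () -> fakeNowNs[0] += TimeUnit.MILLISECONDS.toNanos(250),
            ns -> recordedNs[0] = ns);
        System.out.println(waitedMs + " ms waited, "
            + remainingBudgetMs(1000L, waitedMs) + " ms left");
    }
}
```

   In the PR the same shape would wrap the `awaitUpdate` call, so the value 
returned to `sendOffsetsToTransaction` and the value recorded in the metric 
come from one measurement.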
   



##########
clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java:
##########
@@ -774,13 +778,44 @@ public void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offs
 
         if (!offsets.isEmpty()) {
             long start = time.nanoseconds();
+            var topics = offsets.keySet().stream().map(TopicPartition::topic).collect(Collectors.toSet());
+            var waitMs = awaitTopicMetadata(topics);
+            var remainingMs = Math.max(0L, maxBlockTimeMs - waitMs);
             TransactionalRequestResult result = transactionManager.sendOffsetsToTransaction(offsets, groupMetadata);
             sender.wakeup();
-            result.await(maxBlockTimeMs, TimeUnit.MILLISECONDS, SEND_OFFSETS_TIMEOUT_MSG);
+            result.await(remainingMs, TimeUnit.MILLISECONDS, SEND_OFFSETS_TIMEOUT_MSG);
             producerMetrics.recordSendOffsets(time.nanoseconds() - start);
         }
     }
 
+    /**
+     * Ensure every topic is present in the producer's metadata cache.

Review Comment:
   With the best-effort approach, saying that we "ensure every topic is 
present" is probably a stretch? (We only ensure that we request metadata for 
the topics and await an update.) Also, the reference to "mirroring 
partitionsFor" may be a bit misleading, since there we do ensure every topic 
is present.


