rhauch commented on a change in pull request #11524:
URL: https://github.com/apache/kafka/pull/11524#discussion_r755438613



##########
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SubmittedRecords.java
##########
@@ -132,6 +141,28 @@ public CommittableOffsets committableOffsets() {
         return new CommittableOffsets(offsets, totalCommittableMessages, 
totalUncommittableMessages, records.size(), largestDequeSize, 
largestDequePartition);
     }
 
+    /**
+     * Wait for all currently in-flight messages to be acknowledged, up to the 
requested timeout.
+     * This method is expected to be called from the same thread that calls 
{@link #committableOffsets()}.
+     * @param timeout the maximum time to wait
+     * @param timeUnit the time unit of the timeout argument
+     * @return whether all in-flight messages were acknowledged before the 
timeout elapsed
+     */
+    public boolean awaitAllMessages(long timeout, TimeUnit timeUnit) {
+        // Create a new message drain latch as a local variable to avoid 
SpotBugs warnings about inconsistent synchronization
+        // on an instance variable when invoking CountDownLatch::await outside 
a synchronized block
+        CountDownLatch messageDrainLatch;
+        synchronized (this) {
+            messageDrainLatch = new CountDownLatch(numUnackedMessages);
+            this.messageDrainLatch = messageDrainLatch;
+        }
+        try {
+            return messageDrainLatch.await(timeout, timeUnit);
+        } catch (InterruptedException e) {
+            return false;

Review comment:
       Should this clear the interrupted flag before returning, since we're 
handling the interruption here?

##########
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SubmittedRecords.java
##########
@@ -146,27 +177,39 @@ private boolean canCommitHead(Deque<SubmittedRecord> 
queuedRecords) {
         return queuedRecords.peek() != null && queuedRecords.peek().acked();
     }
 
-    static class SubmittedRecord {
+    // Synchronize in order to ensure that the number of unacknowledged 
messages isn't modified in the middle of a call
+    // to awaitAllMessages (which might cause us to decrement first, then 
create a new message drain latch, then count down
+    // that latch here, effectively double-acking the message)
+    private synchronized void messageAcked() {
+        numUnackedMessages--;
+        if (messageDrainLatch != null) {
+            messageDrainLatch.countDown();
+        }
+    }
+
+    class SubmittedRecord {
         private final Map<String, Object> partition;
         private final Map<String, Object> offset;
-        private volatile boolean acked;
+        private final AtomicBoolean acked;
 
         public SubmittedRecord(Map<String, Object> partition, Map<String, 
Object> offset) {
             this.partition = partition;
             this.offset = offset;
-            this.acked = false;
+            this.acked = new AtomicBoolean(false);
         }
 
         /**
          * Acknowledge this record; signals that its offset may be safely 
committed.
          * This is safe to be called from a different thread than what called 
{@link SubmittedRecords#submit(SourceRecord)}.
          */
         public void ack() {
-            this.acked = true;
+            if (this.acked.compareAndSet(false, true)) {
+                messageAcked();
+            }

Review comment:
       Nice simplification.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to