C0urante commented on a change in pull request #11524:
URL: https://github.com/apache/kafka/pull/11524#discussion_r755508174
##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SubmittedRecords.java
##########
@@ -132,6 +141,28 @@ public CommittableOffsets committableOffsets() {
         return new CommittableOffsets(offsets, totalCommittableMessages, totalUncommittableMessages, records.size(), largestDequeSize, largestDequePartition);
     }
+    /**
+     * Wait for all currently in-flight messages to be acknowledged, up to the requested timeout.
+     * This method is expected to be called from the same thread that calls {@link #committableOffsets()}.
+     * @param timeout the maximum time to wait
+     * @param timeUnit the time unit of the timeout argument
+     * @return whether all in-flight messages were acknowledged before the timeout elapsed
+     */
+    public boolean awaitAllMessages(long timeout, TimeUnit timeUnit) {
+        // Create a new message drain latch as a local variable to avoid SpotBugs warnings about inconsistent synchronization
+        // on an instance variable when invoking CountDownLatch::await outside a synchronized block
+        CountDownLatch messageDrainLatch;
+        synchronized (this) {
+            messageDrainLatch = new CountDownLatch(numUnackedMessages);
+            this.messageDrainLatch = messageDrainLatch;
+        }
+        try {
+            return messageDrainLatch.await(timeout, timeUnit);
+        } catch (InterruptedException e) {
+            return false;

Review comment:
I believe this is already accomplished. According to the [Javadocs](https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/CountDownLatch.html#await-long-java.util.concurrent.TimeUnit-) for `CountDownLatch::await`:

> If the current thread:
> • has its interrupted status set on entry to this method; or
> • is interrupted while waiting,
>
> then `InterruptedException` is thrown and the current thread's interrupted status is cleared.
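
For illustration only (this is not code from the PR), here is a minimal sketch of the behavior quoted above. The class name `AwaitInterruptDemo` and the method `interruptedAfterAwait` are hypothetical; the sketch assumes only the standard `java.util.concurrent.CountDownLatch` API. It sets the interrupted status before calling `await` and then checks the flag inside the `catch` block:

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class; not part of Kafka or the PR under review.
public class AwaitInterruptDemo {

    // Returns the current thread's interrupted status as observed
    // immediately after CountDownLatch::await throws InterruptedException.
    static boolean interruptedAfterAwait() {
        CountDownLatch latch = new CountDownLatch(1); // never counted down
        Thread.currentThread().interrupt();           // flag is set on entry to await
        try {
            latch.await(1, TimeUnit.SECONDS);
            throw new AssertionError("expected InterruptedException");
        } catch (InterruptedException e) {
            // isInterrupted() reads the flag without clearing it
            return Thread.currentThread().isInterrupted();
        }
    }

    public static void main(String[] args) {
        System.out.println(interruptedAfterAwait()); // prints "false"
    }
}
```

Since the flag is already cleared by the time the `catch` block runs, returning `false` there leaves the thread's interrupted status unset without any extra call.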