rhauch commented on a change in pull request #11524: URL: https://github.com/apache/kafka/pull/11524#discussion_r755233693
########## File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SubmittedRecords.java ##########
@@ -68,6 +75,9 @@ SubmittedRecord submit(Map<String, Object> partition, Map<String, Object> offset
         SubmittedRecord result = new SubmittedRecord(partition, offset);
         records.computeIfAbsent(result.partition(), p -> new LinkedList<>())
                 .add(result);
+        synchronized (this) {
+            numUnackedMessages.incrementAndGet();
+        }

Review comment: Does this need to be synchronized? See below.

########## File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SubmittedRecords.java ##########
@@ -132,6 +144,27 @@ public CommittableOffsets committableOffsets() {
         return new CommittableOffsets(offsets, totalCommittableMessages, totalUncommittableMessages, records.size(), largestDequeSize, largestDequePartition);
     }
+    /**
+     * Wait for all currently in-flight messages to be acknowledged, up to the requested timeout.

Review comment: In a few other places where we have a constraint like this, I think we tend to say when this method can be called. So, for example, something like:

```suggestion
     * Wait for all currently in-flight messages to be acknowledged, up to the requested timeout.
     * This method is expected to be called from the same thread that calls {@link #committableOffsets()}.
```

########## File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SubmittedRecords.java ##########
@@ -42,9 +46,12 @@
     // Visible for testing
     final Map<Map<String, Object>, Deque<SubmittedRecord>> records;
+    private AtomicInteger numUnackedMessages;

Review comment: Can this be made final, and then maybe initialize the field here to be perfectly clear that it's initialized and never changed?
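The questions above about `synchronized` and `final` can be illustrated with a small, self-contained sketch. The class `UnackedCounter` and its `submit()`/`ack()` methods are hypothetical names for illustration only, not the PR's code. The counter is a `final` `AtomicInteger` initialized at its declaration, and because `incrementAndGet()` and `decrementAndGet()` are each a single atomic read-modify-write, one increment needs no `synchronized` block for the counter itself to stay correct while acknowledgments arrive on other threads.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class UnackedCounter {
    // Initialized once at its declaration and never reassigned, so it can be final.
    private final AtomicInteger numUnackedMessages = new AtomicInteger();

    // Hypothetical stand-in for submit(...): called by the task thread per record.
    // No lock is needed because incrementAndGet() is already atomic.
    public void submit() {
        numUnackedMessages.incrementAndGet();
    }

    // Hypothetical stand-in for an acknowledgment arriving on a producer callback thread.
    public void ack() {
        numUnackedMessages.decrementAndGet();
    }

    public int unacked() {
        return numUnackedMessages.get();
    }

    public static void main(String[] args) throws InterruptedException {
        UnackedCounter counter = new UnackedCounter();
        int messages = 10_000;
        for (int i = 0; i < messages; i++) {
            counter.submit();
        }
        // Acknowledge every message from several threads concurrently.
        ExecutorService pool = Executors.newFixedThreadPool(4);
        for (int i = 0; i < messages; i++) {
            pool.execute(counter::ack);
        }
        pool.shutdown();
        pool.awaitTermination(10, TimeUnit.SECONDS);
        System.out.println(counter.unacked()); // prints 0
    }
}
```

Note that this only shows the counter in isolation; whether the increment must be coordinated with the drain latch is the subject of the next comments.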
########## File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SubmittedRecords.java ##########
@@ -132,6 +144,27 @@ public CommittableOffsets committableOffsets() {
         return new CommittableOffsets(offsets, totalCommittableMessages, totalUncommittableMessages, records.size(), largestDequeSize, largestDequePartition);
     }
+    /**
+     * Wait for all currently in-flight messages to be acknowledged, up to the requested timeout.
+     * @param timeout the maximum time to wait
+     * @param timeUnit the time unit of the timeout argument
+     * @return whether all in-flight messages were acknowledged before the timeout elapsed
+     */
+    public boolean awaitAllMessages(long timeout, TimeUnit timeUnit) {
+        // Create a new message drain latch as a local variable to avoid SpotBugs warnings about inconsistent synchronization
+        // on an instance variable when invoking CountDownLatch::await outside a synchronized block
+        CountDownLatch messageDrainLatch;
+        synchronized (this) {
+            messageDrainLatch = new CountDownLatch(numUnackedMessages.get());
+            this.messageDrainLatch = messageDrainLatch;
+        }

Review comment: Could this code be simplified, and the synchronized block removed altogether, by changing the `messageDrainLatch` field to:

```
private final AtomicReference<CountDownLatch> messageDrainLatch = new AtomicReference<>();
```

and then changing the lines above to:

```suggestion
        // Create a new message drain latch as a local variable to avoid SpotBugs warnings about inconsistent synchronization
        // on an instance variable when invoking CountDownLatch::await outside a synchronized block
        CountDownLatch messageDrainLatch = this.messageDrainLatch.updateAndGet(existing -> new CountDownLatch(numUnackedMessages.get()));
```

This synchronized block ensures that the latch is sized with the number of unacked messages _at the time this method is called_, but it does not prevent new messages from being added.
Using concurrent types for the two fields solves the first issue, while the second is prevented by having the `WorkerSourceTask#execute()` method call `submit(...)`, which increments `numUnackedMessages`, and only call `awaitAllMessages()` after the task has been told to stop (at which point `submit(...)` will not be called again and `numUnackedMessages` will not be incremented).

Calling out that last assumption would be good. It might be as simple as adding the following to the `submit(...)` JavaDoc:

```
     *
     * <p>This method should never be called after {@link #awaitAllMessages(long, TimeUnit)} has been called.
```

You'd also have to change the `messageAcked()` method to use the atomic reference:

```
    private void messageAcked() {
        numUnackedMessages.decrementAndGet();
        CountDownLatch messageDrainLatch = this.messageDrainLatch.get();
        if (messageDrainLatch != null) {
            messageDrainLatch.countDown();
        }
    }
```

and remove the `synchronized` keyword. Again, I don't think we need to update _both_ the number of unacked messages _and_ the latch atomically; we really just need each of them to be updated consistently.

There are a few reasons why this might be better:

1. We avoid the `synchronized` keyword, which might be unclear to our future selves ("Is the documentation about not being thread-safe wrong?").
2. We can make the fields `final`, which IMO makes the logic a bit easier to follow.
3. We don't need stricter synchronization than individual atomic updates of each field.

########## File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java ##########
@@ -260,6 +259,10 @@ public void execute() {
             } catch (InterruptedException e) {
                 // Ignore and allow to exit.
             } finally {
+                submittedRecords.awaitAllMessages(
+                        workerConfig.getLong(WorkerConfig.OFFSET_COMMIT_TIMEOUT_MS_CONFIG),

Review comment: Yeah, I think I agree.
The previous behavior called `commitOffsets()` on shutdown, and that method blocked for up to `offset.flush.timeout.ms` anyway. So IMO it makes sense to block for up to that amount of time before calling `updateCommittableOffsets()` and `commitOffsets()` on the subsequent lines.

########## File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SubmittedRecords.java ##########
@@ -132,6 +144,27 @@ public CommittableOffsets committableOffsets() {
         return new CommittableOffsets(offsets, totalCommittableMessages, totalUncommittableMessages, records.size(), largestDequeSize, largestDequePartition);
     }
+    /**
+     * Wait for all currently in-flight messages to be acknowledged, up to the requested timeout.
+     * @param timeout the maximum time to wait
+     * @param timeUnit the time unit of the timeout argument
+     * @return whether all in-flight messages were acknowledged before the timeout elapsed
+     */
+    public boolean awaitAllMessages(long timeout, TimeUnit timeUnit) {

Review comment: That's fine, but the JavaDoc should be updated to mention this method, too, since it must be called from a different thread.

--
This is an automated message from the Apache Git Service.
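To make the threading discussion in these comments concrete, here is a minimal, self-contained sketch of the drain pattern. `DrainTracker` and its count-only `submit()` are hypothetical simplifications, not the PR's `SubmittedRecords`. The sketch deliberately follows the PR's original `synchronized` approach rather than the two-atomic-fields alternative. Because `messageAcked()` runs on a different thread from `awaitAllMessages()`, holding one lock while reading the unacked count and publishing the latch means no acknowledgment can slip between those two steps. With independent atomics, an ack that decrements the counter just after it is read, but checks for the latch just before it is published, could leave the new latch one count too high, so the wait might time out even though every message was acknowledged.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class DrainTracker {
    // Both fields are guarded by the intrinsic lock on this object.
    private int numUnackedMessages = 0;
    private CountDownLatch messageDrainLatch;

    // Task thread only; must not be called after awaitAllMessages().
    public synchronized void submit() {
        numUnackedMessages++;
    }

    // Producer callback thread(s), once a record is acknowledged.
    public synchronized void messageAcked() {
        numUnackedMessages--;
        if (messageDrainLatch != null) {
            messageDrainLatch.countDown();
        }
    }

    // Task thread, after the task has been told to stop.
    public boolean awaitAllMessages(long timeout, TimeUnit timeUnit) {
        CountDownLatch latch;
        synchronized (this) {
            // Reading the count and publishing the latch under the same lock that
            // messageAcked() holds means no ack can fall between the two steps.
            latch = new CountDownLatch(numUnackedMessages);
            messageDrainLatch = latch;
        }
        try {
            // Block outside the lock so that messageAcked() can still proceed.
            return latch.await(timeout, timeUnit);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) throws InterruptedException {
        DrainTracker tracker = new DrainTracker();
        for (int i = 0; i < 3; i++) {
            tracker.submit();
        }
        // Simulated producer callback thread that acknowledges records a little later.
        Thread acker = new Thread(() -> {
            for (int j = 0; j < 3; j++) {
                try {
                    Thread.sleep(50);
                } catch (InterruptedException e) {
                    return;
                }
                tracker.messageAcked();
            }
        });
        acker.start();
        // The 5-second timeout stands in for offset.flush.timeout.ms.
        boolean drained = tracker.awaitAllMessages(5, TimeUnit.SECONDS);
        System.out.println("drained = " + drained); // prints "drained = true"
        acker.join();
    }
}
```

In `WorkerSourceTask#execute()`, the corresponding call would sit in the `finally` block with `offset.flush.timeout.ms` as the timeout, ahead of `updateCommittableOffsets()` and `commitOffsets()`.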