rhauch commented on a change in pull request #11524:
URL: https://github.com/apache/kafka/pull/11524#discussion_r755233693



##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SubmittedRecords.java
##########
@@ -68,6 +75,9 @@ SubmittedRecord submit(Map<String, Object> partition, Map<String, Object> offset
         SubmittedRecord result = new SubmittedRecord(partition, offset);
         records.computeIfAbsent(result.partition(), p -> new LinkedList<>())
                 .add(result);
+        synchronized (this) {
+            numUnackedMessages.incrementAndGet();
+        }

Review comment:
       Does this need to be synchronized? See below
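   For what it's worth, `AtomicInteger#incrementAndGet` is atomic on its own, so the increment would be thread-safe even without the surrounding `synchronized` block. A standalone sketch (the class name is hypothetical, not part of the PR):
   ```java
   import java.util.concurrent.atomic.AtomicInteger;

   public class AtomicIncrementDemo {
       private static final AtomicInteger numUnackedMessages = new AtomicInteger();

       public static void main(String[] args) throws InterruptedException {
           // incrementAndGet is already atomic; no synchronized block is
           // needed around the increment itself.
           Thread[] threads = new Thread[4];
           for (int i = 0; i < threads.length; i++) {
               threads[i] = new Thread(() -> {
                   for (int j = 0; j < 1000; j++) {
                       numUnackedMessages.incrementAndGet();
                   }
               });
               threads[i].start();
           }
           for (Thread t : threads) {
               t.join();
           }
           // All 4000 increments are observed despite concurrent updates.
           System.out.println(numUnackedMessages.get()); // prints 4000
       }
   }
   ```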

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SubmittedRecords.java
##########
@@ -132,6 +144,27 @@ public CommittableOffsets committableOffsets() {
        return new CommittableOffsets(offsets, totalCommittableMessages, totalUncommittableMessages, records.size(), largestDequeSize, largestDequePartition);
     }
 
+    /**
+     * Wait for all currently in-flight messages to be acknowledged, up to the requested timeout.

Review comment:
       In a few other places where we have a constraint like this, I think we 
tend to say when this method can be called. So for example, something like:
   ```suggestion
     * Wait for all currently in-flight messages to be acknowledged, up to the requested timeout.
     * This method is expected to be called from the same thread that calls {@link #committableOffsets()}.
   ```
   

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SubmittedRecords.java
##########
@@ -42,9 +46,12 @@
 
     // Visible for testing
     final Map<Map<String, Object>, Deque<SubmittedRecord>> records;
+    private AtomicInteger numUnackedMessages;

Review comment:
       Can this be made final, and then maybe initialize the field here to be 
perfectly clear that it's initialized and never changed?
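   A minimal sketch of that suggestion (standalone class name is hypothetical): declaring the field `final` and initializing it at the declaration site makes the single assignment explicit.
   ```java
   import java.util.concurrent.atomic.AtomicInteger;

   public class FinalFieldSketch {
       // final + initialization at the declaration makes it clear the
       // reference is assigned exactly once and never reassigned.
       private final AtomicInteger numUnackedMessages = new AtomicInteger(0);

       public int unacked() {
           return numUnackedMessages.get();
       }

       public static void main(String[] args) {
           FinalFieldSketch s = new FinalFieldSketch();
           System.out.println(s.unacked()); // prints 0
       }
   }
   ```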

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SubmittedRecords.java
##########
@@ -132,6 +144,27 @@ public CommittableOffsets committableOffsets() {
        return new CommittableOffsets(offsets, totalCommittableMessages, totalUncommittableMessages, records.size(), largestDequeSize, largestDequePartition);
     }
 
+    /**
+     * Wait for all currently in-flight messages to be acknowledged, up to the requested timeout.
+     * @param timeout the maximum time to wait
+     * @param timeUnit the time unit of the timeout argument
+     * @return whether all in-flight messages were acknowledged before the timeout elapsed
+     */
+    public boolean awaitAllMessages(long timeout, TimeUnit timeUnit) {
+        // Create a new message drain latch as a local variable to avoid SpotBugs warnings about inconsistent synchronization
+        // on an instance variable when invoking CountDownLatch::await outside a synchronized block
+        CountDownLatch messageDrainLatch;
+        synchronized (this) {
+            messageDrainLatch = new CountDownLatch(numUnackedMessages.get());
+            this.messageDrainLatch = messageDrainLatch;
+        }

Review comment:
       Could this code be simplified and the synchronized block removed 
altogether by changing the `messageDrainLatch` field to:
   ```
       private final AtomicReference<CountDownLatch> messageDrainLatch = new AtomicReference<>();
   ```
   and then changing these lines above to:
   ```suggestion
        // Create a new message drain latch as a local variable to avoid SpotBugs warnings about inconsistent synchronization
        // on an instance variable when invoking CountDownLatch::await outside a synchronized block
        CountDownLatch messageDrainLatch = this.messageDrainLatch.updateAndGet(existing -> new CountDownLatch(numUnackedMessages.get()));
   ```
   This synchronized block ensures that the latch is sized with the number of unacked messages _at the time this method is called_, but it does not prevent new messages from being added. Using concurrent types for the two fields solves the first issue, while the second is prevented by having the `WorkerSourceTask#execute()` method call `submit(...)`, which increments `numUnackedMessages`, and then only call `awaitAllMessages()` after the task has been told to stop (at which point `submit(...)` will not be called again and `numUnackedMessages` will not be incremented).
   
   And calling out that last assumption would be good. Might be as simple as adding the following to the `submit(...)` JavaDoc:
   ```
        *
     * <p>This method should never be called after {@link #awaitAllMessages(long, TimeUnit)} has been called.
   ```
   
   You'd also have to change the `messageAcked()` method to use the atomic 
reference:
   ```
       private void messageAcked() {
           numUnackedMessages.decrementAndGet();
           CountDownLatch messageDrainLatch = this.messageDrainLatch.get();
           if (messageDrainLatch != null) {
               messageDrainLatch.countDown();
           }
       }
   ```
   and remove the `synchronized` keyword. Again, I don't think we need to atomically update _both_ the number of unacked messages _and_ count down the latch in a single step; we really just need each of them to be updated consistently.
   
   There are a few reasons why this might be better:
   1. We avoid the `synchronized` keyword, which might be unclear to our future selves ("Is the documentation about not being thread-safe wrong?").
   2. We can make the fields be `final`, which IMO makes the logic a bit easier 
to follow.
   3. We don't need stricter synchronization than individual atomic updates of 
each field.
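   
   Putting the pieces above together, here's a standalone sketch of the lock-free shape (the class name and `main` driver are illustrative, mirroring but not identical to `SubmittedRecords`):
   ```java
   import java.util.concurrent.CountDownLatch;
   import java.util.concurrent.TimeUnit;
   import java.util.concurrent.atomic.AtomicInteger;
   import java.util.concurrent.atomic.AtomicReference;

   public class DrainSketch {
       // Both fields final; each is updated with its own atomic operation,
       // with no synchronized blocks anywhere.
       private final AtomicInteger numUnackedMessages = new AtomicInteger();
       private final AtomicReference<CountDownLatch> messageDrainLatch = new AtomicReference<>();

       // Must never be called after awaitAllMessages(...) has been called.
       public void submit() {
           numUnackedMessages.incrementAndGet();
       }

       public void messageAcked() {
           numUnackedMessages.decrementAndGet();
           CountDownLatch latch = messageDrainLatch.get();
           if (latch != null) {
               latch.countDown();
           }
       }

       public boolean awaitAllMessages(long timeout, TimeUnit unit) throws InterruptedException {
           // Size the latch with the unacked count at the time of the call.
           CountDownLatch latch = messageDrainLatch.updateAndGet(
                   existing -> new CountDownLatch(numUnackedMessages.get()));
           return latch.await(timeout, unit);
       }

       public static void main(String[] args) throws InterruptedException {
           DrainSketch s = new DrainSketch();
           s.submit();
           s.submit();
           Thread acker = new Thread(() -> {
               s.messageAcked();
               s.messageAcked();
           });
           acker.start();
           // Drains regardless of whether the acks land before or after
           // the latch is created, since the latch is sized from the
           // current unacked count.
           boolean drained = s.awaitAllMessages(5, TimeUnit.SECONDS);
           System.out.println(drained); // prints true
           acker.join();
       }
   }
   ```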
   
   

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
##########
@@ -260,6 +259,10 @@ public void execute() {
         } catch (InterruptedException e) {
             // Ignore and allow to exit.
         } finally {
+            submittedRecords.awaitAllMessages(
+                    workerConfig.getLong(WorkerConfig.OFFSET_COMMIT_TIMEOUT_MS_CONFIG),

Review comment:
       Yeah, I think I agree. The previous behavior called `commitOffsets()` on 
shutdown, and that method blocked for up to `offset.flush.timeout.ms` anyway. 
So IMO it makes sense to block up to that amount of time before calling 
`updateCommittableOffsets()` and `commitOffsets()` on the subsequent lines.
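   
   As a side note on the bounded wait: `CountDownLatch#await(long, TimeUnit)` returns `false` once the timeout elapses, so a task whose messages are never acknowledged cannot block shutdown indefinitely. A minimal illustration (class name hypothetical):
   ```java
   import java.util.concurrent.CountDownLatch;
   import java.util.concurrent.TimeUnit;

   public class AwaitTimeoutDemo {
       public static void main(String[] args) throws InterruptedException {
           // A latch that is never counted down stands in for in-flight
           // messages that are never acknowledged; the timed await returns
           // false once the bound elapses instead of blocking forever.
           CountDownLatch stuck = new CountDownLatch(1);
           boolean drained = stuck.await(100, TimeUnit.MILLISECONDS);
           System.out.println(drained); // prints false
       }
   }
   ```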

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SubmittedRecords.java
##########
@@ -132,6 +144,27 @@ public CommittableOffsets committableOffsets() {
        return new CommittableOffsets(offsets, totalCommittableMessages, totalUncommittableMessages, records.size(), largestDequeSize, largestDequePartition);
     }
 
+    /**
+     * Wait for all currently in-flight messages to be acknowledged, up to the requested timeout.
+     * @param timeout the maximum time to wait
+     * @param timeUnit the time unit of the timeout argument
+     * @return whether all in-flight messages were acknowledged before the timeout elapsed
+     */
+    public boolean awaitAllMessages(long timeout, TimeUnit timeUnit) {

Review comment:
       That's fine, but the JavaDoc should be updated to mention this method, 
too, since it must be called from a different thread.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
