hachikuji commented on a change in pull request #10112:
URL: https://github.com/apache/kafka/pull/10112#discussion_r582378783



##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
##########
@@ -475,11 +476,15 @@ public boolean commitOffsets() {
         synchronized (this) {
             // First we need to make sure we snapshot everything in exactly the current state. This
             // means both the current set of messages we're still waiting to finish, stored in this
-            // class, which setting flushing = true will handle by storing any new values into a new
+            // class, which setting recordFlushPending = true will handle by storing any new values into a new
             // buffer; and the current set of user-specified offsets, stored in the
             // OffsetStorageWriter, for which we can use beginFlush() to initiate the snapshot.
-            flushing = true;
-            boolean flushStarted = offsetWriter.beginFlush();
+            // No need to begin a new offset flush if we timed out waiting for records to be flushed to
+            // Kafka in a prior attempt.
+            if (!recordFlushPending) {

Review comment:
   > (Please correct me if I'm wrong on this point; my core knowledge is a
   > little fuzzy and maybe there are stronger guarantees than I'm aware of)
   > Out-of-order acknowledgment of records makes tracking the latest offset for a
   > given source partition a little less trivial than it seems initially. For
   > example, if a task produces two records with the same source partition that end
   > up being delivered to different topic-partitions, the second record may be
   > ack'd before the first, and when it comes time for offset commit, the framework
   > would have to refrain from committing offsets for that second record until the
   > first is also ack'd.
   
   Ok, that rings a bell. I think I see how the logic works now and I don't see 
an obvious way to make it simpler. Doing something finer-grained as you said 
might be the way to go. Anyway, I agree this is something to save for a 
follow-up improvement.
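
   To make the ordering constraint quoted above concrete, here is a rough sketch of per-source-partition tracking. It is not the framework's actual logic: `AckOrderTracker`, `Pending`, `submit` and `committableOffsets` are hypothetical names, offsets are simplified to a `long`, and source partitions to a `String`. It is also not thread-safe, which a real version would need to be if acks arrive on a different thread.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper, not part of Kafka Connect: tracks, per source partition,
// the latest offset that is safe to commit when producer acks arrive out of order.
final class AckOrderTracker {
    // One in-flight record: its source offset and whether it has been ack'd.
    static final class Pending {
        final long offset;
        boolean acked;

        Pending(long offset) {
            this.offset = offset;
        }

        void ack() {
            acked = true;
        }
    }

    // Records per source partition, in the order the task produced them.
    private final Map<String, Deque<Pending>> inFlight = new HashMap<>();
    // Latest offset per source partition whose predecessors are all ack'd.
    private final Map<String, Long> committable = new HashMap<>();

    Pending submit(String sourcePartition, long offset) {
        Pending pending = new Pending(offset);
        inFlight.computeIfAbsent(sourcePartition, k -> new ArrayDeque<>()).addLast(pending);
        return pending;
    }

    // Pops the ack'd prefix of each queue and stops at the first record still in
    // flight, so a later ack never advances the offset past an earlier un-ack'd record.
    Map<String, Long> committableOffsets() {
        for (Map.Entry<String, Deque<Pending>> entry : inFlight.entrySet()) {
            Deque<Pending> queue = entry.getValue();
            while (!queue.isEmpty() && queue.peekFirst().acked) {
                committable.put(entry.getKey(), queue.pollFirst().offset);
            }
        }
        return new HashMap<>(committable);
    }
}
```

   With two records submitted for the same source partition, acking only the second leaves nothing committable; once the first is ack'd too, the committable offset jumps straight to the second record's offset.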
   
   > I think it's a necessary evil, since source task offset commits are
   > conducted on a single thread. Without a timeout for offset commits, a single
   > task could block indefinitely and disable offset commits for all other tasks on
   > the cluster.
   
   Hmm... This is suspicious. Why do we need to block the executor while we wait
for the flush? Would it be simpler to let the worker source task finish the
flush and the offset commit in its own event thread? We end up blocking the
event thread anyway because of the need to do it under the lock.
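
   One possible shape for this, purely as a sketch: the shared committer thread only raises a flag, and the task's own thread does the blocking flush and commit on its next loop iteration. `SelfCommittingTask`, `requestCommit` and `maybeCommit` are hypothetical names and not the existing `WorkerSourceTask` API, and the flush itself is stubbed out as a `Runnable`.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch, not the actual WorkerSourceTask: the shared committer
// thread only sets a flag; the task's own thread performs the flush and commit.
final class SelfCommittingTask {
    private final AtomicBoolean commitRequested = new AtomicBoolean(false);
    private final AtomicInteger commits = new AtomicInteger();
    // Stand-in for "flush outstanding records, then commit offsets"; may block.
    private final Runnable flushAndCommit;

    SelfCommittingTask(Runnable flushAndCommit) {
        this.flushAndCommit = flushAndCommit;
    }

    // Called from the shared, single-threaded committer. Returns immediately,
    // so a slow task cannot delay commit requests for other tasks.
    void requestCommit() {
        commitRequested.set(true);
    }

    // Called from the task's own thread, once per iteration of its loop.
    // Returns true if a commit was performed during this call.
    boolean maybeCommit() {
        if (!commitRequested.compareAndSet(true, false)) {
            return false;
        }
        flushAndCommit.run(); // blocks only this task's thread
        commits.incrementAndGet();
        return true;
    }

    int commitCount() {
        return commits.get();
    }
}
```

   Requests that arrive while a commit is still pending collapse into one, so no timeout is needed to protect the other tasks. Whether this fits the existing locking in `commitOffsets()` is exactly the open question.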




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

