hachikuji commented on a change in pull request #10112:
URL: https://github.com/apache/kafka/pull/10112#discussion_r583219431



##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
##########
@@ -475,11 +476,15 @@ public boolean commitOffsets() {
         synchronized (this) {
             // First we need to make sure we snapshot everything in exactly the current state. This
             // means both the current set of messages we're still waiting to finish, stored in this
-            // class, which setting flushing = true will handle by storing any new values into a new
+            // class, which setting recordFlushPending = true will handle by storing any new values into a new
             // buffer; and the current set of user-specified offsets, stored in the
             // OffsetStorageWriter, for which we can use beginFlush() to initiate the snapshot.
-            flushing = true;
-            boolean flushStarted = offsetWriter.beginFlush();
+            // No need to begin a new offset flush if we timed out waiting for records to be flushed to
+            // Kafka in a prior attempt.
+            if (!recordFlushPending) {

Review comment:
       It's mostly the flushing that concerns me, not really the offset commit. 
I don't think we need to make it synchronous; it just seems silly to block 
the shared scheduler while the flush completes. My thought instead was to let 
the scheduler trigger the flush, but let the task be responsible for waiting 
for its completion. While waiting, of course, the task can continue writing to 
`outstandingMessagesBacklog`, so I don't think there should be any issue from a 
throughput perspective.
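
A minimal sketch of the hand-off described above, assuming a much-simplified task: the scheduler thread only marks a flush as pending and returns immediately, while the task checks for completion from its own loop and keeps accepting new records into the backlog in the meantime. The class `FlushHandoffSketch` and its methods are hypothetical names for illustration, not the actual `WorkerSourceTask` API; only `recordFlushPending` and `outstandingMessagesBacklog` are borrowed from the diff and the comment.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical sketch: class and method names are illustrative, not Kafka Connect APIs.
class FlushHandoffSketch {
    // Records sent before the flush was requested and not yet acknowledged.
    private final Set<String> outstandingMessages = new HashSet<>();
    // Records sent while a flush is pending; they belong to the next flush.
    private final Set<String> outstandingMessagesBacklog = new HashSet<>();
    private boolean recordFlushPending = false;
    private int flushCount = 0;

    // Called from the shared scheduler thread: marks the flush and returns without blocking.
    synchronized void requestFlush() {
        recordFlushPending = true;
    }

    // Called from the task thread: sending never waits on a pending flush.
    synchronized void send(String record) {
        if (recordFlushPending) {
            outstandingMessagesBacklog.add(record);
        } else {
            outstandingMessages.add(record);
        }
    }

    // Called when a record is acknowledged (assumed to come from a producer callback).
    synchronized void ack(String record) {
        if (!outstandingMessages.remove(record)) {
            outstandingMessagesBacklog.remove(record);
        }
    }

    // Called from the task's own loop: completes the flush once the snapshot has drained.
    synchronized boolean tryCompleteFlush() {
        if (!recordFlushPending || !outstandingMessages.isEmpty()) {
            return false;
        }
        flushCount++; // stand-in for the actual offset commit
        outstandingMessages.addAll(outstandingMessagesBacklog);
        outstandingMessagesBacklog.clear();
        recordFlushPending = false;
        return true;
    }

    synchronized int outstandingCount() {
        return outstandingMessages.size();
    }

    synchronized int completedFlushes() {
        return flushCount;
    }

    public static void main(String[] args) {
        FlushHandoffSketch task = new FlushHandoffSketch();
        task.send("a");
        task.requestFlush();   // scheduler side: returns immediately
        task.send("b");        // task side: goes to the backlog
        System.out.println("flush done before ack: " + task.tryCompleteFlush());
        task.ack("a");
        System.out.println("flush done after ack: " + task.tryCompleteFlush());
    }
}
```

In this sketch nothing ever blocks the scheduler thread; the cost is that the task has to poll `tryCompleteFlush()` from its own loop, and a real implementation would also need the timeout handling the diff is concerned with.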



