hachikuji commented on a change in pull request #10112:
URL: https://github.com/apache/kafka/pull/10112#discussion_r583219431
##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
##########
@@ -475,11 +476,15 @@ public boolean commitOffsets() {
         synchronized (this) {
             // First we need to make sure we snapshot everything in exactly the current state. This
             // means both the current set of messages we're still waiting to finish, stored in this
-            // class, which setting flushing = true will handle by storing any new values into a new
+            // class, which setting recordFlushPending = true will handle by storing any new values into a new
             // buffer; and the current set of user-specified offsets, stored in the
             // OffsetStorageWriter, for which we can use beginFlush() to initiate the snapshot.
-            flushing = true;
-            boolean flushStarted = offsetWriter.beginFlush();
+            // No need to begin a new offset flush if we timed out waiting for records to be flushed to
+            // Kafka in a prior attempt.
+            if (!recordFlushPending) {

Review comment:
       It's mostly the flushing that concerns me, not really the offset commit. I don't think we need to make it synchronous; it just seems silly to block the shared scheduler while waiting for it to complete. My thought instead was to let the scheduler trigger the flush, but then let the task be responsible for waiting for its completion. While waiting, of course, it can continue writing to `outstandingMessagesBacklog`, so I don't think there should be any issue from a throughput perspective.
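
To make the proposal concrete, here is a rough sketch of the hand-off as I read it: the scheduler only marks a flush as pending and returns, and the task checks for completion itself between sends. This is not the actual `WorkerSourceTask` code. The class name and all method names (`requestFlush`, `recordSent`, `recordAcked`, `maybeCompleteFlush`, `backlogSize`) are hypothetical, records are simplified to strings, and the offset writer is left out entirely. Only `recordFlushPending` and `outstandingMessagesBacklog` come from the diff and the comment above.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch only; names other than recordFlushPending and
// outstandingMessagesBacklog are assumptions made for illustration.
class FlushHandoffSketch {
    private final List<String> outstandingMessages = new ArrayList<>();
    private final List<String> outstandingMessagesBacklog = new ArrayList<>();
    private boolean recordFlushPending = false;

    // Called on the shared scheduler thread. It only flips state and returns,
    // so the scheduler is never blocked waiting for records to be acked.
    synchronized boolean requestFlush() {
        if (recordFlushPending) {
            return false; // a prior flush is still in flight; do not start another
        }
        recordFlushPending = true;
        return true;
    }

    // Called on the task thread for each record handed to the producer.
    // While a flush is pending, new records go to the backlog so the
    // snapshot being flushed stays fixed.
    synchronized void recordSent(String record) {
        if (recordFlushPending) {
            outstandingMessagesBacklog.add(record);
        } else {
            outstandingMessages.add(record);
        }
    }

    // Called when the producer acknowledges a record.
    synchronized void recordAcked(String record) {
        if (!outstandingMessages.remove(record)) {
            outstandingMessagesBacklog.remove(record);
        }
    }

    // Called on the task thread between sends. Returns true once the
    // snapshot has fully drained, at which point the backlog becomes
    // the new set of outstanding messages.
    synchronized boolean maybeCompleteFlush() {
        if (!recordFlushPending || !outstandingMessages.isEmpty()) {
            return false;
        }
        outstandingMessages.addAll(outstandingMessagesBacklog);
        outstandingMessagesBacklog.clear();
        recordFlushPending = false;
        return true;
    }

    synchronized int backlogSize() {
        return outstandingMessagesBacklog.size();
    }
}
```

In this shape the task keeps sending while the flush is pending, which is why throughput should not suffer; a real version would also have to decide when the task gives up waiting and how the offset commit is completed afterwards.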