hachikuji commented on a change in pull request #10112:
URL: https://github.com/apache/kafka/pull/10112#discussion_r582378783
##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
##########
@@ -475,11 +476,15 @@ public boolean commitOffsets() {
         synchronized (this) {
             // First we need to make sure we snapshot everything in exactly the current state. This
             // means both the current set of messages we're still waiting to finish, stored in this
-            // class, which setting flushing = true will handle by storing any new values into a new
+            // class, which setting recordFlushPending = true will handle by storing any new values into a new
             // buffer; and the current set of user-specified offsets, stored in the
             // OffsetStorageWriter, for which we can use beginFlush() to initiate the snapshot.
-            flushing = true;
-            boolean flushStarted = offsetWriter.beginFlush();
+            // No need to begin a new offset flush if we timed out waiting for records to be flushed to
+            // Kafka in a prior attempt.
+            if (!recordFlushPending) {

Review comment:

> (Please correct me if I'm wrong on this point; my core knowledge is a little fuzzy and maybe there are stronger guarantees than I'm aware of)
>
> Out-of-order acknowledgment of records makes tracking the latest offset for a given source partition a little less trivial than it seems initially. For example, if a task produces two records with the same source partition that end up being delivered to different topic-partitions, the second record may be ack'd before the first, and when it comes time for offset commit, the framework would have to refrain from committing offsets for that second record until the first is also ack'd.

Ok, that rings a bell. I think I see how the logic works now and I don't see an obvious way to make it simpler. Doing something finer-grained as you said might be the way to go. Anyway, I agree this is something to save for a follow-up improvement.

> I think it's a necessary evil, since source task offset commits are conducted on a single thread.
> Without a timeout for offset commits, a single task could block indefinitely and disable offset commits for all other tasks on the cluster.

Hmm... This is suspicious. Why do we need to block the executor while we wait for the flush? Would it be simpler to let the worker source task finish the flush and the offset commit in its own event thread? We end up blocking the event thread anyway because of the need to do it under the lock.
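The out-of-order acknowledgment problem from the first quoted comment can be illustrated with a small tracker. This is a minimal sketch, not the Connect runtime's implementation: the class and method names (`SubmittedRecordsSketch`, `submit`, `ack`, `committableOffset`) are hypothetical, and source partitions and offsets are simplified to a `String` and a `long`. Records are queued per source partition in send order, and the committable offset only advances across the contiguous prefix of acknowledged records, so a record that is ack'd early stays uncommitted until everything sent before it for that partition has also been ack'd.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

/**
 * Hypothetical sketch: tracks in-flight records per source partition in send order and
 * only exposes an offset as committable once every earlier record for that partition
 * has been acked. Source partitions and offsets are simplified to String and long.
 */
public class SubmittedRecordsSketch {

    /** One in-flight record; acked is set from the (simulated) producer callback. */
    static final class PendingRecord {
        final String sourcePartition;
        final long sourceOffset;
        boolean acked = false; // only read/written while holding the tracker's lock

        PendingRecord(String sourcePartition, long sourceOffset) {
            this.sourcePartition = sourcePartition;
            this.sourceOffset = sourceOffset;
        }
    }

    private final Map<String, Deque<PendingRecord>> pending = new HashMap<>();
    private final Map<String, Long> committable = new HashMap<>();

    /** Called when the task hands a record to the producer. */
    public synchronized PendingRecord submit(String sourcePartition, long sourceOffset) {
        PendingRecord record = new PendingRecord(sourcePartition, sourceOffset);
        pending.computeIfAbsent(sourcePartition, p -> new ArrayDeque<>()).addLast(record);
        return record;
    }

    /** Called when a record is acked; acks may arrive in any order across topic-partitions. */
    public synchronized void ack(PendingRecord record) {
        record.acked = true;
        Deque<PendingRecord> queue = pending.get(record.sourcePartition);
        // Advance only over the contiguous acked prefix: an acked record queued behind an
        // unacked one stays put until the earlier record is acked too.
        while (queue != null && !queue.isEmpty() && queue.peekFirst().acked) {
            committable.put(record.sourcePartition, queue.pollFirst().sourceOffset);
        }
    }

    /** Latest offset that is safe to commit for the source partition, if any. */
    public synchronized Optional<Long> committableOffset(String sourcePartition) {
        return Optional.ofNullable(committable.get(sourcePartition));
    }

    public static void main(String[] args) {
        SubmittedRecordsSketch tracker = new SubmittedRecordsSketch();
        // Two records from the same source partition, delivered to different topic-partitions.
        PendingRecord first = tracker.submit("file-a", 1L);
        PendingRecord second = tracker.submit("file-a", 2L);

        tracker.ack(second); // the second record is acked first
        System.out.println(tracker.committableOffset("file-a")); // Optional.empty
        tracker.ack(first);
        System.out.println(tracker.committableOffset("file-a")); // Optional[2]
    }
}
```

In `main`, the second record is acknowledged first, so nothing is committable until the first record is acknowledged; at that point the committable offset moves straight to 2.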
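The guard added in the diff (`if (!recordFlushPending)`) means a commit attempt that times out leaves its flush in place for the next attempt instead of snapshotting offsets again. The sketch below models that retry behaviour under stated assumptions: only `recordFlushPending` comes from the diff, while `CommitRetrySketch`, `recordAcked`, `snapshotsTaken`, and the outstanding-record counter are hypothetical stand-ins, with the `snapshotsTaken` increment standing in for `offsetWriter.beginFlush()`. It ignores the buffering of newly produced records described in the diff's comment. Echoing the reply, the wait happens under the task's monitor (`wait` releases it so acks can still be recorded), so whichever thread calls `commitOffsets` is blocked for up to the timeout; on a single shared commit thread, that timeout is what bounds how long one slow task can delay commits for the others.

```java
/**
 * Hypothetical sketch, not WorkerSourceTask: only recordFlushPending mirrors the diff.
 * A commit attempt that times out keeps its pending flush, so the next attempt
 * waits on the same snapshot instead of starting a new one.
 */
public class CommitRetrySketch {
    private boolean recordFlushPending = false;
    private int outstandingRecords;  // records sent but not yet acked
    private int snapshotsTaken = 0;  // stands in for calls to offsetWriter.beginFlush()

    public CommitRetrySketch(int outstandingRecords) {
        this.outstandingRecords = outstandingRecords;
    }

    /** Stand-in for the producer callback acknowledging one record. */
    public synchronized void recordAcked() {
        if (outstandingRecords > 0 && --outstandingRecords == 0) {
            notifyAll(); // wake a commit attempt waiting for the flush to finish
        }
    }

    public synchronized int snapshotsTaken() {
        return snapshotsTaken;
    }

    /** Returns true if the flush completed within timeoutMs, false if it timed out. */
    public synchronized boolean commitOffsets(long timeoutMs) {
        // Only snapshot offsets if no earlier attempt timed out with a flush still pending.
        if (!recordFlushPending) {
            recordFlushPending = true;
            snapshotsTaken++;
        }
        long deadline = System.currentTimeMillis() + timeoutMs;
        try {
            while (outstandingRecords > 0) {
                long remaining = deadline - System.currentTimeMillis();
                if (remaining <= 0) {
                    return false; // leave recordFlushPending set for the next attempt
                }
                wait(remaining); // releases this monitor while waiting, so acks can proceed
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt status
            return false;
        }
        recordFlushPending = false; // flush finished; offsets would be committed here
        return true;
    }

    public static void main(String[] args) {
        CommitRetrySketch task = new CommitRetrySketch(1);
        System.out.println(task.commitOffsets(50)); // false: one record still unacked
        System.out.println(task.commitOffsets(50)); // false again, reusing the pending flush
        task.recordAcked();
        System.out.println(task.commitOffsets(50)); // true
        System.out.println(task.snapshotsTaken());  // 1
    }
}
```

Moving this wait onto the task's own thread, as the reply suggests, would change which thread blocks, not the need to keep the pending flush across attempts.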