C0urante commented on a change in pull request #10112:
URL: https://github.com/apache/kafka/pull/10112#discussion_r582351074
##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
##########
@@ -475,11 +476,15 @@ public boolean commitOffsets() {
         synchronized (this) {
             // First we need to make sure we snapshot everything in exactly the current state. This
             // means both the current set of messages we're still waiting to finish, stored in this
-            // class, which setting flushing = true will handle by storing any new values into a new
+            // class, which setting recordFlushPending = true will handle by storing any new values into a new
             // buffer; and the current set of user-specified offsets, stored in the
             // OffsetStorageWriter, for which we can use beginFlush() to initiate the snapshot.
-            flushing = true;
-            boolean flushStarted = offsetWriter.beginFlush();
+            // No need to begin a new offset flush if we timed out waiting for records to be flushed to
+            // Kafka in a prior attempt.
+            if (!recordFlushPending) {

Review comment:
1. I think it's a necessary evil, since source task offset commits are [conducted on a single thread](https://github.com/apache/kafka/blob/3f09fb97b6943c0612488dfa8e5eab8078fd7ca0/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SourceTaskOffsetCommitter.java#L64). Without a timeout for offset commits, a single task could block indefinitely and disable offset commits for all other source tasks on the worker.
2. This is definitely possible; I think the only saving grace here is that the combined size of the `outstandingMessages` and `outstandingMessagesBacklog` fields is naturally throttled by the producer's buffer. If too many records accumulate, the call to `Producer::send` will block synchronously until space is freed up, at which point the worker can continue polling the task for new records.
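The throttling described in point 2 can be illustrated with a small model. This is a minimal sketch assuming a hypothetical `BoundedSender` class in which a `Semaphore` stands in for the producer's buffer; it is not the Kafka producer. The real buffer is sized in bytes (`buffer.memory`) and `send` blocks for at most `max.block.ms`, whereas here capacity is counted in records and the wait time is passed per call. It only shows why the number of un-ack'd records cannot grow past the buffer's capacity.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

/**
 * Hypothetical model of producer backpressure: each permit is one slot in the
 * "buffer". send() waits for a free slot, so un-ack'd records are bounded by capacity.
 * Assumes onAck() is called exactly once per accepted send().
 */
final class BoundedSender {
    private final Semaphore buffer;
    private final int capacity;

    BoundedSender(int capacity) {
        this.capacity = capacity;
        this.buffer = new Semaphore(capacity);
    }

    /** Waits up to maxBlockMs for a free slot; returns false if none frees up in time. */
    boolean send(long maxBlockMs) {
        try {
            return buffer.tryAcquire(maxBlockMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    /** Called when a record is ack'd; frees its slot so a blocked send() can proceed. */
    void onAck() {
        buffer.release();
    }

    /** Number of records sent but not yet ack'd. */
    int outstanding() {
        return capacity - buffer.availablePermits();
    }

    public static void main(String[] args) {
        BoundedSender sender = new BoundedSender(3);
        for (int i = 0; i < 3; i++) {
            sender.send(10);
        }
        // The buffer is full, so the fourth send waits 10 ms and then gives up.
        System.out.println("fourth send accepted: " + sender.send(10)
                + ", outstanding: " + sender.outstanding());
        // One ack frees a slot, so the next send goes through.
        sender.onAck();
        System.out.println("after one ack: " + sender.send(10)
                + ", outstanding: " + sender.outstanding());
    }
}
```

Running `main` shows the fourth send being rejected with three records outstanding, and a single ack letting the next send through while the outstanding count stays at the capacity of three.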
This isn't ideal, as it essentially causes the producer's entire buffer to be occupied until the throughput of record production from the task decreases and/or the write throughput of the producer rises to meet it, but it at least establishes an upper bound on how large a single batch of records in the `outstandingMessages` field can get. It may take several offset commit attempts for all of the records in that batch to be ack'd, with all but the last (successful) attempt timing out and failing, but forward progress with offset commits should still be possible.

I share your feelings about the complexity here. I think ultimately it arises from two constraints:

1. A worker-global producer is currently used to write source offsets to the internal offsets topic. Although this doesn't necessarily require the single-threaded logic for offset commits mentioned above, things become simpler with it.
2. (Please correct me if I'm wrong on this point; my knowledge of Kafka core is a little fuzzy, and there may be stronger guarantees than I'm aware of.) Out-of-order acknowledgment of records makes tracking the latest offset for a given source partition less trivial than it initially seems. For example, if a task produces two records with the same source partition that end up being delivered to different topic-partitions, the second record may be ack'd before the first, and when it comes time to commit offsets, the framework would have to refrain from committing the offset for that second record until the first is also ack'd.

I don't think either of these points makes it impossible to add even more fine-grained offset commit behavior and/or remove offset commit timeouts, but the work involved would be a fair amount heavier than this relatively minor patch.
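The commit flow in the diff above, where an attempt that times out leaves the record flush pending so that a later attempt resumes it instead of taking a new snapshot, might be modeled roughly as follows. This is a hypothetical, heavily simplified sketch: `TimedOffsetCommit`, the numeric record IDs, and the `flushesStarted` counter are illustrative only, and the real `WorkerSourceTask` also snapshots user offsets via `OffsetStorageWriter.beginFlush()` and writes them through the worker's producer, which is omitted here.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.TimeUnit;

/**
 * Hypothetical, simplified model of the commit flow in the diff. Records sent while a
 * flush is pending go to a backlog, so a commit only waits on the records that were
 * outstanding when its snapshot was taken.
 */
final class TimedOffsetCommit {
    private Set<Long> outstanding = new HashSet<>(); // records covered by the current snapshot
    private Set<Long> backlog = new HashSet<>();     // records sent while a flush is pending
    private boolean recordFlushPending = false;
    private int flushesStarted = 0;

    synchronized void recordSent(long id) {
        (recordFlushPending ? backlog : outstanding).add(id);
    }

    synchronized void recordAcked(long id) {
        if (!outstanding.remove(id)) {
            backlog.remove(id);
        }
        notifyAll(); // wake a commit that may be waiting for the snapshot to drain
    }

    synchronized int flushesStarted() {
        return flushesStarted;
    }

    /** Returns true if the snapshot was fully ack'd within timeoutMs, false otherwise. */
    synchronized boolean commitOffsets(long timeoutMs) {
        // Only take a new snapshot if a prior attempt didn't time out mid-flush.
        if (!recordFlushPending) {
            recordFlushPending = true;
            flushesStarted++;
        }
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMs);
        while (!outstanding.isEmpty()) {
            long remainingMs = TimeUnit.NANOSECONDS.toMillis(deadline - System.nanoTime());
            if (remainingMs <= 0) {
                return false; // leave recordFlushPending set so the next attempt resumes this flush
            }
            try {
                wait(remainingMs);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
        // Snapshot fully ack'd: the backlog becomes the new set of outstanding records.
        recordFlushPending = false;
        Set<Long> drained = outstanding;
        outstanding = backlog;
        backlog = drained;
        return true;
    }

    public static void main(String[] args) {
        TimedOffsetCommit task = new TimedOffsetCommit();
        task.recordSent(1);
        boolean first = task.commitOffsets(20);  // record 1 is not ack'd in time
        task.recordSent(2);                      // lands in the backlog, not the pending flush
        task.recordAcked(1);
        boolean second = task.commitOffsets(20); // resumes the earlier flush, which has now drained
        System.out.println(first + " " + second + " flushes started: " + task.flushesStarted());
    }
}
```

The point of the backlog is that a slow or busy task cannot keep extending the set of records a pending commit waits on; each attempt only needs the records from its own snapshot to be ack'd.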
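For the second constraint, one way to track committable offsets under out-of-order acks is to keep each source partition's in-flight records in submission order and only advance past a contiguous prefix of ack'd records. A minimal, single-threaded sketch, assuming a hypothetical `SourceOffsetTracker` class and simplifying source partitions to strings and source offsets to longs (Connect itself represents both as maps):

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

/**
 * Hypothetical per-source-partition offset tracker (not thread-safe). Records are
 * submitted in the order the task produced them; acks may arrive in any order.
 * Only the offset of the last record in the fully ack'd prefix is safe to commit.
 */
final class SourceOffsetTracker {
    /** One in-flight record and whether the producer has ack'd it yet. */
    static final class Pending {
        final long sourceOffset;
        boolean acked;

        Pending(long sourceOffset) {
            this.sourceOffset = sourceOffset;
        }
    }

    private final Map<String, Deque<Pending>> inFlight = new HashMap<>();
    private final Map<String, Long> committable = new HashMap<>();

    /** Registers a record in submission order and returns a handle to ack later. */
    Pending submit(String sourcePartition, long sourceOffset) {
        Pending p = new Pending(sourceOffset);
        inFlight.computeIfAbsent(sourcePartition, k -> new ArrayDeque<>()).addLast(p);
        return p;
    }

    /** Marks a record ack'd, then advances the committable offset past the ack'd prefix. */
    void ack(String sourcePartition, Pending p) {
        p.acked = true;
        Deque<Pending> queue = inFlight.get(sourcePartition);
        while (queue != null && !queue.isEmpty() && queue.peekFirst().acked) {
            committable.put(sourcePartition, queue.pollFirst().sourceOffset);
        }
    }

    /** The latest offset that can be committed for this source partition, if any. */
    Optional<Long> committableOffset(String sourcePartition) {
        return Optional.ofNullable(committable.get(sourcePartition));
    }

    public static void main(String[] args) {
        SourceOffsetTracker tracker = new SourceOffsetTracker();
        Pending first = tracker.submit("table-a", 100);
        Pending second = tracker.submit("table-a", 101);

        // The second record is ack'd first, so nothing is committable yet.
        tracker.ack("table-a", second);
        System.out.println(tracker.committableOffset("table-a")); // Optional.empty

        // Once the first record is ack'd, both are done and 101 becomes committable.
        tracker.ack("table-a", first);
        System.out.println(tracker.committableOffset("table-a")); // Optional[101]
    }
}
```

Keeping this state per source partition is what a finer-grained commit scheme would need; the per-worker, single-threaded committer described above avoids having to coordinate it across threads.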
If you'd prefer to see something along those lines, could we consider merging this patch for now and performing a more serious overhaul of the source task offset commit logic as a follow-up, possibly with a small design discussion on a Jira ticket to make sure there's alignment on the new behavior?

----------------------------------------------------------------
This is an automated message from the Apache Git Service.