C0urante commented on a change in pull request #10112: URL: https://github.com/apache/kafka/pull/10112#discussion_r589603947
########## File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java ##########
@@ -475,11 +476,15 @@ public boolean commitOffsets() {
         synchronized (this) {
             // First we need to make sure we snapshot everything in exactly the current state. This
             // means both the current set of messages we're still waiting to finish, stored in this
-            // class, which setting flushing = true will handle by storing any new values into a new
+            // class, which setting recordFlushPending = true will handle by storing any new values into a new
             // buffer; and the current set of user-specified offsets, stored in the
             // OffsetStorageWriter, for which we can use beginFlush() to initiate the snapshot.
-            flushing = true;
-            boolean flushStarted = offsetWriter.beginFlush();
+            // No need to begin a new offset flush if we timed out waiting for records to be flushed to
+            // Kafka in a prior attempt.
+            if (!recordFlushPending) {

Review comment:
I've been ruminating over this for a few days and I think it should be possible to make task offset commits independent of each other by changing the source task offset commit scheduler to use a multi-threaded executor instead of a global single-threaded executor for all tasks. This isn't quite the same thing as what you're proposing since tasks would still not be responsible for waiting for flush completion (the offset scheduler's threads would be), but it's a smaller change and as far as I can tell, the potential downsides only really amount to a few extra threads being created.
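
To make the scheduling behaviour concrete, here is a minimal standalone sketch. It is not Connect's actual committer code: `CommitSchedulerSketch`, `FakeTask`, and the sleep durations are hypothetical stand-ins chosen for illustration. It schedules one slow and one fast "commit" with `scheduleWithFixedDelay` on a shared pool and records how many commits each task completes and whether any task ever had two commits in flight at once.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class CommitSchedulerSketch {

    // Hypothetical stand-in for a task's offset commit; not a real Connect class.
    static final class FakeTask {
        final long commitMillis;
        final AtomicInteger active = new AtomicInteger();    // commits currently in flight
        final AtomicInteger maxActive = new AtomicInteger(); // highest concurrency observed
        final AtomicInteger commits = new AtomicInteger();   // finished commit attempts

        FakeTask(long commitMillis) {
            this.commitMillis = commitMillis;
        }

        void commitOffsets() {
            int now = active.incrementAndGet();
            maxActive.accumulateAndGet(now, Math::max);
            try {
                Thread.sleep(commitMillis); // simulate waiting on a flush
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                active.decrementAndGet();
                commits.incrementAndGet();
            }
        }
    }

    // Returns {slow.maxActive, fast.maxActive, slow.commits, fast.commits}.
    static int[] run(int threads, long runMillis) throws InterruptedException {
        ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(threads);
        FakeTask slow = new FakeTask(200); // a task whose commits take a long time
        FakeTask fast = new FakeTask(1);   // a healthy task

        // Fixed *delay*: the next run of a task is scheduled only after its previous run ends.
        scheduler.scheduleWithFixedDelay(slow::commitOffsets, 0, 10, TimeUnit.MILLISECONDS);
        scheduler.scheduleWithFixedDelay(fast::commitOffsets, 0, 10, TimeUnit.MILLISECONDS);

        Thread.sleep(runMillis);
        scheduler.shutdownNow();
        scheduler.awaitTermination(1, TimeUnit.SECONDS);

        return new int[] {
            slow.maxActive.get(), fast.maxActive.get(), slow.commits.get(), fast.commits.get()
        };
    }

    public static void main(String[] args) throws InterruptedException {
        int[] single = run(1, 1000);
        int[] multi = run(2, 1000);
        System.out.println("1 thread:  fast commits = " + single[3]);
        System.out.println("2 threads: fast commits = " + multi[3]);
    }
}
```

With a single thread the fast task has to queue behind every slow commit; with two threads it commits at its own pace, while each task still has at most one commit in flight at a time.
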
The usage of [`scheduleWithFixedDelay`](https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/ScheduledExecutorService.html#scheduleWithFixedDelay-java.lang.Runnable-long-long-java.util.concurrent.TimeUnit-) already ensures that two offset commits for the same task won't be active at the same time, as it "Creates and executes a periodic action that becomes enabled first after the given initial delay, and subsequently with the given delay between the termination of one execution and the commencement of the next."

Beyond that, the only concern that comes to mind is potential races caused by concurrent access of the offset backing store and its underlying resources. In distributed mode, the `KafkaOffsetBackingStore` and its usage of the underlying `KafkaBasedLog` appear to be thread-safe as everything basically boils down to calls to `Producer::send`, which should be fine. In standalone mode, the `MemoryOffsetBackingStore` handles all writes/reads of the local offsets file via a single-threaded executor, so concurrent calls to `MemoryOffsetBackingStore::set` should also be fine.

Granted, none of this addresses your original concern, which is whether an offset commit timeout is necessary at all. In response to that, I think we may also want to revisit the offset commit logic and possibly do away with a timeout altogether. In sink tasks, for example, offset commit timeouts are almost a cosmetic feature at this point and are really only useful for metrics tracking. However, at the moment it's actually been pretty useful to us to monitor source task offset commit success/failure JMX metrics as a means of tracking overall task health. We might be able to make up the difference by relying on metrics for the number of active records, but it's probably not safe to make that assumption for all users, especially for what is intended to be a bug fix.
So, if possible, I'd like to leave a lot of the offset commit logic intact as it is for the moment and try to keep the changes here minimal.

To summarize: I'd like to proceed by keeping the currently-proposed changes, and changing the source task offset committer to use a multi-threaded executor instead of a single-threaded executor. I can file a follow-up ticket to track improvements in offset commit logic (definitely for source tasks, and possibly for sinks) and we can look into that if it becomes a problem in the future. What do you think?

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at: us...@infra.apache.org