C0urante commented on a change in pull request #10112: URL: https://github.com/apache/kafka/pull/10112#discussion_r583154155
##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
##########
@@ -475,11 +476,15 @@ public boolean commitOffsets() {
         synchronized (this) {
             // First we need to make sure we snapshot everything in exactly the current state. This
             // means both the current set of messages we're still waiting to finish, stored in this
-            // class, which setting flushing = true will handle by storing any new values into a new
+            // class, which setting recordFlushPending = true will handle by storing any new values into a new
             // buffer; and the current set of user-specified offsets, stored in the
             // OffsetStorageWriter, for which we can use beginFlush() to initiate the snapshot.
-            flushing = true;
-            boolean flushStarted = offsetWriter.beginFlush();
+            // No need to begin a new offset flush if we timed out waiting for records to be flushed to
+            // Kafka in a prior attempt.
+            if (!recordFlushPending) {

Review comment:
> We end up blocking the event thread anyway because of the need to do it under the lock.

I think we actually keep polling the task for records during the offset commit, which is the entire reason we have the `outstandingMessagesBacklog` field. Without it, we'd just add everything to `outstandingMessages` knowing that, if we've made it to the point of adding a record to that collection, we're not in the process of committing offsets, right? Concretely, we can see that the offset thread [relinquishes the lock on the `WorkerSourceTask` instance while waiting for outstanding messages to be ack'd](https://github.com/C0urante/kafka/blob/03c5a83a8277fa7c4ec503c3e044ae61cff06eea/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java).
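To make the point about the backlog concrete, here is a minimal sketch of the two-buffer idea. It is not the actual `WorkerSourceTask` code: the class `BacklogSketch`, its method names, and the use of plain string IDs in place of producer records are all assumptions made for illustration, and the timeout handling only loosely mirrors the "flush still pending" idea from the diff above.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical, simplified model of the two-buffer scheme; NOT the real WorkerSourceTask.
class BacklogSketch {
    private final Set<String> outstandingMessages = new HashSet<>();
    private final Set<String> outstandingMessagesBacklog = new HashSet<>();
    private boolean flushing = false; // stands in for flushing / recordFlushPending

    // Task thread: a record has been handed to the producer.
    public synchronized void recordSent(String id) {
        if (flushing) {
            outstandingMessagesBacklog.add(id); // commit in progress: keep it out of the snapshot
        } else {
            outstandingMessages.add(id);
        }
    }

    // Producer callback thread: a record has been acknowledged.
    public synchronized void recordAcked(String id) {
        if (!outstandingMessages.remove(id)) {
            outstandingMessagesBacklog.remove(id);
        }
        if (flushing && outstandingMessages.isEmpty()) {
            notifyAll(); // wake the committer waiting in commitOffsets()
        }
    }

    // Offset commit thread: wait (up to timeoutMs) for the snapshot to be fully acked.
    public synchronized boolean commitOffsets(long timeoutMs) {
        flushing = true;
        long deadline = System.currentTimeMillis() + timeoutMs;
        while (!outstandingMessages.isEmpty()) {
            long remaining = deadline - System.currentTimeMillis();
            if (remaining <= 0) {
                return false; // timed out; flushing stays set so a retry reuses the snapshot
            }
            try {
                wait(remaining); // releases the monitor, so recordSent/recordAcked can run
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
        // Snapshot fully acked: promote the backlog to be the next snapshot.
        outstandingMessages.addAll(outstandingMessagesBacklog);
        outstandingMessagesBacklog.clear();
        flushing = false;
        return true;
    }

    public synchronized int outstandingSize() { return outstandingMessages.size(); }
    public synchronized int backlogSize() { return outstandingMessagesBacklog.size(); }
}
```

Because `wait` gives up the monitor, the task thread can keep calling `recordSent` while a commit is in flight; those records land in the backlog rather than in the set the committer is waiting on. If nothing could be sent during a commit, the second set would be unnecessary.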
I'm not sure we _need_ to perform offset commits on a separate thread, but it is in line with what we do for sink tasks, where we [leverage the `Consumer::commitAsync` method](https://github.com/apache/kafka/blob/e2a0d0c90e1916d77223a420e3595e8aba643001/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L365). If we want to consider making offset commit synchronous (which is likely going to happen anyway when transactional writes for exactly-once source are introduced), that also might be worth a follow-up. The biggest problem I can think of with that approach is that a single offline topic-partition would block the entire task thread when it comes time for offset commit. If we keep the timeout for offset commit, that would limit the fallout and allow us to resume polling new records from the task and dispatching them to the producer after the commit attempt times out. However, there would still be a non-negligible throughput hit (especially for workers configured with higher offset timeouts).