C0urante commented on a change in pull request #10112:
URL: https://github.com/apache/kafka/pull/10112#discussion_r583154155



##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
##########
@@ -475,11 +476,15 @@ public boolean commitOffsets() {
         synchronized (this) {
             // First we need to make sure we snapshot everything in exactly the current state. This
             // means both the current set of messages we're still waiting to finish, stored in this
-            // class, which setting flushing = true will handle by storing any new values into a new
+            // class, which setting recordFlushPending = true will handle by storing any new values into a new
             // buffer; and the current set of user-specified offsets, stored in the
             // OffsetStorageWriter, for which we can use beginFlush() to initiate the snapshot.
-            flushing = true;
-            boolean flushStarted = offsetWriter.beginFlush();
+            // No need to begin a new offset flush if we timed out waiting for records to be flushed to
+            // Kafka in a prior attempt.
+            if (!recordFlushPending) {

Review comment:
       > We end up blocking the event thread anyway because of the need to do it under the lock.
   
   I think we actually keep polling the task for records during the offset 
commit, which is the entire reason we have the `outstandingMessagesBacklog` 
field. Without it, we'd just add everything to `outstandingMessages` knowing 
that, if we've made it to the point of adding a record to that collection, 
we're not in the process of committing offsets, right?
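
   To make that pattern concrete, here's a minimal, hypothetical sketch (the class and method names below are invented for illustration and are not the actual `WorkerSourceTask` code) of how an `outstandingMessages`/`outstandingMessagesBacklog` pair lets the task keep being polled while a commit snapshot drains:

```java
import java.util.IdentityHashMap;
import java.util.Map;

// Hypothetical double-buffer tracker: while a flush is in progress, newly
// dispatched records go into a backlog instead of the snapshot being committed.
class OutstandingRecordTracker<R> {
    private final Map<R, R> outstandingMessages = new IdentityHashMap<>();
    private final Map<R, R> outstandingMessagesBacklog = new IdentityHashMap<>();
    private boolean flushing = false;

    synchronized void recordDispatched(R record) {
        // During a flush, new records must not join the snapshot being committed.
        if (flushing) {
            outstandingMessagesBacklog.put(record, record);
        } else {
            outstandingMessages.put(record, record);
        }
    }

    synchronized void recordAcked(R record) {
        // An ack may arrive for a record in either buffer.
        if (outstandingMessages.remove(record) == null) {
            outstandingMessagesBacklog.remove(record);
        }
        if (outstandingMessages.isEmpty()) {
            notifyAll(); // wake a committer waiting for the snapshot to drain
        }
    }

    synchronized void beginFlush() {
        flushing = true;
    }

    synchronized void finishFlush() {
        // Fold the backlog back into the live set once the commit completes.
        outstandingMessages.putAll(outstandingMessagesBacklog);
        outstandingMessagesBacklog.clear();
        flushing = false;
    }
}
```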
   
   Concretely, we can see that the offset thread [relinquishes the lock on the 
`WorkerSourceTask` instance while waiting for outstanding messages to be 
ack'd](https://github.com/C0urante/kafka/blob/03c5a83a8277fa7c4ec503c3e044ae61cff06eea/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java).
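
   The mechanism that makes this work is that `Object.wait()` releases the monitor while blocked, so the producer-callback path can still enter the same lock and ack records. Continuing the hypothetical tracker sketch above, the wait could look roughly like this:

```java
// Could live on the OutstandingRecordTracker sketched earlier (hypothetical code).
synchronized boolean awaitAllOutstanding(long timeoutMs) throws InterruptedException {
    long deadline = System.currentTimeMillis() + timeoutMs;
    while (!outstandingMessages.isEmpty()) {
        long remaining = deadline - System.currentTimeMillis();
        if (remaining <= 0) {
            return false; // timed out; the caller can leave the flush pending and retry later
        }
        // The lock on `this` is released here until notifyAll() or the timeout,
        // so recordAcked() (and anything else synchronizing on this object) can run.
        wait(remaining);
    }
    return true;
}
```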
   
   
   I'm not sure we _need_ to perform offset commits on a separate thread, but 
it is in line with what we do for sink tasks, where we [leverage the 
`Consumer::commitAsync` 
method](https://github.com/apache/kafka/blob/e2a0d0c90e1916d77223a420e3595e8aba643001/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L365).
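
   For reference, a minimal sketch of that async pattern (illustrative only; the variable names here are not the actual `WorkerSinkTask` fields):

```java
import java.util.Map;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

class AsyncCommitExample {
    static void commitOffsetsAsync(Consumer<?, ?> consumer,
                                   Map<TopicPartition, OffsetAndMetadata> offsets) {
        // commitAsync returns immediately; the callback fires on a later poll(),
        // so the task thread never parks waiting on the commit round trip.
        consumer.commitAsync(offsets, (committed, error) -> {
            if (error != null) {
                // log and rely on the next commit interval to retry
            }
        });
    }
}
```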
   
   If we want to consider making offset commits synchronous (which is likely 
going to happen anyway once transactional writes for exactly-once source 
support are introduced), that also might be worth a follow-up. The biggest 
problem I can think of with that approach is that a single offline 
topic-partition would block the entire task thread when it comes time for an 
offset commit. If we keep the timeout for offset commits, that would limit the 
fallout and allow us to resume polling new records from the task and 
dispatching them to the producer after the commit attempt times out. However, 
there'd still be a non-negligible throughput hit (especially for workers 
configured with higher offset flush timeouts).
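
   In terms of the hypothetical tracker sketched earlier, a synchronous commit bounded by the offset flush timeout might look roughly like this (again, invented names, not the actual code or a concrete proposal):

```java
// Hypothetical: the task thread drives the commit itself, but gives up after the
// flush timeout so one stuck topic-partition can't wedge polling indefinitely.
boolean commitOffsetsSynchronously(OutstandingRecordTracker<?> tracker, long offsetFlushTimeoutMs)
        throws InterruptedException {
    tracker.beginFlush();
    if (!tracker.awaitAllOutstanding(offsetFlushTimeoutMs)) {
        // Timed out: leave the flush pending, resume polling the task and
        // dispatching records to the producer, and retry the commit later.
        return false;
    }
    // ... write the snapshotted offsets here (e.g. through the OffsetStorageWriter) ...
    tracker.finishFlush();
    return true;
}
```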



