C0urante commented on a change in pull request #11524:
URL: https://github.com/apache/kafka/pull/11524#discussion_r754528381



##########
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
##########
@@ -260,6 +259,10 @@ public void execute() {
         } catch (InterruptedException e) {
             // Ignore and allow to exit.
         } finally {
+            submittedRecords.awaitAllMessages(
+                    workerConfig.getLong(WorkerConfig.OFFSET_COMMIT_TIMEOUT_MS_CONFIG),

Review comment:
       Waiting for up to `offset.flush.timeout.ms` milliseconds here may cause 
shutdown to block for up to twice that long, since the subsequent call to 
`WorkerSourceTask::commitOffsets` may also block for up to `offset.flush.timeout.ms` 
milliseconds while flushing offset information to the persistent backing store.
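To make the worst case concrete, here is a minimal, self-contained Java model (not the actual `WorkerSourceTask` code) in which both phases are bounded independently by the same timeout. `SequentialShutdownModel` and `worstCaseShutdownMs` are hypothetical names, and the two `CountDownLatch`es are assumptions standing in for a record that is never acknowledged and an offset flush that never completes:

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

/**
 * Hypothetical model of the shutdown path: two blocking phases, each bounded
 * independently by the same flush timeout.
 */
final class SequentialShutdownModel {
    private SequentialShutdownModel() {}

    /** Runs both phases back to back and returns the elapsed wall-clock time in ms. */
    static long worstCaseShutdownMs(long flushTimeoutMs) {
        // Neither latch is ever counted down: an in-flight record that is never
        // acknowledged, and a backing-store flush that never completes.
        CountDownLatch inFlightRecordsAcked = new CountDownLatch(1);
        CountDownLatch offsetsFlushed = new CountDownLatch(1);
        long start = System.nanoTime();
        try {
            // Phase 1: await in-flight records for up to the full timeout.
            inFlightRecordsAcked.await(flushTimeoutMs, TimeUnit.MILLISECONDS);
            // Phase 2: flush offsets, again for up to the full timeout.
            offsetsFlushed.await(flushTimeoutMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
    }
}
```

Calling `worstCaseShutdownMs(50)` returns at least 100, i.e. twice the configured timeout.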
   
   I initially considered an approach where we would require the entire 
combination of (awaiting in-flight messages, computing committable offsets, 
committing offsets to backing store) to complete in `offset.flush.timeout.ms` 
milliseconds, but this came with the unfortunate drawback that, if any 
in-flight record were undeliverable within the flush timeout, it would prevent 
any offsets from being committed, which seems counter to the original 
motivation of the changes we made for 
[KAFKA-12226](https://issues.apache.org/jira/browse/KAFKA-12226) in 
https://github.com/apache/kafka/pull/11323.
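A rough sketch of that rejected single-budget design, again with hypothetical names and with latches assumed in place of the real producer callbacks and offset backing store. It shows why one stuck record is enough to skip the commit entirely:

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

/**
 * Hypothetical sketch of the rejected design: awaiting in-flight records and
 * committing offsets must both fit inside one shared deadline.
 */
final class SharedDeadlineCommit {
    private SharedDeadlineCommit() {}

    /** Returns true only if every record was acknowledged AND offsets were flushed in time. */
    static boolean commitWithinBudget(long flushTimeoutMs,
                                      CountDownLatch allRecordsAcked,
                                      CountDownLatch offsetsFlushed) {
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(flushTimeoutMs);
        try {
            if (!allRecordsAcked.await(remainingNanos(deadline), TimeUnit.NANOSECONDS)) {
                // A single undeliverable record exhausts the whole budget, so even
                // offsets for records that *were* acknowledged are never committed.
                return false;
            }
            // Computing committable offsets is elided; flush with whatever time is left.
            return offsetsFlushed.await(remainingNanos(deadline), TimeUnit.NANOSECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    private static long remainingNanos(long deadlineNanos) {
        return Math.max(0L, deadlineNanos - System.nanoTime());
    }
}
```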
   
   We could also consider blocking for at most half of 
`offset.flush.timeout.ms` here and then blocking for the remaining time in 
`commitOffsets` if we want to honor the docstring for `offset.flush.timeout.ms` 
as strictly as possible:
   > Maximum number of milliseconds to wait for records to flush and partition 
offset data to be committed to offset storage before cancelling the process and 
restoring the offset data to be committed in a future attempt.
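The half-and-half alternative could look roughly like the following. This is only a sketch under the same assumptions (a hypothetical `SplitBudgetShutdown` class, with latches standing in for record acknowledgements and the offset flush). Unlike the single-budget version, a timeout while awaiting records is not fatal, so offsets for acknowledged records can still be committed, and the total blocking time is bounded by roughly `offset.flush.timeout.ms` rather than double it:

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

/** Hypothetical sketch: split one flush-timeout budget across the two phases. */
final class SplitBudgetShutdown {
    private SplitBudgetShutdown() {}

    /**
     * Awaits in-flight records for at most half the budget, then flushes whatever
     * offsets are committable using the time that remains. Returns true if the
     * flush finished before the shared deadline.
     */
    static boolean shutdown(long flushTimeoutMs,
                            CountDownLatch allRecordsAcked,
                            CountDownLatch offsetsFlushed) {
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(flushTimeoutMs);
        try {
            // Phase 1: bounded to half the budget; the result is deliberately ignored
            // because a timeout here should not prevent committing acked offsets.
            allRecordsAcked.await(flushTimeoutMs / 2, TimeUnit.MILLISECONDS);
            // Phase 2: flush offsets for acknowledged records with the remaining time.
            long remainingNs = Math.max(0L, deadline - System.nanoTime());
            return offsetsFlushed.await(remainingNs, TimeUnit.NANOSECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```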
   
   But I'm not sure that's really necessary, since in this case these 
offsets have no chance of being committed in a future attempt (at least, not by 
the same task instance), and this offset commit runs on the task's work thread 
rather than the source task offset commit thread, so there's no risk of 
squatting on that thread and blocking other tasks from committing their offsets 
while it's in use.
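The threading argument can be illustrated with a small model that assumes, purely for illustration, a single commit thread shared by all tasks. The class and method names below are hypothetical and do not mirror Connect's actual offset committer. A slow commit submitted to the shared thread stalls another task's commit, while the same slow commit run on the task's own thread does not:

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/** Hypothetical model: shared commit thread vs. a final commit on the task's own thread. */
final class CommitThreadModel {
    private CommitThreadModel() {}

    /** Returns true if task B's commit completes while task A's final commit is still blocked. */
    static boolean otherTaskCanCommit(boolean finalCommitOnTaskThread) {
        ExecutorService sharedCommitThread = Executors.newSingleThreadExecutor();
        CountDownLatch backingStoreResponds = new CountDownLatch(1);
        Runnable slowFinalCommit = () -> {
            try {
                backingStoreResponds.await(); // stands in for a slow offset flush
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        try {
            if (finalCommitOnTaskThread) {
                new Thread(slowFinalCommit, "task-A-work-thread").start();
            } else {
                sharedCommitThread.submit(slowFinalCommit);
            }
            // Task B's periodic commit always goes through the shared thread.
            Future<?> otherTaskCommit = sharedCommitThread.submit(() -> { });
            otherTaskCommit.get(200, TimeUnit.MILLISECONDS);
            return true;
        } catch (TimeoutException e) {
            return false; // task B is stuck behind task A's blocking commit
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            backingStoreResponds.countDown(); // release task A so all threads can exit
            sharedCommitThread.shutdown();
        }
    }
}
```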




-- 
This is an automated message from the Apache Git Service.