C0urante commented on a change in pull request #11524:
URL: https://github.com/apache/kafka/pull/11524#discussion_r754528381

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
##########
@@ -260,6 +259,10 @@ public void execute() {
         } catch (InterruptedException e) {
             // Ignore and allow to exit.
         } finally {
+            submittedRecords.awaitAllMessages(
+                    workerConfig.getLong(WorkerConfig.OFFSET_COMMIT_TIMEOUT_MS_CONFIG),

Review comment:

Waiting for up to `offset.flush.timeout.ms` milliseconds here may cause shutdown to block for double that time (since the subsequent call to `WorkerSourceTask::commitOffsets` may also block for `offset.flush.timeout.ms` milliseconds while flushing offset information to the persistent backing store).

I initially considered an approach where we would require the entire combination of (awaiting in-flight messages, computing committable offsets, committing offsets to backing store) to complete in `offset.flush.timeout.ms` milliseconds, but this came with the unfortunate drawback that, if any in-flight record were undeliverable within the flush timeout, it would prevent any offsets from being committed, which seems counter to the original motivation of the changes we made for [KAFKA-12226](https://issues.apache.org/jira/browse/KAFKA-12226) in https://github.com/apache/kafka/pull/11323.

We could also consider blocking for at most half of `offset.flush.timeout.ms` here and then blocking for the remaining time in `commitOffsets` if we want to honor the docstring for `offset.flush.timeout.ms` as strictly as possible:

> Maximum number of milliseconds to wait for records to flush and partition offset data to be committed to offset storage before cancelling the process and restoring the offset data to be committed in a future attempt.
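
As a rough sketch of the half-and-half idea above, under stated assumptions: every name here (`SplitFlushTimeout`, `remainingMs`, `awaitThenBudgetForCommit`) is hypothetical and not part of Kafka, and a plain `CountDownLatch` stands in for the in-flight records. It only illustrates how a single deadline could bound both the wait and the later commit.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical illustration only; none of these names exist in Kafka Connect.
class SplitFlushTimeout {

    // Time left before the deadline, clamped so it is never negative.
    static long remainingMs(long deadlineMs, long nowMs) {
        return Math.max(0L, deadlineMs - nowMs);
    }

    // Waits for in-flight records for at most half of the total budget, then
    // returns how many milliseconds remain for the subsequent offset commit.
    static long awaitThenBudgetForCommit(CountDownLatch inFlight, long flushTimeoutMs) {
        long deadlineMs = System.currentTimeMillis() + flushTimeoutMs;
        try {
            // The latch is an assumed stand-in for "all in-flight records acked".
            inFlight.await(flushTimeoutMs / 2, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            // Preserve the interrupt status and fall through to the commit budget.
            Thread.currentThread().interrupt();
        }
        return remainingMs(deadlineMs, System.currentTimeMillis());
    }

    public static void main(String[] args) {
        // One record never gets acked, so the wait uses up its half of the budget.
        CountDownLatch stuck = new CountDownLatch(1);
        long commitBudgetMs = awaitThenBudgetForCommit(stuck, 200L);
        System.out.println("Budget left for the offset commit: " + commitBudgetMs + " ms");
    }
}
```

With this shape, an undeliverable record can consume at most half of the budget, so the commit still gets a chance to run and the total wait stays within one flush timeout.
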

But I don't know if that's really necessary since in this case, these offsets have no chance of being committed in a future attempt (at least, not by the same task instance), and the offset commit is taking place on the task's work thread instead of the source task offset commit thread, so there's no worry about squatting on that thread and blocking other tasks from being able to commit offsets while it's in use.

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org