C0urante commented on a change in pull request #10112:
URL: https://github.com/apache/kafka/pull/10112#discussion_r583154155
##########
File path:
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
##########
@@ -475,11 +476,15 @@ public boolean commitOffsets() {
synchronized (this) {
// First we need to make sure we snapshot everything in exactly the current state. This
// means both the current set of messages we're still waiting to finish, stored in this
- // class, which setting flushing = true will handle by storing any new values into a new
+ // class, which setting recordFlushPending = true will handle by storing any new values into a new
// buffer; and the current set of user-specified offsets, stored in the
// OffsetStorageWriter, for which we can use beginFlush() to initiate the snapshot.
- flushing = true;
- boolean flushStarted = offsetWriter.beginFlush();
+ // No need to begin a new offset flush if we timed out waiting for records to be flushed to
+ // Kafka in a prior attempt.
+ if (!recordFlushPending) {
Review comment:
> We end up blocking the event thread anyway because of the need to do it under the lock.
I think we actually keep polling the task for records during the offset
commit, which is the entire reason we have the `outstandingMessagesBacklog`
field. Without it, we'd just add everything to `outstandingMessages`, knowing
that if we've made it to the point of adding a record to that collection,
we're not in the process of committing offsets, right?
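To make the role of the backlog concrete, here is a minimal sketch. It is a hypothetical simplification: only the `outstandingMessages` and `outstandingMessagesBacklog` names come from the discussion above; the class and method names (`OutstandingTracker`, `recordSent`, `recordAcked`, `beginFlush`, `finishFlush`) are invented for illustration and are not the actual `WorkerSourceTask` code. While a flush is in progress, newly dispatched records go into the backlog instead of the set the commit is waiting on, and the backlog is folded back in once the flush ends.

```java
import java.util.ArrayList;
import java.util.List;

public class BacklogSketch {
    // Hypothetical simplification of the bookkeeping described above; method names are
    // illustrative and do not mirror WorkerSourceTask. Records are plain Strings.
    static class OutstandingTracker {
        private final List<String> outstandingMessages = new ArrayList<>();
        private final List<String> outstandingMessagesBacklog = new ArrayList<>();
        private boolean flushing = false;

        // Task thread: called for every record handed to the producer.
        synchronized void recordSent(String record) {
            if (flushing) {
                // A commit is waiting on the current outstandingMessages snapshot, so park
                // new records in the backlog instead of growing the set being waited on.
                outstandingMessagesBacklog.add(record);
            } else {
                outstandingMessages.add(record);
            }
        }

        // Producer callback: called once a record has been acked by Kafka.
        synchronized void recordAcked(String record) {
            if (!outstandingMessages.remove(record)) {
                outstandingMessagesBacklog.remove(record);
            }
            notifyAll(); // wake a committer that may be waiting for the snapshot to drain
        }

        // Commit path: freeze the current set of outstanding records.
        synchronized void beginFlush() {
            flushing = true;
        }

        // Commit path: fold the backlog back in once the flush has finished.
        synchronized void finishFlush() {
            flushing = false;
            outstandingMessages.addAll(outstandingMessagesBacklog);
            outstandingMessagesBacklog.clear();
        }

        synchronized int outstanding() { return outstandingMessages.size(); }

        synchronized int backlog() { return outstandingMessagesBacklog.size(); }
    }

    public static void main(String[] args) {
        OutstandingTracker tracker = new OutstandingTracker();
        tracker.recordSent("r1");
        tracker.beginFlush();
        tracker.recordSent("r2"); // polled while the commit is in progress
        System.out.println(tracker.outstanding() + " outstanding, " + tracker.backlog() + " backlogged");
        tracker.recordAcked("r1");
        tracker.finishFlush();
        System.out.println(tracker.outstanding() + " outstanding, " + tracker.backlog() + " backlogged");
    }
}
```

If records were never polled during a commit, the `flushing` branch in `recordSent` would be dead code, which is the point being made about the backlog's reason for existing.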
Concretely, we can see that the offset thread [relinquishes the lock on the
`WorkerSourceTask` instance while waiting for outstanding messages to be
ack'd](https://github.com/C0urante/kafka/blob/03c5a83a8277fa7c4ec503c3e044ae61cff06eea/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java).
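The mechanism that lets polling continue is that `Object.wait(long)` releases the monitor of the object it is called on. Below is a hypothetical, stripped-down sketch (the class `FlushWaitSketch` and its methods are invented for illustration, not taken from the linked code): the committer waits on the same monitor that the task thread's `poll()` and the producer callback's `ack()` need, and because `wait` releases it, both can run while the commit is still pending. The wait is bounded, so an unacked record makes the attempt time out rather than hang.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;

public class FlushWaitSketch {
    private final List<String> outstanding = new ArrayList<>();
    private int pollsDuringFlush = 0;

    synchronized void add(String record) { outstanding.add(record); }

    // Producer callback: remove the acked record and wake any waiting committer.
    synchronized void ack(String record) {
        outstanding.remove(record);
        notifyAll();
    }

    // Task thread: needs the same monitor, so it can only run while the committer
    // has released it inside wait().
    synchronized void poll() { pollsDuringFlush++; }

    synchronized int pollCount() { return pollsDuringFlush; }

    // Commit thread: wait for all outstanding records to be acked, giving up after timeoutMs.
    synchronized boolean awaitFlush(long timeoutMs) {
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMs);
        while (!outstanding.isEmpty()) {
            long remainingNanos = deadline - System.nanoTime();
            if (remainingNanos <= 0) {
                return false; // timed out; the caller can retry on a later commit attempt
            }
            try {
                // wait() releases this object's monitor until notified or the timeout elapses;
                // use at least 1 ms because wait(0) would block indefinitely.
                wait(Math.max(1, TimeUnit.NANOSECONDS.toMillis(remainingNanos)));
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) throws InterruptedException {
        FlushWaitSketch task = new FlushWaitSketch();
        task.add("r1");
        Thread other = new Thread(() -> {
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                return;
            }
            task.poll();    // would be stuck until the flush attempt ended if the lock were held
            task.ack("r1");
        });
        other.start();
        boolean flushed = task.awaitFlush(5_000);
        other.join();
        System.out.println("flushed=" + flushed + ", polls during flush=" + task.pollCount());
    }
}
```

The `while` loop also guards against spurious wakeups, which `Object.wait` is documented to allow.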
I'm not sure we _need_ to perform offset commits on a separate thread, but
it is in line with what we do for sink tasks, where we [leverage the
`Consumer::commitAsync`
method](https://github.com/apache/kafka/blob/e2a0d0c90e1916d77223a420e3595e8aba643001/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L365).
If we want to consider making offset commit synchronous (which is likely
to happen anyway once transactional writes for exactly-once source support are
introduced), that might also be worth a follow-up. The biggest problem I can
think of with that approach is that a single offline topic-partition
would block the entire task thread whenever it came time to commit offsets. If
we keep the timeout for offset commit, that would limit the fallout and let
us resume polling new records from the task and dispatching them to the
producer once the commit attempt times out. However, there would still be a
non-negligible throughput hit (especially for workers configured with higher
offset commit timeouts).
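A rough sketch of the trade-off described above, assuming a synchronous commit is modeled as the task thread blocking on a future that completes once every outstanding record is acked. `SyncCommitSketch` and `commitSynchronously` are hypothetical names, not Connect APIs. The bounded `get` caps how long the task thread stalls, but a stuck partition still costs up to the full timeout of polling time on every attempt, which is where the throughput hit comes from.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class SyncCommitSketch {
    // Hypothetical: allRecordsAcked completes when every record covered by this commit is acked.
    static boolean commitSynchronously(CompletableFuture<Void> allRecordsAcked, long timeoutMs) {
        try {
            allRecordsAcked.get(timeoutMs, TimeUnit.MILLISECONDS); // the task thread blocks here
            return true;
        } catch (TimeoutException | ExecutionException e) {
            // e.g. an offline partition never acks; give up on this attempt and resume polling
            return false;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        CompletableFuture<Void> stuck = new CompletableFuture<>(); // never completes
        long start = System.nanoTime();
        boolean committed = commitSynchronously(stuck, 200);
        long blockedMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
        // No records are polled or dispatched during blockedMs.
        System.out.println("committed=" + committed + ", task thread blocked ~" + blockedMs + " ms");
    }
}
```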
----------------------------------------------------------------
This is an automated message from the Apache Git Service.