C0urante commented on a change in pull request #11323: URL: https://github.com/apache/kafka/pull/11323#discussion_r726483477
########## File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java ########## @@ -495,56 +471,25 @@ public boolean commitOffsets() { long started = time.milliseconds(); long timeout = started + commitTimeoutMs; + Map<Map<String, Object>, Map<String, Object>> offsetsToCommit; synchronized (this) { - // First we need to make sure we snapshot everything in exactly the current state. This - // means both the current set of messages we're still waiting to finish, stored in this - // class, which setting flushing = true will handle by storing any new values into a new - // buffer; and the current set of user-specified offsets, stored in the - // OffsetStorageWriter, for which we can use beginFlush() to initiate the snapshot. - flushing = true; - boolean flushStarted = offsetWriter.beginFlush(); - // Still wait for any producer records to flush, even if there aren't any offsets to write - // to persistent storage - - // Next we need to wait for all outstanding messages to finish sending - log.info("{} flushing {} outstanding messages for offset commit", this, outstandingMessages.size()); - while (!outstandingMessages.isEmpty()) { - try { - long timeoutMs = timeout - time.milliseconds(); - // If the task has been cancelled, no more records will be sent from the producer; in that case, if any outstanding messages remain, - // we can stop flushing immediately - if (isCancelled() || timeoutMs <= 0) { - log.error("{} Failed to flush, timed out while waiting for producer to flush outstanding {} messages", this, outstandingMessages.size()); - finishFailedFlush(); - recordCommitFailure(time.milliseconds() - started, null); - return false; - } - this.wait(timeoutMs); - } catch (InterruptedException e) { - // We can get interrupted if we take too long committing when the work thread shutdown is requested, - // requiring a forcible shutdown. 
Give up since we can't safely commit any offsets, but also need
- // to stop immediately
- log.error("{} Interrupted while flushing messages, offsets will not be committed", this);
- finishFailedFlush();
- recordCommitFailure(time.milliseconds() - started, null);
- return false;
- }
- }
+ offsetsToCommit = this.committableOffsets;
+ this.committableOffsets = new HashMap<>();
+ }
- if (!flushStarted) {
- // There was nothing in the offsets to process, but we still waited for the data in the
- // buffer to flush. This is useful since this can feed into metrics to monitor, e.g.
- // flush time, which can be used for monitoring even if the connector doesn't record any
- // offsets.
- finishSuccessfulFlush();
- long durationMillis = time.milliseconds() - started;
- recordCommitSuccess(durationMillis);
- log.debug("{} Finished offset commitOffsets successfully in {} ms",
- this, durationMillis);
-
- commitSourceTask();
- return true;
- }
+ offsetsToCommit.forEach(offsetWriter::offset);
+ if (!offsetWriter.beginFlush()) {
+ // There was nothing in the offsets to process, but we still waited for the data in the
+ // buffer to flush. This is useful since this can feed into metrics to monitor, e.g.

Review comment:
This is an [existing comment](https://github.com/apache/kafka/blob/34d56dc8d00bd27955eb9bb6ac01d5ae7f134dbd/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java#L535-L538) that was just moved around in this PR. I can update it to remove language that refers to flushing a buffer.

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org
For queries about this service, please contact Infrastructure at: us...@infra.apache.org