C0urante commented on a change in pull request #11323:
URL: https://github.com/apache/kafka/pull/11323#discussion_r726483477



##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
##########
@@ -495,56 +471,25 @@ public boolean commitOffsets() {
         long started = time.milliseconds();
         long timeout = started + commitTimeoutMs;
 
+        Map<Map<String, Object>, Map<String, Object>> offsetsToCommit;
         synchronized (this) {
-            // First we need to make sure we snapshot everything in exactly the current state. This
-            // means both the current set of messages we're still waiting to finish, stored in this
-            // class, which setting flushing = true will handle by storing any new values into a new
-            // buffer; and the current set of user-specified offsets, stored in the
-            // OffsetStorageWriter, for which we can use beginFlush() to initiate the snapshot.
-            flushing = true;
-            boolean flushStarted = offsetWriter.beginFlush();
-            // Still wait for any producer records to flush, even if there aren't any offsets to write
-            // to persistent storage
-
-            // Next we need to wait for all outstanding messages to finish sending
-            log.info("{} flushing {} outstanding messages for offset commit", this, outstandingMessages.size());
-            while (!outstandingMessages.isEmpty()) {
-                try {
-                    long timeoutMs = timeout - time.milliseconds();
-                    // If the task has been cancelled, no more records will be sent from the producer; in that case, if any outstanding messages remain,
-                    // we can stop flushing immediately
-                    if (isCancelled() || timeoutMs <= 0) {
-                        log.error("{} Failed to flush, timed out while waiting for producer to flush outstanding {} messages", this, outstandingMessages.size());
-                        finishFailedFlush();
-                        recordCommitFailure(time.milliseconds() - started, null);
-                        return false;
-                    }
-                    this.wait(timeoutMs);
-                } catch (InterruptedException e) {
-                    // We can get interrupted if we take too long committing when the work thread shutdown is requested,
-                    // requiring a forcible shutdown. Give up since we can't safely commit any offsets, but also need
-                    // to stop immediately
-                    log.error("{} Interrupted while flushing messages, offsets will not be committed", this);
-                    finishFailedFlush();
-                    recordCommitFailure(time.milliseconds() - started, null);
-                    return false;
-                }
-            }
+            offsetsToCommit = this.committableOffsets;
+            this.committableOffsets = new HashMap<>();
+        }
 
-            if (!flushStarted) {
-                // There was nothing in the offsets to process, but we still waited for the data in the
-                // buffer to flush. This is useful since this can feed into metrics to monitor, e.g.
-                // flush time, which can be used for monitoring even if the connector doesn't record any
-                // offsets.
-                finishSuccessfulFlush();
-                long durationMillis = time.milliseconds() - started;
-                recordCommitSuccess(durationMillis);
-                log.debug("{} Finished offset commitOffsets successfully in {} ms",
-                        this, durationMillis);
-
-                commitSourceTask();
-                return true;
-            }
+        offsetsToCommit.forEach(offsetWriter::offset);
+        if (!offsetWriter.beginFlush()) {
+            // There was nothing in the offsets to process, but we still waited for the data in the
+            // buffer to flush. This is useful since this can feed into metrics to monitor, e.g.

Review comment:
       This is an [existing comment](https://github.com/apache/kafka/blob/34d56dc8d00bd27955eb9bb6ac01d5ae7f134dbd/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java#L535-L538)
       that was just moved around in this PR. I can update it to remove language that
       refers to flushing a buffer.
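
For context, the swap-under-lock snapshot that the hunk above introduces can be sketched in isolation. This is a minimal illustration under stated assumptions, not the PR's code: `OffsetSnapshotSketch` and `recordCommittable` are hypothetical names, and only the `committableOffsets` / `offsetsToCommit` swap mirrors the diff. It suggests why wording about waiting for a buffer to flush no longer seems to fit: the snapshot returns immediately.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for the task; NOT the real WorkerSourceTask.
class OffsetSnapshotSketch {
    // Offsets that are ready to commit, keyed by source partition.
    private Map<Map<String, Object>, Map<String, Object>> committableOffsets = new HashMap<>();

    // Assumed entry point for this sketch: called once a record's offset is safe to commit.
    synchronized void recordCommittable(Map<String, Object> partition, Map<String, Object> offset) {
        committableOffsets.put(partition, offset);
    }

    // Swap the map under the lock and hand back the snapshot.
    // Nothing here blocks on in-flight records or on a buffer.
    Map<Map<String, Object>, Map<String, Object>> snapshotOffsets() {
        Map<Map<String, Object>, Map<String, Object>> offsetsToCommit;
        synchronized (this) {
            offsetsToCommit = this.committableOffsets;
            this.committableOffsets = new HashMap<>();
        }
        return offsetsToCommit;
    }

    public static void main(String[] args) {
        OffsetSnapshotSketch task = new OffsetSnapshotSketch();
        Map<String, Object> partition = new HashMap<>();
        partition.put("file", "a.txt");
        Map<String, Object> offset = new HashMap<>();
        offset.put("position", 42L);
        task.recordCommittable(partition, offset);

        System.out.println(task.snapshotOffsets().size()); // prints 1
        System.out.println(task.snapshotOffsets().size()); // prints 0, the map was swapped out
    }
}
```

Because each call replaces the shared map, a second snapshot taken before any new offsets arrive is empty, which corresponds to the "nothing in the offsets to process" branch in the hunk.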




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

