rhauch commented on a change in pull request #11323:
URL: https://github.com/apache/kafka/pull/11323#discussion_r740478689



##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
##########
@@ -474,9 +479,24 @@ public boolean commitOffsets() {
         long timeout = started + commitTimeoutMs;
 
         Map<Map<String, Object>, Map<String, Object>> offsetsToCommit;
+        SubmittedRecords.Pending pendingMetadataForCommit;
         synchronized (this) {
             offsetsToCommit = this.committableOffsets;
             this.committableOffsets = new HashMap<>();
+            pendingMetadataForCommit = this.pendingRecordsMetadata;
+            this.pendingRecordsMetadata = null;
+        }
+
+        if (pendingMetadataForCommit != null) {
+            log.info("There are currently {} pending messages spread across {} source partitions whose offsets will not be committed. "
+                            + "The source partition with the most pending messages is {}, with {} pending messages",
+                    pendingMetadataForCommit.totalPendingMessages(),
+                    pendingMetadataForCommit.numDeques(),
+                    pendingMetadataForCommit.largestDequePartition(),
+                    pendingMetadataForCommit.largestDequeSize()
+            );
+        } else {
+            log.info("There are currently no pending messages for this offset commit; all messages since the last commit have been acknowledged");

Review comment:
       As you point out, the old log message was:
   ```
    log.info("{} flushing {} outstanding messages for offset commit", this, outstandingMessages.size());
   ```
   This log message had two things it'd be nice to keep:
   1. `this` as the context; and
   2. the number of records whose offsets were being committed (i.e., the 
number of acked records).
   
   I think both would be good to include, especially if we're saying the number 
of records whose offsets are _not_ being committed (yet).
   
   The `Pending` class seems pretty useful, but it does not make it possible 
to compute the number of acked records here. WDYT about merging the 
`SubmittedRecords.committableOffsets()` and `pending()` methods, by having the 
former return an object that contains the offset map _and_ the metadata that 
can be used for logging? This class would be like `Pending`, though maybe 
`CommittableOffsets` is a more apt name. Plus, `WorkerSourceTask` would then 
have only one volatile field, which is updated atomically.
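
A rough sketch of what such a merged return type might look like is below. Everything in it is hypothetical: the field names, the `EMPTY` constant, and the `describe` helper are assumptions made for illustration, not code from this PR or from Kafka.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch only: an immutable snapshot that carries both the
// offsets to commit and the counters needed for the log message.
final class CommittableOffsets {
    // Assumed convenience constant for "nothing to commit, nothing pending".
    static final CommittableOffsets EMPTY =
            new CommittableOffsets(Collections.emptyMap(), 0, 0, 0, 0, null);

    private final Map<Map<String, Object>, Map<String, Object>> offsets;
    private final int numCommittableMessages;   // acked records in this snapshot
    private final int numUncommittableMessages; // records still pending
    private final int numDeques;                // source partitions with pending records
    private final int largestDequeSize;
    private final Map<String, Object> largestDequePartition; // may be null

    CommittableOffsets(Map<Map<String, Object>, Map<String, Object>> offsets,
                       int numCommittableMessages,
                       int numUncommittableMessages,
                       int numDeques,
                       int largestDequeSize,
                       Map<String, Object> largestDequePartition) {
        // Defensive copy so the snapshot cannot change after it is handed out.
        this.offsets = Collections.unmodifiableMap(new HashMap<>(offsets));
        this.numCommittableMessages = numCommittableMessages;
        this.numUncommittableMessages = numUncommittableMessages;
        this.numDeques = numDeques;
        this.largestDequeSize = largestDequeSize;
        this.largestDequePartition = largestDequePartition;
    }

    Map<Map<String, Object>, Map<String, Object>> offsets() { return offsets; }
    int numCommittableMessages() { return numCommittableMessages; }
    int numUncommittableMessages() { return numUncommittableMessages; }
    int numDeques() { return numDeques; }
    int largestDequeSize() { return largestDequeSize; }
    Map<String, Object> largestDequePartition() { return largestDequePartition; }

    boolean hasPending() { return numUncommittableMessages > 0; }

    // Builds one message that keeps the task as context and reports both
    // the acked count and the pending count.
    String describe(Object context) {
        if (!hasPending()) {
            return String.format(
                    "%s committing offsets for %d acknowledged messages; no messages are pending",
                    context, numCommittableMessages);
        }
        return String.format(
                "%s committing offsets for %d acknowledged messages; %d pending messages across "
                        + "%d source partitions will not be committed yet (largest: %s with %d)",
                context, numCommittableMessages, numUncommittableMessages,
                numDeques, largestDequePartition, largestDequeSize);
    }
}
```

With something along these lines, `commitOffsets()` would only need to swap a single field for `CommittableOffsets.EMPTY` inside the `synchronized` block and could then log `describe(this)` on the snapshot it took.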
   




