C0urante commented on a change in pull request #11323:
URL: https://github.com/apache/kafka/pull/11323#discussion_r741279732



##########
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
##########
@@ -474,9 +479,24 @@ public boolean commitOffsets() {
         long timeout = started + commitTimeoutMs;
 
         Map<Map<String, Object>, Map<String, Object>> offsetsToCommit;
+        SubmittedRecords.Pending pendingMetadataForCommit;
         synchronized (this) {
             offsetsToCommit = this.committableOffsets;
             this.committableOffsets = new HashMap<>();
+            pendingMetadataForCommit = this.pendingRecordsMetadata;
+            this.pendingRecordsMetadata = null;
+        }
+
+        if (pendingMetadataForCommit != null) {
+            log.info("There are currently {} pending messages spread across {} 
source partitions whose offsets will not be committed. "
+                            + "The source partition with the most pending 
messages is {}, with {} pending messages",
+                    pendingMetadataForCommit.totalPendingMessages(),
+                    pendingMetadataForCommit.numDeques(),
+                    pendingMetadataForCommit.largestDequePartition(),
+                    pendingMetadataForCommit.largestDequeSize()
+            );
+        } else {
+            log.info("There are currently no pending messages for this offset 
commit; all messages since the last commit have been acknowledged");

Review comment:
       👍  &nbsp; SGTM. I've updated the PR accordingly.
   
   One nit: the "flushing <n> outstanding messages for offset commit" message 
actually refers to the number of unacked messages in the current batch; this 
has tripped up many of my colleagues who see "flushing 0 outstanding messages" 
and think their source connector isn't producing any data when all it really 
means is that its producers are keeping up with the throughput of its tasks 
very well.
   
   I think both pieces of information (number of acked and unacked messages) 
are useful here so I've included both in the latest draft.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to