C0urante commented on a change in pull request #11323: URL: https://github.com/apache/kafka/pull/11323#discussion_r741279732
##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
##########
@@ -474,9 +479,24 @@ public boolean commitOffsets() {
         long timeout = started + commitTimeoutMs;
         Map<Map<String, Object>, Map<String, Object>> offsetsToCommit;
+        SubmittedRecords.Pending pendingMetadataForCommit;
         synchronized (this) {
             offsetsToCommit = this.committableOffsets;
             this.committableOffsets = new HashMap<>();
+            pendingMetadataForCommit = this.pendingRecordsMetadata;
+            this.pendingRecordsMetadata = null;
+        }
+
+        if (pendingMetadataForCommit != null) {
+            log.info("There are currently {} pending messages spread across {} source partitions whose offsets will not be committed. " + "The source partition with the most pending messages is {}, with {} pending messages",
+                    pendingMetadataForCommit.totalPendingMessages(),
+                    pendingMetadataForCommit.numDeques(),
+                    pendingMetadataForCommit.largestDequePartition(),
+                    pendingMetadataForCommit.largestDequeSize()
+            );
+        } else {
+            log.info("There are currently no pending messages for this offset commit; all messages since the last commit have been acknowledged");

Review comment:
👍 SGTM. I've updated the PR accordingly.

One nit: the existing "flushing <n> outstanding messages for offset commit" message actually reports the number of unacknowledged messages in the current batch, not the number of acknowledged messages whose offsets will be committed. This has tripped up many of my colleagues: they see "flushing 0 outstanding messages" and conclude that their source connector isn't producing any data, when all it really means is that its producers are easily keeping up with the throughput of its tasks. I think both pieces of information (the number of acknowledged and unacknowledged messages) are useful here, so I've included both in the latest draft.

-- 
This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
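The distinction the review comment draws, between acknowledged messages whose offsets can be committed and unacknowledged ("pending") messages that hold back later offsets, can be sketched in plain Java. This is a minimal, hypothetical model, not Kafka's `SubmittedRecords` class: the names `PendingOffsetsSketch`, `submit`, `ack`, `committableOffsets`, `CommitSnapshot` and `describe`, as well as the log wording, are illustrative assumptions. It assumes records are tracked per source partition in submission order, and that an offset only becomes committable once every earlier record for the same partition has been acknowledged.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

// Hypothetical sketch (not Kafka's SubmittedRecords): one deque per source
// partition, in submission order, so offsets are only committed for an
// unbroken prefix of acknowledged records.
class PendingOffsetsSketch {

    // A record handed to the producer; 'acked' flips once its send completes.
    static final class Submitted {
        final Map<String, Object> offset;
        boolean acked = false;

        Submitted(Map<String, Object> offset) {
            this.offset = offset;
        }
    }

    // Result of one commit attempt (hypothetical shape).
    static final class CommitSnapshot {
        final Map<Map<String, Object>, Map<String, Object>> offsets = new HashMap<>();
        int ackedMessages;                     // acked records whose offsets will be committed
        int pendingMessages;                   // records left behind in the deques
        int partitionsWithPending;             // non-empty deques after this commit
        Map<String, Object> largestPartition;  // partition with the most pending records
        int largestPendingCount;
    }

    private final Map<Map<String, Object>, Deque<Submitted>> records = new HashMap<>();

    synchronized Submitted submit(Map<String, Object> partition, Map<String, Object> offset) {
        Submitted r = new Submitted(offset);
        records.computeIfAbsent(partition, p -> new ArrayDeque<>()).add(r);
        return r;
    }

    synchronized void ack(Submitted r) {
        r.acked = true;
    }

    // Drains the acked prefix of every deque and summarizes what remains.
    synchronized CommitSnapshot committableOffsets() {
        CommitSnapshot s = new CommitSnapshot();
        Iterator<Map.Entry<Map<String, Object>, Deque<Submitted>>> it = records.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<Map<String, Object>, Deque<Submitted>> e = it.next();
            Deque<Submitted> deque = e.getValue();
            // Only the head-of-line run of acked records is committable.
            while (!deque.isEmpty() && deque.peek().acked) {
                s.offsets.put(e.getKey(), deque.poll().offset);
                s.ackedMessages++;
            }
            if (deque.isEmpty()) {
                it.remove();
                continue;
            }
            s.pendingMessages += deque.size();
            s.partitionsWithPending++;
            if (deque.size() > s.largestPendingCount) {
                s.largestPendingCount = deque.size();
                s.largestPartition = e.getKey();
            }
        }
        return s;
    }

    // Builds one log line that reports both the acked and the pending counts.
    static String describe(CommitSnapshot s) {
        if (s.pendingMessages == 0) {
            return String.format("Committing offsets for %d acknowledged messages; no messages are pending",
                    s.ackedMessages);
        }
        return String.format("Committing offsets for %d acknowledged messages; %d pending messages across %d "
                        + "source partitions will not be committed yet (largest: %s with %d)",
                s.ackedMessages, s.pendingMessages, s.partitionsWithPending,
                s.largestPartition, s.largestPendingCount);
    }
}
```

Counting the drained prefix separately from what is left in the deques is what lets a single log line report both numbers. Note that a record acknowledged out of order (for example, the third record of a partition whose second record is still in flight) still counts as pending, because committing its offset would skip the unacknowledged record ahead of it.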