C0urante commented on a change in pull request #10112:
URL: https://github.com/apache/kafka/pull/10112#discussion_r582351074



##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
##########
@@ -475,11 +476,15 @@ public boolean commitOffsets() {
         synchronized (this) {
             // First we need to make sure we snapshot everything in exactly the current state. This
             // means both the current set of messages we're still waiting to finish, stored in this
-            // class, which setting flushing = true will handle by storing any new values into a new
+            // class, which setting recordFlushPending = true will handle by storing any new values into a new
             // buffer; and the current set of user-specified offsets, stored in the
             // OffsetStorageWriter, for which we can use beginFlush() to initiate the snapshot.
-            flushing = true;
-            boolean flushStarted = offsetWriter.beginFlush();
+            // No need to begin a new offset flush if we timed out waiting for records to be flushed to
+            // Kafka in a prior attempt.
+            if (!recordFlushPending) {

Review comment:
   1. I think it's a necessary evil, since source task offset commits are 
[conducted on a single 
thread](https://github.com/apache/kafka/blob/3f09fb97b6943c0612488dfa8e5eab8078fd7ca0/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SourceTaskOffsetCommitter.java#L64). 
Without a timeout for offset commits, a single task could block indefinitely 
and disable offset commits for all other tasks on the cluster.
   2. This is definitely possible; I think the only saving grace here is that 
the combined size of the `outstandingMessages` and 
`outstandingMessagesBacklog` fields is going to be naturally throttled by the 
producer's buffer. If too many records accumulate, the call to 
`Producer::send` will block synchronously until space is freed up, at which 
point the worker can continue polling the task for new records. This isn't 
ideal, as it will essentially cause the producer's entire buffer to be occupied 
until the throughput of record production from the task decreases and/or the 
write throughput of the producer rises to meet it, but it at least establishes 
an upper bound for how large a single batch of records in the 
`outstandingMessages` field ever gets. It may take several offset commit 
attempts for all of the records in that batch to be ack'd, with all but the 
last (successful) attempt timing out and failing, but forward progress with 
offset commits should still be possible.
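
To make the throttling argument in point 2 concrete, here is a minimal sketch in plain Java. It does not use the Kafka client: `BoundedSendBuffer` is a hypothetical stand-in for the producer, and it counts records rather than bytes, which is a simplification. The real producer's buffer is governed by its `buffer.memory` and `max.block.ms` settings. The only point of the sketch is that a blocking `send` caps how many records can be outstanding at once.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

/** Hypothetical stand-in for a producer with a bounded send buffer (not Kafka code). */
class BoundedSendBuffer {
    private final int capacity;
    private final Semaphore freeSlots;

    BoundedSendBuffer(int capacity) {
        this.capacity = capacity;
        this.freeSlots = new Semaphore(capacity);
    }

    /** Waits up to maxBlockMs for buffer space; returns false if none was freed in time. */
    boolean send(long maxBlockMs) throws InterruptedException {
        return freeSlots.tryAcquire(maxBlockMs, TimeUnit.MILLISECONDS);
    }

    /** Called when a record is acknowledged; frees one slot in the buffer. */
    void ack() {
        freeSlots.release();
    }

    /** Number of records sent but not yet acknowledged; never exceeds capacity. */
    int outstanding() {
        return capacity - freeSlots.availablePermits();
    }

    public static void main(String[] args) throws InterruptedException {
        BoundedSendBuffer buffer = new BoundedSendBuffer(3);
        for (int i = 0; i < 3; i++) {
            buffer.send(10);
        }
        // The buffer is full, so this send gives up after the timeout.
        System.out.println("fourth send accepted: " + buffer.send(10));
        buffer.ack();
        // One slot was freed by the ack, so the task can make progress again.
        System.out.println("send after ack accepted: " + buffer.send(10));
    }
}
```

In this model the task loop stalls in `send` rather than growing an unbounded backlog, which is the upper bound described above.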
   
   I share your feelings about the complexity here. I think ultimately it 
arises from two constraints:
   1. A worker-global producer is used to write source offsets to the internal 
offsets topic right now. Although this doesn't necessarily require the 
single-threaded logic for offset commits mentioned above, things become simpler 
with it.
   2. (Please correct me if I'm wrong on this point; my core knowledge is a 
little fuzzy and maybe there are stronger guarantees than I'm aware of) 
Out-of-order acknowledgment of records makes tracking the latest offset for a 
given source partition a little less trivial than it seems initially. For 
example, if a task produces two records with the same source partition that end 
up being delivered to different topic-partitions, the second record may be 
ack'd before the first, and when it comes time for offset commit, the framework 
would have to refrain from committing offsets for that second record until the 
first is also ack'd.
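
One way to picture the bookkeeping that constraint 2 implies is sketched below. This is not the framework's current code; `AckedPrefixTracker`, `Pending`, and the method names are hypothetical, source partitions are reduced to strings and offsets to longs, and the sketch is not thread-safe. It keeps submitted records per source partition in submission order and only reports an offset once every earlier record for that partition has been ack'd.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

/** Hypothetical sketch: tracks the latest committable offset per source partition. */
class AckedPrefixTracker {
    /** One in-flight record: its source offset and whether the producer has ack'd it. */
    static final class Pending {
        final long sourceOffset;
        boolean acked;

        Pending(long sourceOffset) {
            this.sourceOffset = sourceOffset;
        }
    }

    // In-flight records per source partition, in the order the task produced them.
    private final Map<String, Deque<Pending>> inFlight = new HashMap<>();

    /** Registers a record before it is handed to the producer. */
    Pending submit(String sourcePartition, long sourceOffset) {
        Pending pending = new Pending(sourceOffset);
        inFlight.computeIfAbsent(sourcePartition, k -> new ArrayDeque<>()).addLast(pending);
        return pending;
    }

    /** Marks a record as acknowledged; acks may arrive in any order. */
    void ack(Pending pending) {
        pending.acked = true;
    }

    /** Removes the leading run of ack'd records and returns the last offset in it, if any. */
    Optional<Long> committableOffset(String sourcePartition) {
        Deque<Pending> queue = inFlight.get(sourcePartition);
        Long latest = null;
        while (queue != null && !queue.isEmpty() && queue.peekFirst().acked) {
            latest = queue.pollFirst().sourceOffset;
        }
        return Optional.ofNullable(latest);
    }

    public static void main(String[] args) {
        AckedPrefixTracker tracker = new AckedPrefixTracker();
        Pending first = tracker.submit("file-a", 1L);
        Pending second = tracker.submit("file-a", 2L);

        tracker.ack(second); // the second record is ack'd before the first
        System.out.println(tracker.committableOffset("file-a")); // nothing committable yet

        tracker.ack(first);
        System.out.println(tracker.committableOffset("file-a")); // offset 2 is now safe
    }
}
```

The second record's offset is held back until the first is ack'd, which is the behavior described in the example above.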
   
   I don't think either of these points makes it impossible to add even 
more fine-grained offset commit behavior and/or remove offset commit timeouts, 
but the work involved would be a fair amount heavier than this relatively minor 
patch. If you'd prefer to see something along those lines, could we consider 
merging this patch for the moment and performing a more serious overhaul of the 
source task offset commit logic as a follow-up, possibly with a small design 
discussion on a Jira ticket to make sure there's alignment on the new behavior?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

