C0urante commented on a change in pull request #10112:
URL: https://github.com/apache/kafka/pull/10112#discussion_r589603947



##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
##########
@@ -475,11 +476,15 @@ public boolean commitOffsets() {
         synchronized (this) {
             // First we need to make sure we snapshot everything in exactly the current state. This
             // means both the current set of messages we're still waiting to finish, stored in this
-            // class, which setting flushing = true will handle by storing any new values into a new
+            // class, which setting recordFlushPending = true will handle by storing any new values into a new
             // buffer; and the current set of user-specified offsets, stored in the
             // OffsetStorageWriter, for which we can use beginFlush() to initiate the snapshot.
-            flushing = true;
-            boolean flushStarted = offsetWriter.beginFlush();
+            // No need to begin a new offset flush if we timed out waiting for records to be flushed to
+            // Kafka in a prior attempt.
+            if (!recordFlushPending) {

Review comment:
       I've been thinking this over for a few days, and I believe we can make 
task offset commits independent of each other by changing the source task 
offset commit scheduler to use a multi-threaded executor instead of a single 
global single-threaded executor shared by all tasks. This isn't quite what 
you're proposing, since tasks still wouldn't be responsible for waiting for 
flush completion (the offset scheduler's threads would be), but it's a smaller 
change and, as far as I can tell, the only real downside is that a few extra 
threads get created.
   
   The usage of 
[`scheduleWithFixedDelay`](https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/ScheduledExecutorService.html#scheduleWithFixedDelay-java.lang.Runnable-long-long-java.util.concurrent.TimeUnit-)
 already ensures that two offset commits for the same task won't be active at 
the same time, as it "Creates and executes a periodic action that becomes 
enabled first after the given initial delay, and subsequently with the given 
delay between the termination of one execution and the commencement of the 
next."
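   
   To illustrate, here's a minimal sketch of that guarantee. Everything in it 
(`FixedDelayCommitSketch`, `FakeTaskCommit`, the pool size, and the timings) is 
hypothetical and only stands in for the real committer; it is not the actual 
Connect code. Two tasks share a two-thread scheduler, the slow one never 
overlaps with itself, and the fast one keeps committing while the slow one is 
blocked.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class FixedDelayCommitSketch {
    // Hypothetical stand-in for one task's offset commit; not Connect's actual code.
    static final class FakeTaskCommit implements Runnable {
        final AtomicInteger active = new AtomicInteger();    // commits running right now
        final AtomicInteger maxActive = new AtomicInteger(); // highest concurrency observed
        final AtomicInteger runs = new AtomicInteger();      // commits started and finished
        final long commitMillis;

        FakeTaskCommit(long commitMillis) {
            this.commitMillis = commitMillis;
        }

        @Override
        public void run() {
            int now = active.incrementAndGet();
            maxActive.accumulateAndGet(now, Math::max);
            try {
                Thread.sleep(commitMillis); // simulate waiting on a flush
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                active.decrementAndGet();
                runs.incrementAndGet();
            }
        }
    }

    // Schedules a slow and a fast task on one shared multi-threaded scheduler.
    static FakeTaskCommit[] runDemo() {
        ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(2);
        FakeTaskCommit slow = new FakeTaskCommit(200);
        FakeTaskCommit fast = new FakeTaskCommit(5);
        scheduler.scheduleWithFixedDelay(slow, 0, 10, TimeUnit.MILLISECONDS);
        scheduler.scheduleWithFixedDelay(fast, 0, 10, TimeUnit.MILLISECONDS);
        try {
            Thread.sleep(1000);
            scheduler.shutdownNow();
            scheduler.awaitTermination(1, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            scheduler.shutdownNow();
            Thread.currentThread().interrupt();
        }
        return new FakeTaskCommit[] {slow, fast};
    }

    public static void main(String[] args) {
        FakeTaskCommit[] tasks = runDemo();
        System.out.println("slow: runs=" + tasks[0].runs + " maxActive=" + tasks[0].maxActive);
        System.out.println("fast: runs=" + tasks[1].runs + " maxActive=" + tasks[1].maxActive);
    }
}
```

   With a single-threaded scheduler the fast task would instead be stuck 
behind every slow commit, which is the coupling between tasks I'd like to 
remove.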
   
   Beyond that, the only concern that comes to mind is potential races caused 
by concurrent access to the offset backing store and its underlying resources.
   
   In distributed mode, the `KafkaOffsetBackingStore` and its usage of the 
underlying `KafkaBasedLog` appear to be thread-safe, since everything 
essentially boils down to calls to `Producer::send`, which should be fine to 
invoke from multiple threads.
   
   In standalone mode, the `MemoryOffsetBackingStore` handles all writes/reads 
of the local offsets file via a single-threaded executor, so concurrent calls 
to `MemoryOffsetBackingStore::set` should also be fine.
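   
   The general pattern I'm relying on there looks roughly like the sketch 
below. This is not the real `MemoryOffsetBackingStore`; `SerializedStoreSketch` 
and its methods are hypothetical and only show why funneling every read and 
write through one executor thread makes concurrent callers safe, even with a 
plain `HashMap` underneath.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class SerializedStoreSketch {
    // Deliberately not a concurrent map: only the executor thread ever touches it.
    private final Map<String, Long> data = new HashMap<>();
    private final ExecutorService executor = Executors.newSingleThreadExecutor();

    // Callers on any thread enqueue the write; the executor applies writes one at a time.
    public Future<?> set(Map<String, Long> values) {
        Runnable write = () -> data.putAll(values);
        return executor.submit(write);
    }

    // Reads go through the same queue, so they observe every earlier write.
    public Future<Integer> size() {
        Callable<Integer> read = data::size;
        return executor.submit(read);
    }

    public void stop() {
        executor.shutdown();
    }

    // Four writer threads call set() concurrently with distinct keys.
    static int runDemo() {
        SerializedStoreSketch store = new SerializedStoreSketch();
        Thread[] writers = new Thread[4];
        for (int t = 0; t < writers.length; t++) {
            final int id = t;
            writers[t] = new Thread(() -> {
                for (int i = 0; i < 1000; i++) {
                    store.set(Collections.singletonMap("task-" + id + "-" + i, (long) i));
                }
            });
            writers[t].start();
        }
        try {
            for (Thread writer : writers) {
                writer.join();
            }
            // Submitted after every write, so it runs after all of them have been applied.
            return store.size().get();
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            store.stop();
        }
    }

    public static void main(String[] args) {
        System.out.println("entries stored: " + runDemo());
    }
}
```

   Because the executor's queue is the only path to the map, the callers never 
need their own locking.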
   
   
   Granted, none of this addresses your original concern, which is whether an 
offset commit timeout is necessary at all. In response to that, I think we may 
also want to revisit the offset commit logic and possibly do away with a 
timeout altogether. In sink tasks, for example, offset commit timeouts are 
almost a cosmetic feature at this point and are really only useful for metrics 
tracking. So far, however, it has been quite useful for us to monitor the 
source task offset commit success/failure JMX metrics as a way of tracking 
overall task health. We might be able to make up the difference by relying on 
metrics for the number of active records, but it's probably not safe to assume 
that works for all users, especially in what is intended to be a bug fix. So, 
if possible, I'd like to leave most of the offset commit logic intact for the 
moment and keep the changes here minimal.
   
   
   To summarize: I'd like to proceed by keeping the currently-proposed changes, 
and changing the source task offset committer to use a multi-threaded executor 
instead of a single-threaded executor. I can file a follow-up ticket to track 
improvements in offset commit logic (definitely for source tasks, and possibly 
for sinks) and we can look into that if it becomes a problem in the future. 
What do you think?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

