kpatelatwork commented on a change in pull request #10528:
URL: https://github.com/apache/kafka/pull/10528#discussion_r614126665



##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SourceTaskOffsetCommitter.java
##########
@@ -105,6 +105,11 @@ public void remove(ConnectorTaskId id) {
     }
 
     private void commit(WorkerSourceTask workerTask) {
+        if (!workerTask.shouldCommitOffsets()) {

Review comment:
   @C0urante Overall the PR looks good, but I feel that `shouldCommitOffsets` 
exposes internal details, and that `commitOffsets` should probably encapsulate 
this check internally.
   
   How about we move the `shouldCommitOffsets` check to the first line of the 
`commitOffsets` method, and remove `log.debug("{} Committing offsets", 
workerTask);` from this method? It is redundant, since 
`workerTask.commitOffsets` already logs it. The only challenge is that 
`return false` appears in a lot of places in `workerTask.commitOffsets`, but we 
could convert the current method to a private method and add a new 
`commitOffsets` wrapper method to make that easy. We would also need to move 
`log.error("{} Failed to commit offsets", workerTask);` into the wrapper 
method. That way, all we need to do here is call `workerTask.commitOffsets()` 
in a try/catch.
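
   A rough sketch of the shape I have in mind, using simplified stand-in 
classes rather than the actual Kafka code. The names `WorkerSourceTaskSketch`, 
`SourceTaskOffsetCommitterSketch`, `doCommitOffsets`, and the in-memory `log` 
list are hypothetical, and returning `true` when there is nothing to commit is 
an assumption on my part:

```java
import java.util.ArrayList;
import java.util.List;

// Simplified stand-in for WorkerSourceTask; NOT the real Kafka class.
class WorkerSourceTaskSketch {
    private final boolean commitNeeded;   // stand-in for whatever shouldCommitOffsets() checks
    private final boolean flushSucceeds;  // simulates the existing `return false` paths
    final List<String> log = new ArrayList<>(); // stand-in for the real logger

    WorkerSourceTaskSketch(boolean commitNeeded, boolean flushSucceeds) {
        this.commitNeeded = commitNeeded;
        this.flushSucceeds = flushSucceeds;
    }

    // Private now: the committer no longer needs to know about this check.
    private boolean shouldCommitOffsets() {
        return commitNeeded;
    }

    // New public wrapper: guard first, delegate, and log failure in one place.
    public boolean commitOffsets() {
        if (!shouldCommitOffsets()) {
            return true; // assumption: "nothing to commit" is not a failure
        }
        boolean committed = doCommitOffsets();
        if (!committed) {
            log.add("ERROR Failed to commit offsets");
        }
        return committed;
    }

    // Former commitOffsets body; its many `return false` sites stay untouched.
    private boolean doCommitOffsets() {
        log.add("DEBUG Committing offsets");
        return flushSucceeds;
    }
}

// Simplified stand-in for SourceTaskOffsetCommitter.
class SourceTaskOffsetCommitterSketch {
    static void commit(WorkerSourceTaskSketch workerTask) {
        try {
            workerTask.commitOffsets();
        } catch (Throwable t) {
            workerTask.log.add("ERROR Unhandled exception when committing: " + t);
        }
    }
}
```

   With this shape the committer only calls `commitOffsets()` inside the 
try/catch, and the debug and error logging each live in exactly one place.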




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

