kkonstantine commented on a change in pull request #10563:
URL: https://github.com/apache/kafka/pull/10563#discussion_r669274974



##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
##########
@@ -367,42 +369,53 @@ private void doCommit(Map<TopicPartition, OffsetAndMetadata> offsets, boolean cl
     }
 
     private void commitOffsets(long now, boolean closing) {
+        commitOffsets(now, closing, consumer.assignment());
+    }
+
+    private void commitOffsets(long now, boolean closing, Collection<TopicPartition> topicPartitions) {
         if (workerErrantRecordReporter != null) {
-            log.trace("Awaiting all reported errors to be completed");
-            workerErrantRecordReporter.awaitAllFutures();
-            log.trace("Completed all reported errors");
+            log.trace("Awaiting reported errors for {} to be completed", topicPartitions);

Review comment:
       I wonder whether we want to print the full list of partitions here, since it might be long, and we print it twice (before and after awaiting the futures).
   I see the same pattern applied elsewhere, and I understand the value of listing the partitions explicitly.
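If the list length is a concern, one option is to log the partition count plus only the first few entries. This is a minimal sketch under the assumption that a size-capped summary is acceptable at trace level. `PartitionLogSummary` and `summarize` are hypothetical helpers, not existing Connect code.

```java
import java.util.Collection;
import java.util.Iterator;
import java.util.StringJoiner;

// Hypothetical helper (not part of Connect): renders at most `limit` elements
// of a collection, followed by how many were omitted, so log lines stay bounded.
final class PartitionLogSummary {
    private PartitionLogSummary() {}

    static String summarize(Collection<?> items, int limit) {
        StringJoiner joiner = new StringJoiner(", ", "[", "]");
        Iterator<?> it = items.iterator();
        int shown = 0;
        while (it.hasNext() && shown < limit) {
            joiner.add(String.valueOf(it.next()));
            shown++;
        }
        int omitted = items.size() - shown;
        String result = items.size() + " partition(s) " + joiner;
        return omitted > 0 ? result + " (+" + omitted + " more)" : result;
    }
}
```

With SLF4J this could be passed as the log argument and guarded by `log.isTraceEnabled()`, so the summary string is only built when trace logging is on.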
   

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
##########
@@ -412,32 +425,36 @@ private void commitOffsets(long now, boolean closing) {
             return;
         }
 
-        final Map<TopicPartition, OffsetAndMetadata> commitableOffsets = new HashMap<>(lastCommittedOffsets);
+        Collection<TopicPartition> allAssignedTopicPartitions = consumer.assignment();
+        final Map<TopicPartition, OffsetAndMetadata> committableOffsets = new HashMap<>(lastCommittedOffsets);
         for (Map.Entry<TopicPartition, OffsetAndMetadata> taskProvidedOffsetEntry : taskProvidedOffsets.entrySet()) {
             final TopicPartition partition = taskProvidedOffsetEntry.getKey();
             final OffsetAndMetadata taskProvidedOffset = taskProvidedOffsetEntry.getValue();
-            if (commitableOffsets.containsKey(partition)) {
+            if (committableOffsets.containsKey(partition)) {
                 long taskOffset = taskProvidedOffset.offset();
-                long currentOffset = currentOffsets.get(partition).offset();
+                long currentOffset = offsetsToCommit.get(partition).offset();
                 if (taskOffset <= currentOffset) {
-                    commitableOffsets.put(partition, taskProvidedOffset);
+                    committableOffsets.put(partition, taskProvidedOffset);
                 } else {
                     log.warn("{} Ignoring invalid task provided offset {}/{} -- not yet consumed, taskOffset={} currentOffset={}",
-                            this, partition, taskProvidedOffset, taskOffset, currentOffset);
+                        this, partition, taskProvidedOffset, taskOffset, currentOffset);
                 }
-            } else {
+            } else if (!allAssignedTopicPartitions.contains(partition)) {
                 log.warn("{} Ignoring invalid task provided offset {}/{} -- partition not assigned, assignment={}",
-                        this, partition, taskProvidedOffset, consumer.assignment());
+                        this, partition, taskProvidedOffset, allAssignedTopicPartitions);
+            } else {
+                log.debug("{} Ignoring task provided offset {}/{} -- topic partition not requested, requested={}",

Review comment:
       What's a "requested" topic partition?
   Also, the messages above say just `partition`.
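To make the question concrete: as I read this hunk, a task-provided offset now lands in one of three cases. A minimal sketch of that classification follows. It uses plain `String` partition names instead of `TopicPartition`, and it simplifies by collapsing `committableOffsets` and `offsetsToCommit` into a single `requestedOffsets` map. `OffsetDecision` and `TaskOffsetClassifier` are hypothetical names, not part of the PR.

```java
import java.util.Map;
import java.util.Set;

// Hypothetical outcome of checking one task-provided offset (not part of the PR).
enum OffsetDecision { COMMIT, AHEAD_OF_CONSUMED, NOT_ASSIGNED, NOT_REQUESTED }

// Hypothetical, simplified mirror of the branches above; partitions are plain strings.
final class TaskOffsetClassifier {
    private TaskOffsetClassifier() {}

    static OffsetDecision classify(String partition,
                                   long taskOffset,
                                   Map<String, Long> requestedOffsets, // offsets covered by this commit
                                   Set<String> assignment) {          // full consumer assignment
        Long consumed = requestedOffsets.get(partition);
        if (consumed != null) {
            // Part of this commit: accept unless the task is ahead of what was consumed.
            return taskOffset <= consumed ? OffsetDecision.COMMIT : OffsetDecision.AHEAD_OF_CONSUMED;
        } else if (!assignment.contains(partition)) {
            return OffsetDecision.NOT_ASSIGNED;
        } else {
            // Assigned to this task, but not among the partitions this commit covers.
            return OffsetDecision.NOT_REQUESTED;
        }
    }
}
```

The `NOT_REQUESTED` case is the one the new debug message describes.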

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
##########
@@ -710,22 +758,35 @@ else if (!context.pausedPartitions().isEmpty())
 
         @Override
         public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
+            onPartitionsRemoved(partitions, false);
+        }
+
+        @Override
+        public void onPartitionsLost(Collection<TopicPartition> partitions) {
+            onPartitionsRemoved(partitions, true);
+        }
+
+        private void onPartitionsRemoved(Collection<TopicPartition> partitions, boolean lost) {
             if (taskStopped) {
                 log.trace("Skipping partition revocation callback as task has already been stopped");
                 return;
             }
-            log.debug("{} Partitions revoked", WorkerSinkTask.this);
+            log.debug("{} Partitions {}: {}", WorkerSinkTask.this, lost ? "lost" : "revoked", partitions);
+
+            if (partitions.isEmpty())
+                return;
+
             try {
-                closePartitions();
-                sinkTaskMetricsGroup.clearOffsets();
+                closePartitions(partitions, lost);
+                sinkTaskMetricsGroup.clearOffsets(partitions);
             } catch (RuntimeException e) {
                 // The consumer swallows exceptions raised in the rebalance listener, so we need to store
                 // exceptions and rethrow when poll() returns.
                 rebalanceException = e;
             }
 
-            // Make sure we don't have any leftover data since offsets will be reset to committed positions
-            messageBatch.clear();
+            // Make sure we don't have any leftover data since offsets for these partitions will be reset to committed positions
+            messageBatch.removeIf(record -> partitions.contains(new TopicPartition(record.topic(), record.kafkaPartition())));

Review comment:
       Are you worried that this became more expensive, especially in cases where we want to remove everything, as before?
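If the cost matters, one option is to keep the old `clear()` as a fast path when every assigned partition is being removed, and otherwise check membership against a `HashSet`, since `contains` on an arbitrary `Collection` may be a linear scan. This is a sketch, not what the PR does. It assumes every buffered record belongs to a currently assigned partition. `BatchPruner`, `Rec`, and `pruneBatch` are hypothetical stand-ins for the listener code and `SinkRecord`, and partitions are keyed as `topic-partition` strings rather than `TopicPartition`.

```java
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical sketch; not the PR's code.
final class BatchPruner {
    // Hypothetical stand-in for a buffered SinkRecord: only the fields the filter needs.
    static final class Rec {
        final String topic;
        final int partition;

        Rec(String topic, int partition) {
            this.topic = topic;
            this.partition = partition;
        }

        String key() {
            return topic + "-" + partition;
        }
    }

    private BatchPruner() {}

    // Removes buffered records for the removed partitions. Falls back to clear()
    // when every assigned partition is being removed, matching the old behaviour.
    static void pruneBatch(List<Rec> batch, Collection<String> removed, Collection<String> assignment) {
        Set<String> removedSet = new HashSet<>(removed); // O(1) lookups per record
        if (removedSet.containsAll(assignment)) {
            batch.clear();
            return;
        }
        batch.removeIf(r -> removedSet.contains(r.key()));
    }
}
```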

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
##########
@@ -367,42 +369,53 @@ private void doCommit(Map<TopicPartition, OffsetAndMetadata> offsets, boolean cl
     }
 
     private void commitOffsets(long now, boolean closing) {
+        commitOffsets(now, closing, consumer.assignment());
+    }
+
+    private void commitOffsets(long now, boolean closing, Collection<TopicPartition> topicPartitions) {
         if (workerErrantRecordReporter != null) {
-            log.trace("Awaiting all reported errors to be completed");
-            workerErrantRecordReporter.awaitAllFutures();
-            log.trace("Completed all reported errors");
+            log.trace("Awaiting reported errors for {} to be completed", topicPartitions);
+            workerErrantRecordReporter.awaitFutures(topicPartitions);
+            log.trace("Completed all reported errors for {}", topicPartitions);
         }
 
-        if (currentOffsets.isEmpty())
+        Map<TopicPartition, OffsetAndMetadata> offsetsToCommit = currentOffsets.entrySet().stream()
+            .filter(e -> topicPartitions.contains(e.getKey()))
+            .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
+
+        if (offsetsToCommit.isEmpty())
             return;
 
         committing = true;
         commitSeqno += 1;
         commitStarted = now;
         sinkTaskMetricsGroup.recordOffsetSequenceNumber(commitSeqno);
 
+        Map<TopicPartition, OffsetAndMetadata> lastCommittedOffsets = this.lastCommittedOffsets.entrySet().stream()
+            .filter(e -> offsetsToCommit.containsKey(e.getKey()))
+            .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
+
         final Map<TopicPartition, OffsetAndMetadata> taskProvidedOffsets;
         try {
-            log.trace("{} Calling task.preCommit with current offsets: {}", this, currentOffsets);
-            taskProvidedOffsets = task.preCommit(new HashMap<>(currentOffsets));
+            log.trace("{} Calling task.preCommit with current offsets: {}", this, offsetsToCommit);
+            taskProvidedOffsets = task.preCommit(new HashMap<>(offsetsToCommit));
         } catch (Throwable t) {
             if (closing) {
                 log.warn("{} Offset commit failed during close", this);
-                onCommitCompleted(t, commitSeqno, null);
             } else {
                 log.error("{} Offset commit failed, rewinding to last committed offsets", this, t);
                 for (Map.Entry<TopicPartition, OffsetAndMetadata> entry : lastCommittedOffsets.entrySet()) {
                     log.debug("{} Rewinding topic partition {} to offset {}", this, entry.getKey(), entry.getValue().offset());
                     consumer.seek(entry.getKey(), entry.getValue().offset());
                 }
-                currentOffsets = new HashMap<>(lastCommittedOffsets);

Review comment:
       This change means we never create a fresh copy of the offsets map.
   Is this correct? Is there a risk that offsets keep being added and are never removed?
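For comparison, the removed line replaced the whole map, which also dropped entries for any partition not present in `lastCommittedOffsets`. If rewinding should now cover only the partitions in this commit, one alternative is to overwrite just those entries. This is a sketch with `String` keys and `Long` offsets standing in for `TopicPartition` and `OffsetAndMetadata`, and `OffsetRewind` is a hypothetical name. Note that the subset variant removes nothing on its own, so stale entries would still need pruning elsewhere.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch contrasting the two rewind strategies (not the PR's code).
final class OffsetRewind {
    private OffsetRewind() {}

    // Old behaviour: a fresh copy, so entries absent from lastCommitted are dropped.
    static Map<String, Long> rewindAll(Map<String, Long> lastCommitted) {
        return new HashMap<>(lastCommitted);
    }

    // Subset rewind: overwrite only the partitions covered by this commit.
    // Entries for other partitions are left as they are (nothing is removed).
    static void rewindSubset(Map<String, Long> currentOffsets, Map<String, Long> lastCommittedForSubset) {
        currentOffsets.putAll(lastCommittedForSubset);
    }
}
```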

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
##########
@@ -367,42 +369,53 @@ private void doCommit(Map<TopicPartition, OffsetAndMetadata> offsets, boolean cl
     }
 
     private void commitOffsets(long now, boolean closing) {
+        commitOffsets(now, closing, consumer.assignment());
+    }
+
+    private void commitOffsets(long now, boolean closing, Collection<TopicPartition> topicPartitions) {
         if (workerErrantRecordReporter != null) {
-            log.trace("Awaiting all reported errors to be completed");
-            workerErrantRecordReporter.awaitAllFutures();
-            log.trace("Completed all reported errors");
+            log.trace("Awaiting reported errors for {} to be completed", topicPartitions);
+            workerErrantRecordReporter.awaitFutures(topicPartitions);
+            log.trace("Completed all reported errors for {}", topicPartitions);
         }
 
-        if (currentOffsets.isEmpty())
+        Map<TopicPartition, OffsetAndMetadata> offsetsToCommit = currentOffsets.entrySet().stream()
+            .filter(e -> topicPartitions.contains(e.getKey()))
+            .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
+
+        if (offsetsToCommit.isEmpty())
             return;
 
         committing = true;
         commitSeqno += 1;
         commitStarted = now;
         sinkTaskMetricsGroup.recordOffsetSequenceNumber(commitSeqno);
 
+        Map<TopicPartition, OffsetAndMetadata> lastCommittedOffsets = this.lastCommittedOffsets.entrySet().stream()

Review comment:
       Shadowing a member field this way is not ideal.
   For example, there's an older usage of this collection below. I assume it's the new local variable we want to use, but I can't be 100% sure.
   
   Should we call this `actuallyCommittedOffsets` or something similar?
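A minimal sketch of the rename: the field is filtered into a local with a distinct name, so every later reference clearly means one or the other. The filter is the same `Collectors.toMap` pattern used in the hunk. `OffsetFilter` and `committedForPartitions` are hypothetical, and the types are simplified to `String`/`Long`.

```java
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical sketch; types simplified from TopicPartition/OffsetAndMetadata.
final class OffsetFilter {
    // Stands in for the WorkerSinkTask member field.
    private final Map<String, Long> lastCommittedOffsets;

    OffsetFilter(Map<String, Long> lastCommittedOffsets) {
        this.lastCommittedOffsets = lastCommittedOffsets;
    }

    // The local gets its own name instead of shadowing the field, so any later
    // use of `lastCommittedOffsets` unambiguously refers to the full field.
    Map<String, Long> committedForPartitions(Map<String, Long> offsetsToCommit) {
        Map<String, Long> actuallyCommittedOffsets = lastCommittedOffsets.entrySet().stream()
            .filter(e -> offsetsToCommit.containsKey(e.getKey()))
            .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
        return actuallyCommittedOffsets;
    }
}
```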

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
##########
@@ -670,8 +707,7 @@ long getNextCommit() {
         @Override
         public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
             log.debug("{} Partitions assigned {}", WorkerSinkTask.this, partitions);
-            lastCommittedOffsets = new HashMap<>();

Review comment:
       Similar question as above: this map now only grows. Is this correct?
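If the maps are no longer recreated on assignment, one way to keep them bounded is to drop the keys of revoked or lost partitions in the removal callback. This sketch assumes those entries are no longer needed once the partitions' offsets have been handled. `OffsetTracker` and `forgetPartitions` are hypothetical names, with `String`/`Long` standing in for the Kafka types.

```java
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch; not the PR's code.
final class OffsetTracker {
    final Map<String, Long> currentOffsets = new HashMap<>();
    final Map<String, Long> lastCommittedOffsets = new HashMap<>();

    // Intended to run when partitions are revoked or lost: removing their keys keeps
    // both maps proportional to the current assignment instead of growing forever.
    void forgetPartitions(Collection<String> partitions) {
        currentOffsets.keySet().removeAll(partitions);
        lastCommittedOffsets.keySet().removeAll(partitions);
    }
}
```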

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/WorkerErrantRecordReporter.java
##########
@@ -49,7 +54,7 @@
     private final HeaderConverter headerConverter;
 
     // Visible for testing
-    protected final LinkedList<Future<Void>> futures;
+    protected final Map<TopicPartition, Future<Void>> futures;

Review comment:
       If we want this to be a concurrent map, it's probably a good idea to reflect that in the declaration, as you already do in the tests below.
   
   ```suggestion
       protected final ConcurrentMap<TopicPartition, Future<Void>> futures;
   ```
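Declaring the field as `ConcurrentMap` also makes atomic operations such as `remove(key, value)` available. The sketch below uses that operation so an entry is removed only if it still maps to the future that was awaited. It is generic in the key type so it runs without Kafka on the classpath. `FutureRegistry`, `track`, and this `awaitFutures` are hypothetical names, and the one-future-per-key shape simply follows the `Map<TopicPartition, Future<Void>>` type in the hunk.

```java
import java.util.Collection;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

// Hypothetical sketch; not the PR's WorkerErrantRecordReporter.
class FutureRegistry<K> {
    // Declared as ConcurrentMap so readers see the thread-safety requirement.
    protected final ConcurrentMap<K, Future<Void>> futures = new ConcurrentHashMap<>();

    void track(K key, Future<Void> future) {
        futures.put(key, future);
    }

    // Waits for the futures of the given keys, then removes each entry only if it
    // still maps to the future we awaited (a newer one registered meanwhile stays).
    void awaitFutures(Collection<K> keys) {
        for (K key : keys) {
            Future<Void> future = futures.get(key);
            if (future != null) {
                try {
                    future.get();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    throw new IllegalStateException("Interrupted while awaiting " + key, e);
                } catch (ExecutionException e) {
                    throw new IllegalStateException("Future for " + key + " failed", e.getCause());
                }
                futures.remove(key, future);
            }
        }
    }
}
```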




-- 
This is an automated message from the Apache Git Service.