bbejeck commented on code in PR #20954:
URL: https://github.com/apache/kafka/pull/20954#discussion_r2692335841


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java:
##########
@@ -309,6 +311,46 @@ private void closeStartupTasks(final Predicate<Task> predicate) {
         }
     }
 
+    public Map<TaskId, Long> taskOffsetSums(final Set<TaskId> tasks) {
+        return taskOffsetSums.entrySet().stream()
+                .filter(e -> tasks.contains(e.getKey()))
+                .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
+    }
+
+    public void updateTaskOffsets(final TaskId taskId, final Map<TopicPartition, Long> changelogOffsets) {
+        if (changelogOffsets.isEmpty()) {

Review Comment:
   Maybe flip this to `!changelogOffsets.isEmpty()` and move the `taskOffsetSums.put` into the body of the `if` statement; IMHO it's easier to read.
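The suggested restructuring, sketched as a self-contained class. This is a minimal illustration, not the PR's code: `String` stands in for `TaskId` and `TopicPartition` so it compiles without Kafka, and `TaskOffsetTracker`, `offsetSum`, and the simplified summation are hypothetical names/logic.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for StateDirectory's offset bookkeeping;
// String replaces TaskId and TopicPartition so the sketch compiles on its own.
final class TaskOffsetTracker {
    private final Map<String, Long> taskOffsetSums = new HashMap<>();

    // Suggested shape: positive condition, with the put inside the if body.
    void updateTaskOffsets(final String taskId, final Map<String, Long> changelogOffsets) {
        if (!changelogOffsets.isEmpty()) {
            taskOffsetSums.put(taskId, sumOfChangelogOffsets(changelogOffsets));
        }
    }

    Long offsetSum(final String taskId) {
        return taskOffsetSums.get(taskId);
    }

    // Simplified summation; the PR's version also validates sentinel offsets.
    private static long sumOfChangelogOffsets(final Map<String, Long> changelogOffsets) {
        long sum = 0L;
        for (final long offset : changelogOffsets.values()) {
            sum += offset;
        }
        return sum;
    }
}
```

Behavior is unchanged from the early-return version: an empty map leaves `taskOffsetSums` untouched.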



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java:
##########
@@ -309,6 +311,46 @@ private void closeStartupTasks(final Predicate<Task> predicate) {
         }
     }
 
+    public Map<TaskId, Long> taskOffsetSums(final Set<TaskId> tasks) {
+        return taskOffsetSums.entrySet().stream()
+                .filter(e -> tasks.contains(e.getKey()))
+                .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
+    }
+
+    public void updateTaskOffsets(final TaskId taskId, final Map<TopicPartition, Long> changelogOffsets) {
+        if (changelogOffsets.isEmpty()) {
+            return;
+        }
+
+        taskOffsetSums.put(taskId, sumOfChangelogOffsets(taskId, changelogOffsets));
+    }
+
+    public void removeTaskOffsets(final TaskId taskId) {
+        taskOffsetSums.remove(taskId);
+    }
+
+    private long sumOfChangelogOffsets(final TaskId taskId, final Map<TopicPartition, Long> changelogOffsets) {
+        long offsetSum = 0L;
+        for (final Map.Entry<TopicPartition, Long> changelogEntry : changelogOffsets.entrySet()) {
+            final long offset = changelogEntry.getValue();
+
+            if (offset != OffsetCheckpoint.OFFSET_UNKNOWN) {
+                if (offset < 0) {
+                    throw new StreamsException(
+                            new IllegalStateException("Expected not to get a sentinel offset, but got: " + changelogEntry),
+                            taskId);
+                }
+                offsetSum += offset;
+                if (offsetSum < 0) {

Review Comment:
   Is this even possible, given that we throw above if an offset is less than zero?
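One case the check could still catch: each offset is non-negative, but the running sum of several offsets can overflow `long` and wrap to a negative value. A minimal sketch, assuming plain `List<Long>` input instead of the PR's `TopicPartition`-keyed map; `OffsetSumOverflow` and both methods are hypothetical, and throwing on overflow is an illustrative choice since the hunk is cut off before the branch body.

```java
import java.util.List;

final class OffsetSumOverflow {
    // Mirrors the shape of the quoted check: every offset is non-negative,
    // yet the running sum can still wrap past Long.MAX_VALUE.
    static long sumWithWrapCheck(final List<Long> offsets) {
        long offsetSum = 0L;
        for (final long offset : offsets) {
            if (offset < 0) {
                throw new IllegalStateException("Unexpected negative offset: " + offset);
            }
            offsetSum += offset;
            if (offsetSum < 0) {
                // Two's-complement wrap-around: adding two non-negative longs
                // that overflow always yields a negative result.
                throw new IllegalStateException("Offset sum overflowed");
            }
        }
        return offsetSum;
    }

    // Alternative: Math.addExact throws ArithmeticException on overflow.
    static long sumExact(final List<Long> offsets) {
        long offsetSum = 0L;
        for (final long offset : offsets) {
            offsetSum = Math.addExact(offsetSum, offset);
        }
        return offsetSum;
    }
}
```

For example, `Long.MAX_VALUE` plus `1L` wraps to `Long.MIN_VALUE`, which the `offsetSum < 0` check detects even though neither input is negative.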



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java:
##########
@@ -596,6 +639,7 @@ private void cleanRemovedTasksCalledByCleanerThread(final long cleanupDelayMs) {
                         final long now = time.milliseconds();
                         final long lastModifiedMs = taskDir.file().lastModified();
                         if (now - cleanupDelayMs > lastModifiedMs) {
+                            removeTaskOffsets(id);

Review Comment:
   Maybe I'm missing something, but the code calls `taskOffsetSums.clear()` on line 561 and still executes `removeTaskOffsets(id)` here. Shouldn't the map already be empty at this point?
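If the map has indeed been cleared before this loop runs (an assumption based on the comment; the code around line 561 is not quoted here), the extra call is redundant but harmless: `Map.remove` on an absent key returns `null` and leaves the map unchanged. A minimal illustration with a plain `Map<String, Long>` standing in for `taskOffsetSums`; `ClearThenRemoveDemo` is a hypothetical name.

```java
import java.util.Map;

final class ClearThenRemoveDemo {
    // Clears the map, then removes a key, mirroring the clear() followed by
    // removeTaskOffsets(id) sequence; returns the resulting size.
    static int sizeAfterClearThenRemove(final Map<String, Long> offsets, final String key) {
        offsets.clear();
        final Long removed = offsets.remove(key); // null: key is already gone
        if (removed != null) {
            throw new IllegalStateException("Expected no mapping after clear()");
        }
        return offsets.size();
    }
}
```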



-- 
This is an automated message from the Apache Git Service.