nicktelford commented on code in PR #20954:
URL: https://github.com/apache/kafka/pull/20954#discussion_r2775569731
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java:
##########
@@ -309,6 +311,46 @@ private void closeStartupTasks(final Predicate<Task>
predicate) {
}
}
+ public Map<TaskId, Long> taskOffsetSums(final Set<TaskId> tasks) {
+ return taskOffsetSums.entrySet().stream()
+ .filter(e -> tasks.contains(e.getKey()))
+ .collect(Collectors.toMap(Map.Entry::getKey,
Map.Entry::getValue));
+ }
+
+ public void updateTaskOffsets(final TaskId taskId, final
Map<TopicPartition, Long> changelogOffsets) {
+ if (changelogOffsets.isEmpty()) {
+ return;
+ }
+
+ taskOffsetSums.put(taskId, sumOfChangelogOffsets(taskId,
changelogOffsets));
+ }
+
+ public void removeTaskOffsets(final TaskId taskId) {
+ taskOffsetSums.remove(taskId);
+ }
+
+ private long sumOfChangelogOffsets(final TaskId taskId, final
Map<TopicPartition, Long> changelogOffsets) {
+ long offsetSum = 0L;
+ for (final Map.Entry<TopicPartition, Long> changelogEntry :
changelogOffsets.entrySet()) {
+ final long offset = changelogEntry.getValue();
+
+ if (offset != OffsetCheckpoint.OFFSET_UNKNOWN) {
+ if (offset < 0) {
+ throw new StreamsException(
+ new IllegalStateException("Expected not to get a
sentinel offset, but got: " + changelogEntry),
+ taskId);
+ }
+ offsetSum += offset;
+ if (offsetSum < 0) {
Review Comment:
This whole `sumOfChangelogOffsets` method was transplanted over from
`TaskManager`, and this logic has remained unchanged, so I can't speak to the
reasoning for it.
My guess is that if `offset` is somehow negative, then `offsetSum` could
reduce below `0`.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]