ableegoldman commented on code in PR #18115:
URL: https://github.com/apache/kafka/pull/18115#discussion_r1880826110


##########
streams/src/main/java/org/apache/kafka/streams/internals/AutoOffsetResetInternal.java:
##########
@@ -31,7 +30,9 @@ public AutoOffsetResetInternal(final AutoOffsetReset autoOffsetReset) {
     public StrategyType offsetResetStrategy() {
         return offsetResetStrategy;
     }
-    public Optional<Duration> duration() {
-        return duration;
+
+    @SuppressWarnings("all")

Review Comment:
   ...all?? 🧯 
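The diff doesn't show which warning `@SuppressWarnings("all")` is meant to silence. Purely as an illustration, assuming it were an unchecked-cast warning, the narrower alternative is to name the specific warning and attach the annotation to the smallest declaration that needs it. The class and method names below are hypothetical:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical example class, not part of the PR.
class NarrowSuppressionSketch {
    static List<String> copyNames(final Object raw) {
        // Only this one local declaration suppresses "unchecked"; every other
        // warning in the method and class stays visible to javac and the IDE.
        @SuppressWarnings("unchecked")
        final List<String> names = (List<String>) raw;
        return new ArrayList<>(names);
    }
}
```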



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java:
##########
@@ -1347,19 +1360,30 @@ private <S extends StateStore> InternalTopicConfig createChangelogTopicConfig(fi
     }
 
     public boolean hasOffsetResetOverrides() {
-        return !(earliestResetTopics.isEmpty() && earliestResetPatterns.isEmpty()
-            && latestResetTopics.isEmpty() && latestResetPatterns.isEmpty());
+        return noneResetTopics.size() + noneResetPatterns.size()

Review Comment:
   Out of curiosity, why move away from the `isEmpty && isEmpty` pattern?
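For comparison, a minimal sketch of the `isEmpty` form extended to the new collections. The field names come from the diff, but their types here (`Set<String>`, `Set<Pattern>`, and `Map<..., Duration>` for the duration collections) and the wrapper class are assumptions for illustration:

```java
import java.time.Duration;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.regex.Pattern;

// Hypothetical stand-in for the relevant InternalTopologyBuilder fields.
class ResetOverridesSketch {
    final Set<String> noneResetTopics = new HashSet<>();
    final Set<Pattern> noneResetPatterns = new HashSet<>();
    final Set<String> earliestResetTopics = new HashSet<>();
    final Set<Pattern> earliestResetPatterns = new HashSet<>();
    final Set<String> latestResetTopics = new HashSet<>();
    final Set<Pattern> latestResetPatterns = new HashSet<>();
    final Map<String, Duration> durationResetTopics = new HashMap<>();
    final Map<Pattern, Duration> durationResetPatterns = new HashMap<>();

    // Same result as summing the sizes and comparing to 0, but && stops at the
    // first non-empty collection and reads as "is anything configured?".
    boolean hasOffsetResetOverrides() {
        return !(noneResetTopics.isEmpty() && noneResetPatterns.isEmpty()
            && earliestResetTopics.isEmpty() && earliestResetPatterns.isEmpty()
            && latestResetTopics.isEmpty() && latestResetPatterns.isEmpty()
            && durationResetTopics.isEmpty() && durationResetPatterns.isEmpty());
    }
}
```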



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java:
##########
@@ -1347,19 +1360,30 @@ private <S extends StateStore> InternalTopicConfig createChangelogTopicConfig(fi
     }
 
     public boolean hasOffsetResetOverrides() {
-        return !(earliestResetTopics.isEmpty() && earliestResetPatterns.isEmpty()
-            && latestResetTopics.isEmpty() && latestResetPatterns.isEmpty());
+        return noneResetTopics.size() + noneResetPatterns.size()
+            + earliestResetTopics.size() + earliestResetPatterns.size()
+            + latestResetTopics.size() + latestResetPatterns.size()
+            + durationResetTopics.size() + durationResetPatterns.size() > 0;
     }
 
     public AutoOffsetResetStrategy offsetResetStrategy(final String topic) {
-        if (maybeDecorateInternalSourceTopics(earliestResetTopics).contains(topic) ||
+        Optional<Duration> resetDuration;
+
+        if (maybeDecorateInternalSourceTopics(noneResetTopics).contains(topic) ||
+            noneResetPatterns.stream().anyMatch(p -> p.matcher(topic).matches())) {
+            return AutoOffsetResetStrategy.NONE;
+        } else if (maybeDecorateInternalSourceTopics(earliestResetTopics).contains(topic) ||
             earliestResetPatterns.stream().anyMatch(p -> p.matcher(topic).matches())) {
             return AutoOffsetResetStrategy.EARLIEST;
         } else if (maybeDecorateInternalSourceTopics(latestResetTopics).contains(topic) ||
             latestResetPatterns.stream().anyMatch(p -> p.matcher(topic).matches())) {
             return AutoOffsetResetStrategy.LATEST;
+        } else if (maybeDecorateInternalSourceTopics(durationResetTopics.keySet()).contains(topic)) {
+            return AutoOffsetResetStrategy.fromString("by_duration:" + durationResetTopics.get(topic).toString());
+        } else if ((resetDuration = findDuration(topic)).isPresent()) {
+            return AutoOffsetResetStrategy.fromString("by_duration:" + resetDuration.get());
         } else if (containsTopic(topic)) {
-            return AutoOffsetResetStrategy.NONE;
+            return null;

Review Comment:
   why change this from `NONE` to `null`?
   
   IIUC this is the reason you then had to change the `TopologyMetadata#offsetResetStrategy`'s return type to `Optional<AutoOffsetResetStrategy>`, which imo made [this](https://github.com/apache/kafka/pull/18115/files#diff-76f629d0df8bd30b2593cbcf4a2dc80de3167ebf55ef8b5558e6e6285a057496R1304) part of the StreamThread code much harder to follow than necessary. I don't like mixing `Optional.empty()` and `null`.
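One way to keep a single notion of absence, sketched with a hypothetical simplified enum and lookup class (not the real `InternalTopologyBuilder`/`TopologyMetadata` types): `NONE` stays an ordinary strategy, `Optional.empty()` means "no per-topic override", and `null` never appears. The named-topology case where the topic has just disappeared is deliberately left out of this sketch.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical sketch; names and types are simplified stand-ins.
class ResetLookupSketch {
    enum Strategy { NONE, EARLIEST, LATEST }

    private final Map<String, Strategy> overrides = new HashMap<>();

    void override(final String topic, final Strategy strategy) {
        overrides.put(topic, strategy);
    }

    // Optional.empty() is the only "no per-topic override" signal.
    Optional<Strategy> offsetResetStrategy(final String topic) {
        return Optional.ofNullable(overrides.get(topic));
    }

    // Caller side: a single orElse instead of a null check plus isPresent().
    Strategy effectiveStrategy(final String topic, final Strategy globalDefault) {
        return offsetResetStrategy(topic).orElse(globalDefault);
    }
}
```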



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java:
##########
@@ -1291,29 +1293,77 @@ private void resetOffsets(final Set<TopicPartition> partitions, final Exception
         final Set<String> loggedTopics = new HashSet<>();
         final Set<TopicPartition> seekToBeginning = new HashSet<>();
         final Set<TopicPartition> seekToEnd = new HashSet<>();
+        final Map<TopicPartition, Duration> seekByDuration = new HashMap<>();
         final Set<TopicPartition> notReset = new HashSet<>();
 
         for (final TopicPartition partition : partitions) {
-            final AutoOffsetResetStrategy offsetResetStrategy = topologyMetadata.offsetResetStrategy(partition.topic());
+            final Optional<AutoOffsetResetStrategy> offsetResetStrategy = topologyMetadata.offsetResetStrategy(partition.topic());
 
             // This may be null if the task we are currently processing was apart of a named topology that was just removed.
             // TODO KAFKA-13713: keep the StreamThreads and TopologyMetadata view of named topologies in sync until final thread has acked

Review Comment:
   Can we update this TODO so that it's about removing the null check once we remove named topologies?



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java:
##########
@@ -1326,6 +1376,50 @@ private void resetOffsets(final Set<TopicPartition> partitions, final Exception
             if (!seekToEnd.isEmpty()) {
                 mainConsumer.seekToEnd(seekToEnd);
             }
+
+            if (!seekByDuration.isEmpty()) {
+                final long nowMs = time.milliseconds();
+                final Map<TopicPartition, Long> seekToTimestamps = seekByDuration.entrySet().stream()
+                    .map(e -> {
+                        long seekMs = nowMs - e.getValue().toMillis();
+                        if (seekMs < 0L) {
+                            log.debug("Cannot reset offset to negative timestamp {} for partition {}. Seeking to timestamp 0 instead.", seekMs, e.getKey());
+                            seekMs = 0L;
+                        }
+                        return Map.entry(e.getKey(), seekMs);
+                    })
+                    .collect(HashMap::new, (m, e) -> m.put(e.getKey(), e.getValue()), Map::putAll);
+
+                try {
+                    for (final Map.Entry<TopicPartition, OffsetAndTimestamp> partitionAndOffset : mainConsumer.offsetsForTimes(seekToTimestamps).entrySet()) {
+                        final TopicPartition partition = partitionAndOffset.getKey();
+                        final OffsetAndTimestamp seekOffset = partitionAndOffset.getValue();
+                        if (seekOffset != null) {
+                            mainConsumer.seek(partition, new OffsetAndMetadata(seekOffset.offset()));
+                        } else {
+                            log.debug(
+                                "Cannot reset offset to non-existing timestamp {} (larger than timestamp of last record)" +
+                                    " for partition {}. Seeking to end instead.",
+                                seekToTimestamps.get(partition),
+                                partition
+                            );
+                            mainConsumer.seekToEnd(Collections.singleton(partitionAndOffset.getKey()));
+                        }
+                    }
+                } catch (final TimeoutException timeoutException) {
+                    for (final TopicPartition partition : seekByDuration.keySet()) {
+                        final Task task = taskManager.getActiveTask(partition);
+                        task.maybeInitTaskTimeoutOrThrow(now, timeoutException);
+                        stateUpdater.add(task);

Review Comment:
   nit: I don't love making `#getActiveTask` non-private like this. Can we just wrap these calls in a new TaskManager API?
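A minimal sketch of what such a wrapper could look like, using hypothetical stand-in types (`String` partitions, a one-method `Task` interface) rather than the real Kafka Streams classes. The method name `maybeInitTaskTimeoutsOrThrow` is made up for illustration:

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for TaskManager; not the real class.
class TaskManagerSketch {
    // Models only the one Task call the diff makes.
    interface Task {
        void maybeInitTaskTimeoutOrThrow(long nowMs, RuntimeException cause);
    }

    private final Map<String, Task> activeTasksByPartition = new HashMap<>();

    void addActiveTask(final String partition, final Task task) {
        activeTasksByPartition.put(partition, task);
    }

    // The caller passes partitions and the exception; the lookup happens in
    // here, so getActiveTask can stay private.
    List<Task> maybeInitTaskTimeoutsOrThrow(final Collection<String> partitions,
                                            final long nowMs,
                                            final RuntimeException cause) {
        final List<Task> timedOut = new ArrayList<>();
        for (final String partition : partitions) {
            final Task task = getActiveTask(partition);
            task.maybeInitTaskTimeoutOrThrow(nowMs, cause);
            timedOut.add(task);
        }
        return timedOut;
    }

    private Task getActiveTask(final String partition) {
        return activeTasksByPartition.get(partition);
    }
}
```

The returned tasks are what the caller would then pass to `stateUpdater.add(...)`, as the diff does today.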



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java:
##########
@@ -1347,19 +1360,30 @@ private <S extends StateStore> InternalTopicConfig createChangelogTopicConfig(fi
     }
 
     public boolean hasOffsetResetOverrides() {
-        return !(earliestResetTopics.isEmpty() && earliestResetPatterns.isEmpty()
-            && latestResetTopics.isEmpty() && latestResetPatterns.isEmpty());
+        return noneResetTopics.size() + noneResetPatterns.size()
+            + earliestResetTopics.size() + earliestResetPatterns.size()
+            + latestResetTopics.size() + latestResetPatterns.size()
+            + durationResetTopics.size() + durationResetPatterns.size() > 0;
     }
 
     public AutoOffsetResetStrategy offsetResetStrategy(final String topic) {
-        if (maybeDecorateInternalSourceTopics(earliestResetTopics).contains(topic) ||
+        Optional<Duration> resetDuration;

Review Comment:
   GitHub/checkstyle says this should be `final`.
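For what it's worth, the declaration can be made `final` without restructuring: the variable is definitely unassigned before the single assignment inside the `else if` condition and is only read in that branch, so a blank `final` local compiles. A simplified sketch with hypothetical names, using strings in place of `AutoOffsetResetStrategy`:

```java
import java.time.Duration;
import java.util.Map;
import java.util.Optional;

// Hypothetical sketch; findDuration and the string results are stand-ins.
class FinalLocalSketch {
    static Optional<Duration> findDuration(final Map<String, Duration> byTopic, final String topic) {
        return Optional.ofNullable(byTopic.get(topic));
    }

    static String strategyFor(final Map<String, Duration> byTopic, final String topic) {
        // Blank final: assigned exactly once, inside the else-if condition.
        final Optional<Duration> resetDuration;
        if (topic.startsWith("none-")) {
            return "none";
        } else if ((resetDuration = findDuration(byTopic, topic)).isPresent()) {
            return "by_duration:" + resetDuration.get();
        } else {
            return "default";
        }
    }
}
```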



-- 
This is an automated message from the Apache Git Service.