mjsax commented on code in PR #18115:
URL: https://github.com/apache/kafka/pull/18115#discussion_r1880967165


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java:
##########
@@ -1326,6 +1376,50 @@ private void resetOffsets(final Set<TopicPartition> partitions, final Exception
             if (!seekToEnd.isEmpty()) {
                 mainConsumer.seekToEnd(seekToEnd);
             }
+
+            if (!seekByDuration.isEmpty()) {
+                final long nowMs = time.milliseconds();
+                final Map<TopicPartition, Long> seekToTimestamps = seekByDuration.entrySet().stream()
+                    .map(e -> {
+                        long seekMs = nowMs - e.getValue().toMillis();
+                        if (seekMs < 0L) {
+                            log.debug("Cannot reset offset to negative timestamp {} for partition {}. Seeking to timestamp 0 instead.", seekMs, e.getKey());
+                            seekMs = 0L;
+                        }
+                        return Map.entry(e.getKey(), seekMs);
+                    })
+                    .collect(HashMap::new, (m, e) -> m.put(e.getKey(), e.getValue()), Map::putAll);
+
+                try {
+                    for (final Map.Entry<TopicPartition, OffsetAndTimestamp> partitionAndOffset : mainConsumer.offsetsForTimes(seekToTimestamps).entrySet()) {
+                        final TopicPartition partition = partitionAndOffset.getKey();
+                        final OffsetAndTimestamp seekOffset = partitionAndOffset.getValue();
+                        if (seekOffset != null) {
+                            mainConsumer.seek(partition, new OffsetAndMetadata(seekOffset.offset()));
+                        } else {
+                            log.debug(
+                                "Cannot reset offset to non-existing timestamp {} (larger than timestamp of last record)" +
+                                    " for partition {}. Seeking to end instead.",
+                                seekToTimestamps.get(partition),
+                                partition
+                            );
+                            mainConsumer.seekToEnd(Collections.singleton(partitionAndOffset.getKey()));
+                        }
+                    }
+                } catch (final TimeoutException timeoutException) {
+                    for (final TopicPartition partition : seekByDuration.keySet()) {
+                        final Task task = taskManager.getActiveTask(partition);
+                        task.maybeInitTaskTimeoutOrThrow(now, timeoutException);
+                        stateUpdater.add(task);

Review Comment:
   Happy to take a look -- can we do this in a follow-up PR so we can merge 
this PR on time?
   
   I assume it could imply a larger refactoring that would not belong in this 
PR.


