ableegoldman commented on code in PR #18115:
URL: https://github.com/apache/kafka/pull/18115#discussion_r1880826110
##########
streams/src/main/java/org/apache/kafka/streams/internals/AutoOffsetResetInternal.java:
##########
@@ -31,7 +30,9 @@ public AutoOffsetResetInternal(final AutoOffsetReset
autoOffsetReset) {
public StrategyType offsetResetStrategy() {
return offsetResetStrategy;
}
- public Optional<Duration> duration() {
- return duration;
+
+ @SuppressWarnings("all")
Review Comment:
...all?? 🧯
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java:
##########
@@ -1347,19 +1360,30 @@ private <S extends StateStore> InternalTopicConfig
createChangelogTopicConfig(fi
}
public boolean hasOffsetResetOverrides() {
- return !(earliestResetTopics.isEmpty() &&
earliestResetPatterns.isEmpty()
- && latestResetTopics.isEmpty() && latestResetPatterns.isEmpty());
+ return noneResetTopics.size() + noneResetPatterns.size()
Review Comment:
out of curiosity why move away from the `isEmpty && isEmpty` pattern?
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java:
##########
@@ -1347,19 +1360,30 @@ private <S extends StateStore> InternalTopicConfig
createChangelogTopicConfig(fi
}
public boolean hasOffsetResetOverrides() {
- return !(earliestResetTopics.isEmpty() &&
earliestResetPatterns.isEmpty()
- && latestResetTopics.isEmpty() && latestResetPatterns.isEmpty());
+ return noneResetTopics.size() + noneResetPatterns.size()
+ + earliestResetTopics.size() + earliestResetPatterns.size()
+ + latestResetTopics.size() + latestResetPatterns.size()
+ + durationResetTopics.size() + durationResetPatterns.size() > 0;
}
public AutoOffsetResetStrategy offsetResetStrategy(final String topic) {
- if
(maybeDecorateInternalSourceTopics(earliestResetTopics).contains(topic) ||
+ Optional<Duration> resetDuration;
+
+ if (maybeDecorateInternalSourceTopics(noneResetTopics).contains(topic)
||
+ noneResetPatterns.stream().anyMatch(p ->
p.matcher(topic).matches())) {
+ return AutoOffsetResetStrategy.NONE;
+ } else if
(maybeDecorateInternalSourceTopics(earliestResetTopics).contains(topic) ||
earliestResetPatterns.stream().anyMatch(p ->
p.matcher(topic).matches())) {
return AutoOffsetResetStrategy.EARLIEST;
} else if
(maybeDecorateInternalSourceTopics(latestResetTopics).contains(topic) ||
latestResetPatterns.stream().anyMatch(p ->
p.matcher(topic).matches())) {
return AutoOffsetResetStrategy.LATEST;
+ } else if
(maybeDecorateInternalSourceTopics(durationResetTopics.keySet()).contains(topic))
{
+ return AutoOffsetResetStrategy.fromString("by_duration:" +
durationResetTopics.get(topic).toString());
+ } else if ((resetDuration = findDuration(topic)).isPresent()) {
+ return AutoOffsetResetStrategy.fromString("by_duration:" +
resetDuration.get());
} else if (containsTopic(topic)) {
- return AutoOffsetResetStrategy.NONE;
+ return null;
Review Comment:
why change this from `NONE` to `null`?
IIUC this is the reason you then had to change the
`TopologyMetadata#offsetResetStrategy`'s return type to
`Optional<AutoOffsetResetStrategy>`, which imo made
[this](https://github.com/apache/kafka/pull/18115/files#diff-76f629d0df8bd30b2593cbcf4a2dc80de3167ebf55ef8b5558e6e6285a057496R1304)
part of the StreamThread code much harder to follow than necessary. I don't like
mixing `Optional.empty` and `null`.
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java:
##########
@@ -1291,29 +1293,77 @@ private void resetOffsets(final Set<TopicPartition>
partitions, final Exception
final Set<String> loggedTopics = new HashSet<>();
final Set<TopicPartition> seekToBeginning = new HashSet<>();
final Set<TopicPartition> seekToEnd = new HashSet<>();
+ final Map<TopicPartition, Duration> seekByDuration = new HashMap<>();
final Set<TopicPartition> notReset = new HashSet<>();
for (final TopicPartition partition : partitions) {
- final AutoOffsetResetStrategy offsetResetStrategy =
topologyMetadata.offsetResetStrategy(partition.topic());
+ final Optional<AutoOffsetResetStrategy> offsetResetStrategy =
topologyMetadata.offsetResetStrategy(partition.topic());
// This may be null if the task we are currently processing was
apart of a named topology that was just removed.
// TODO KAFKA-13713: keep the StreamThreads and TopologyMetadata
view of named topologies in sync until final thread has acked
Review Comment:
Can we update this TODO to make it about removing the null check once we
remove named topologies?
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java:
##########
@@ -1326,6 +1376,50 @@ private void resetOffsets(final Set<TopicPartition>
partitions, final Exception
if (!seekToEnd.isEmpty()) {
mainConsumer.seekToEnd(seekToEnd);
}
+
+ if (!seekByDuration.isEmpty()) {
+ final long nowMs = time.milliseconds();
+ final Map<TopicPartition, Long> seekToTimestamps =
seekByDuration.entrySet().stream()
+ .map(e -> {
+ long seekMs = nowMs - e.getValue().toMillis();
+ if (seekMs < 0L) {
+ log.debug("Cannot reset offset to negative
timestamp {} for partition {}. Seeking to timestamp 0 instead.", seekMs,
e.getKey());
+ seekMs = 0L;
+ }
+ return Map.entry(e.getKey(), seekMs);
+ })
+ .collect(HashMap::new, (m, e) -> m.put(e.getKey(),
e.getValue()), Map::putAll);
+
+ try {
+ for (final Map.Entry<TopicPartition, OffsetAndTimestamp>
partitionAndOffset : mainConsumer.offsetsForTimes(seekToTimestamps).entrySet())
{
+ final TopicPartition partition =
partitionAndOffset.getKey();
+ final OffsetAndTimestamp seekOffset =
partitionAndOffset.getValue();
+ if (seekOffset != null) {
+ mainConsumer.seek(partition, new
OffsetAndMetadata(seekOffset.offset()));
+ } else {
+ log.debug(
+ "Cannot reset offset to non-existing timestamp
{} (larger than timestamp of last record)" +
+ " for partition {}. Seeking to end
instead.",
+ seekToTimestamps.get(partition),
+ partition
+ );
+
mainConsumer.seekToEnd(Collections.singleton(partitionAndOffset.getKey()));
+ }
+ }
+ } catch (final TimeoutException timeoutException) {
+ for (final TopicPartition partition :
seekByDuration.keySet()) {
+ final Task task = taskManager.getActiveTask(partition);
+ task.maybeInitTaskTimeoutOrThrow(now,
timeoutException);
+ stateUpdater.add(task);
Review Comment:
nit: I don't love making `#getActiveTask` non-private like this. Can we just
wrap these calls in a new TaskManager API instead?
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java:
##########
@@ -1347,19 +1360,30 @@ private <S extends StateStore> InternalTopicConfig
createChangelogTopicConfig(fi
}
public boolean hasOffsetResetOverrides() {
- return !(earliestResetTopics.isEmpty() &&
earliestResetPatterns.isEmpty()
- && latestResetTopics.isEmpty() && latestResetPatterns.isEmpty());
+ return noneResetTopics.size() + noneResetPatterns.size()
+ + earliestResetTopics.size() + earliestResetPatterns.size()
+ + latestResetTopics.size() + latestResetPatterns.size()
+ + durationResetTopics.size() + durationResetPatterns.size() > 0;
}
public AutoOffsetResetStrategy offsetResetStrategy(final String topic) {
- if
(maybeDecorateInternalSourceTopics(earliestResetTopics).contains(topic) ||
+ Optional<Duration> resetDuration;
Review Comment:
github/checkstyle is saying this should be final
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]