hermanjakobsen commented on code in PR #19937: URL: https://github.com/apache/kafka/pull/19937#discussion_r2143176822
########## streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java: ########## @@ -1680,6 +1747,35 @@ public void shouldPunctuateOnceSystemTimeAfterGap() { processorSystemTime.mockProcessor.checkAndClearPunctuateResult(PunctuationType.WALL_CLOCK_TIME, now + 100, now + 110, now + 122, now + 130, now + 235, now + 240); } + @Test + public void shouldPunctuateUsingAnchoredSystemStartTimeWithStartTimeBeforeNow() { + when(stateManager.taskId()).thenReturn(taskId); + when(stateManager.taskType()).thenReturn(TaskType.ACTIVE); + + final long now = time.milliseconds(); + final long testStartTime = now + (10L - (now % 10L)); // Used to make test deterministic + time.setCurrentTimeMs(testStartTime); + final MockProcessorNode<Integer, Integer, ?, ?> anchoredProcessorSystemTime = new MockProcessorNode<>(Instant.ofEpochMilli(testStartTime - 10), 10L, PunctuationType.WALL_CLOCK_TIME); + task = createStatelessTaskWithAnchoredPunctuation(createConfig("100"), anchoredProcessorSystemTime); + task.initializeIfNeeded(); + task.completeRestoration(noOpResetter -> { }); + + // now is after startTime -> initial punctuation + assertTrue(task.canPunctuateSystemTime()); + assertTrue(task.maybePunctuateSystemTime()); Review Comment: I think I've fixed it with https://github.com/apache/kafka/pull/19937/commits/14cbe4162009c69ee72c0c183b977d8f67d80702 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org