mjsax commented on a change in pull request #9744: URL: https://github.com/apache/kafka/pull/9744#discussion_r564992729
##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java
##########
@@ -289,4 +291,36 @@ Cancellable schedule(final Duration interval,
      */
     Map<String, Object> appConfigsWithPrefix(final String prefix);

+    /**
+     * Return the current system timestamp (also called wall-clock time) in milliseconds.
+     *
+     * <p>
+     * Note: this method returns the internally cached system timestamp from the Kafka Stream runtime.
+     * Thus, it may return a different value compared to `System.currentTimeMillis()`.

Review comment:
nit:
```
`System.currentTimeMillis()` -> {@code System.currentTimeMillis()}
or: {@link System#currentTimeMillis()}
```

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContext.java
##########
@@ -45,7 +45,6 @@
     private boolean initialized;
     protected ProcessorRecordContext recordContext;
     protected ProcessorNode<?, ?, ?, ?> currentNode;
-    private long currentSystemTimeMs;

Review comment:
@rohitrmd did you see Guozhang's comment? It seems we should not remove this variable (which is a little unfortunate, but I guess it is the desired behavior) -- maybe we can rename it to `cachedSystemTimeMs`?

I guess we want the caching for the `GlobalProcessorContext`, too? (For this case, we would need to update the JavaDocs of the `ProcessorContext` interface accordingly.)

We should also add a test for the corresponding context test classes to verify that they don't advance system time automatically, but only if the advance is triggered explicitly.

##########
File path: streams/src/test/java/org/apache/kafka/test/MockInternalProcessorContext.java
##########
@@ -55,14 +55,9 @@ public MockInternalProcessorContext(final Properties config, final TaskId taskId
         super(config, taskId, stateDir);
     }

-    @Override
-    public void setSystemTimeMs(long timeMs) {

Review comment:
I guess a follow-up PR works, too.
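
To make the caching behavior discussed for `AbstractProcessorContext` concrete, here is a minimal sketch of what "cached system time that only advances explicitly" could look like, and what a test for it might check. The class `CachedClockContext`, its `refreshSystemTime()` method, and the injected `LongSupplier` clock are all hypothetical names for illustration; this is not the actual Kafka Streams code.

```java
import java.util.function.LongSupplier;

// Hypothetical illustration only; not the actual Kafka Streams class.
final class CachedClockContext {
    private final LongSupplier wallClock; // source of the real wall-clock time
    private long cachedSystemTimeMs;      // value handed out to callers

    CachedClockContext(final LongSupplier wallClock) {
        this.wallClock = wallClock;
        this.cachedSystemTimeMs = wallClock.getAsLong();
    }

    // Assumed to be called by the runtime at well-defined points;
    // the cached value changes only here.
    void refreshSystemTime() {
        cachedSystemTimeMs = wallClock.getAsLong();
    }

    // Returns the cached value without consulting the wall clock.
    long currentSystemTimeMs() {
        return cachedSystemTimeMs;
    }

    public static void main(final String[] args) {
        final long[] fakeNow = {1_000L};
        final CachedClockContext context = new CachedClockContext(() -> fakeNow[0]);

        fakeNow[0] = 2_000L; // the wall clock moves on
        System.out.println(context.currentSystemTimeMs()); // prints 1000 (still cached)

        context.refreshSystemTime(); // explicit advance
        System.out.println(context.currentSystemTimeMs()); // prints 2000
    }
}
```

Injecting the clock keeps the sketch deterministic, which is the property a test for the mock context classes would presumably rely on.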
##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java
##########
@@ -289,4 +291,36 @@ Cancellable schedule(final Duration interval,
      */
     Map<String, Object> appConfigsWithPrefix(final String prefix);

+    /**
+     * Return the current system timestamp (also called wall-clock time) in milliseconds.
+     *
+     * <p>
+     * Note: this method returns the internally cached system timestamp from the Kafka Stream runtime.
+     * Thus, it may return a different value compared to `System.currentTimeMillis()`.
+     * <p>
+     *
+     * For a global processor, Kafka Streams does not cache system time and thus calling this method will return
+     * the same value as `System.currentTimeMillis()`.

Review comment:
as above

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java
##########
@@ -289,4 +291,36 @@ Cancellable schedule(final Duration interval,
      */
     Map<String, Object> appConfigsWithPrefix(final String prefix);

+    /**
+     * Return the current system timestamp (also called wall-clock time) in milliseconds.
+     *
+     * <p>
+     * Note: this method returns the internally cached system timestamp from the Kafka Stream runtime.
+     * Thus, it may return a different value compared to `System.currentTimeMillis()`.
+     * <p>
+     *
+     * For a global processor, Kafka Streams does not cache system time and thus calling this method will return
+     * the same value as `System.currentTimeMillis()`.
+     *
+     * @return the current system timestamp in milliseconds
+     */
+    long currentSystemTimeMs();
+
+    /**
+     * Return the current stream-time in milliseconds.
+     *
+     * <p>
+     * Stream-time is the maximum observed {@link TimestampExtractor record timestamp} so far
+     * (including the currently processed record), i.e., it can be considered a high-watermark.
+     * Stream-time is tracked on a per-task basis and is preserved across restarts and during task migration.
+     * <p>
+     *
+     * Note: this method is not supported for global processors (cf. {@link Topology#addGlobalStore} (...)
+     * and {@link StreamsBuilder#addGlobalStore} (...),
+     * because there is no concept of stream-time for this case.
+     * Calling this method in a global processor with result in an {@link UnsupportedOperationException}.

Review comment:
typo: `with` -> `will`

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
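
As a rough illustration of the stream-time semantics described in the JavaDoc under review (maximum observed record timestamp, unsupported for global processors), a sketch might look like the following. `StreamTimeTracker`, its `observe` method, the `global` flag, and the `NO_TIMESTAMP` initial value of -1 are assumptions made for this example and are not taken from the Kafka Streams code base.

```java
// Hypothetical illustration only; not the actual Kafka Streams implementation.
final class StreamTimeTracker {
    // Assumed placeholder used before any record has been observed.
    static final long NO_TIMESTAMP = -1L;

    private final boolean global;
    private long streamTimeMs = NO_TIMESTAMP;

    StreamTimeTracker(final boolean global) {
        this.global = global;
    }

    // Stream-time acts as a high-watermark: it never moves backwards,
    // even if a record with an older timestamp arrives.
    void observe(final long recordTimestampMs) {
        streamTimeMs = Math.max(streamTimeMs, recordTimestampMs);
    }

    long currentStreamTimeMs() {
        if (global) {
            throw new UnsupportedOperationException(
                "there is no concept of stream-time for a global processor");
        }
        return streamTimeMs;
    }

    public static void main(final String[] args) {
        final StreamTimeTracker tracker = new StreamTimeTracker(false);
        tracker.observe(10L);
        tracker.observe(30L);
        tracker.observe(20L); // out-of-order record does not lower stream-time
        System.out.println(tracker.currentStreamTimeMs()); // prints 30
    }
}
```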