mjsax commented on a change in pull request #9744:
URL: https://github.com/apache/kafka/pull/9744#discussion_r564992729



##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java
##########
@@ -289,4 +291,36 @@ Cancellable schedule(final Duration interval,
      */
     Map<String, Object> appConfigsWithPrefix(final String prefix);
 
+    /**
+     * Return the current system timestamp (also called wall-clock time) in 
milliseconds.
+     *
+     * <p>
+     * Note: this method returns the internally cached system timestamp from 
the Kafka Stream runtime.
+     * Thus, it may return a different value compared to 
`System.currentTimeMillis()`.

Review comment:
       nit:
   ```
   `System.currentTimeMillis()` -> {@code System.currentTimeMillis()} 
   
   or: {link System#currentTimeMillis()}
   ```

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContext.java
##########
@@ -45,7 +45,6 @@
     private boolean initialized;
     protected ProcessorRecordContext recordContext;
     protected ProcessorNode<?, ?, ?, ?> currentNode;
-    private long currentSystemTimeMs;

Review comment:
       @rohitrmd did you see Guozhang comment? Seems we should not remove this 
variable (what is a little unfortunate, but I guess desirable behavior) -- 
maybe we can rename it to `cachedSystemTimeMs`?
   
   I guess we want the caching for the `GlobalProcessorContext`, too? (For this 
case, we would need to update the JavaDocs of the `ProcessorContext` interface 
accordingly.)
   
   We should also add a test for the corresponding contest-test classes that 
they don't advance system time automatically, but only in the advance is 
triggered explicitly.

##########
File path: 
streams/src/test/java/org/apache/kafka/test/MockInternalProcessorContext.java
##########
@@ -55,14 +55,9 @@ public MockInternalProcessorContext(final Properties config, 
final TaskId taskId
         super(config, taskId, stateDir);
     }
 
-    @Override
-    public void setSystemTimeMs(long timeMs) {

Review comment:
       I guess a follow up PR works, too.

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java
##########
@@ -289,4 +291,36 @@ Cancellable schedule(final Duration interval,
      */
     Map<String, Object> appConfigsWithPrefix(final String prefix);
 
+    /**
+     * Return the current system timestamp (also called wall-clock time) in 
milliseconds.
+     *
+     * <p>
+     * Note: this method returns the internally cached system timestamp from 
the Kafka Stream runtime.
+     * Thus, it may return a different value compared to 
`System.currentTimeMillis()`.
+     * <p>
+     *
+     * For a global processor, Kafka Streams does not cache system time and 
thus calling this method will return
+     * the same value as `System.currentTimeMillis()`.

Review comment:
       as above

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java
##########
@@ -289,4 +291,36 @@ Cancellable schedule(final Duration interval,
      */
     Map<String, Object> appConfigsWithPrefix(final String prefix);
 
+    /**
+     * Return the current system timestamp (also called wall-clock time) in 
milliseconds.
+     *
+     * <p>
+     * Note: this method returns the internally cached system timestamp from 
the Kafka Stream runtime.
+     * Thus, it may return a different value compared to 
`System.currentTimeMillis()`.
+     * <p>
+     *
+     * For a global processor, Kafka Streams does not cache system time and 
thus calling this method will return
+     * the same value as `System.currentTimeMillis()`.
+     *
+     * @return the current system timestamp in milliseconds
+     */
+    long currentSystemTimeMs();
+
+    /**
+     * Return the current stream-time in milliseconds.
+     *
+     * <p>
+     * Stream-time is the maximum observed {@link TimestampExtractor record 
timestamp} so far
+     * (including the currently processed record), i.e., it can be considered 
a high-watermark.
+     * Stream-time is tracked on a per-task basis and is preserved across 
restarts and during task migration.
+     * <p>
+     *
+     * Note: this method is not supported for global processors (cf. {@link 
Topology#addGlobalStore} (...)
+     * and {@link StreamsBuilder#addGlobalStore} (...),
+     * because there is no concept of stream-time for this case.
+     * Calling this method in a global processor with result in an {@link 
UnsupportedOperationException}.

Review comment:
       typo: `with` -> `will`




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to