mjsax commented on code in PR #17054:
URL: https://github.com/apache/kafka/pull/17054#discussion_r1741158880


##########
streams/src/main/java/org/apache/kafka/streams/errors/ErrorHandlerContext.java:
##########
@@ -102,4 +106,23 @@ public interface ErrorHandlerContext {
      * @return the task ID
      */
     TaskId taskId();
+
+    /**
+     * Return the current timestamp.
+     *
+     * <p> If it is triggered while processing a record streamed from the 
source processor,
+     * timestamp is defined as the timestamp of the current input record; the 
timestamp is extracted from
+     * {@link org.apache.kafka.clients.consumer.ConsumerRecord ConsumerRecord} 
by {@link TimestampExtractor}.
+     * Note, that an upstream {@link 
org.apache.kafka.streams.processor.api.Processor} might have set a new 
timestamp by calling
+     * {@link ProcessorContext#forward(Object, Object, To) forward(..., 
To.all().withTimestamp(...))}.
+     * In particular, some Kafka Streams DSL operators set result record 
timestamps explicitly,
+     * to guarantee deterministic results.
+     *
+     * <p> If it is triggered while processing a record generated not from the 
source processor (for example,
+     * if this method is invoked from the punctuate call), timestamp is 
defined as the current
+     * task's stream time, which is defined as the largest timestamp of any 
record processed by the task.

Review Comment:
   (Did you verify the behavior? I would hope that TTD doe the right/same thing 
as the actual runtime for this case.)
   
   Last, given that `ErrorHandlerContext` in also used for the deserialization 
case, I am wondering if we should say something about this, too? For this case, 
the `TimestampExtractor` was not called yet, and thus we pass in 
`rawRecord.timestamp()`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to