vvcephei commented on a change in pull request #8603:
URL: https://github.com/apache/kafka/pull/8603#discussion_r420377361



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java
##########
@@ -209,49 +211,52 @@ Cancellable schedule(final Duration interval,
     <K1 extends K, V1 extends V> void forward(final K1 key, final V1 value, final String childName);
 
     /**
-     * Requests a commit
+     * Requests a commit.
      */
     void commit();
 
     /**
      * Returns the topic name of the current input record; could be null if it is not
-     * available (for example, if this method is invoked from the punctuate call)
+     * available (for example, if this method is invoked from the punctuate call).
      *
      * @return the topic name
      */
     String topic();
 
     /**
      * Returns the partition id of the current input record; could be -1 if it is not
-     * available (for example, if this method is invoked from the punctuate call)
+     * available (for example, if this method is invoked from the punctuate call).
      *
      * @return the partition id
      */
     int partition();
 
     /**
      * Returns the offset of the current input record; could be -1 if it is not
-     * available (for example, if this method is invoked from the punctuate call)
+     * available (for example, if this method is invoked from the punctuate call).
      *
      * @return the offset
      */
     long offset();
 
     /**
-     * Returns the headers of the current input record; could be null if it is not available
+     * Returns the headers of the current input record; could be null if it is not
+     * available (for example, if this method is invoked from the punctuate call).
+     *
      * @return the headers
      */
     Headers headers();
 
     /**
      * Returns the current timestamp.
      *
-     * If it is triggered while processing a record streamed from the source processor, timestamp is defined as the timestamp of the current input record; the timestamp is extracted from
+     * <p> If it is triggered while processing a record streamed from the source processor,
+     * timestamp is defined as the timestamp of the current input record; the timestamp is extracted from
      * {@link org.apache.kafka.clients.consumer.ConsumerRecord ConsumerRecord} by {@link TimestampExtractor}.
      *
-     * If it is triggered while processing a record generated not from the source processor (for example,
+     * <p> If it is triggered while processing a record generated not from the source processor (for example,
      * if this method is invoked from the punctuate call), timestamp is defined as the current
-     * task's stream time, which is defined as the smallest among all its input stream partition timestamps.
+     * task's stream time, which is defined as the largest among all its input stream partition timestamps.

Review comment:
   I just took another look at the definition of streamTime, and it
actually looks like it might be computed incorrectly.
   
   The way it works is that the "stream time" for a task is computed most of
the time in
`org.apache.kafka.streams.processor.internals.PartitionGroup#nextRecord`,
i.e., it's the max timestamp of any record _polled from the PartitionGroup_.
   
   However, when we commit, we commit the "partition time" for each 
TopicPartition, which is set when we move a record into the head position for 
that queue. During restoration, we read these committed timestamps for each 
TopicPartition, and we (incorrectly) set the "stream time" to be the maximum 
over the "partition time" of each partition in the PartitionGroup (aka Task).
   
   This is incorrect in two ways:
   1. it should be the minimum, not the maximum (since we would choose the 
record with the minimum timestamp to process next)
   2. the timestamp of the _head enqueued_ record (partition time) is not the 
timestamp of the _last dequeued_ record (stream time).
   
   I'll file a Jira ticket capturing all this. In the meantime, I'd suggest
that we just update the docs to reflect the correct definition of "stream
time":
`which is defined as the largest timestamp of any record processed by the task`.
Then, we can fix the code to make this true all the time. Currently, it's only
true in steady state, not immediately after restoration.
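
To make the distinction concrete, here is a minimal, self-contained sketch. It is not the actual `PartitionGroup` code: the class `StreamTimeSketch`, its nested `PartitionQueue` and `Task` types, and the timestamps are all hypothetical, and "partition time" is simplified to the largest timestamp that has reached the head of a queue. It only shows how the "stream time" (largest timestamp dequeued so far) can differ from both the maximum and the minimum of the per-partition "partition times".

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.List;

// Hypothetical, simplified model; not Kafka Streams internals.
public class StreamTimeSketch {
    static final long UNKNOWN = -1L;

    // One input partition: a FIFO of record timestamps.
    static final class PartitionQueue {
        final Deque<Long> timestamps = new ArrayDeque<>();
        long partitionTime = UNKNOWN;

        PartitionQueue(long... ts) {
            for (long t : ts) {
                timestamps.add(t);
            }
            updatePartitionTime();
        }

        // Assumption: partition time advances when a record reaches the head position.
        void updatePartitionTime() {
            Long head = timestamps.peekFirst();
            if (head != null) {
                partitionTime = Math.max(partitionTime, head);
            }
        }

        // Removes the head record and returns its timestamp (queue must be non-empty).
        long poll() {
            long t = timestamps.pollFirst();
            updatePartitionTime();
            return t;
        }
    }

    // A task owning several partitions.
    static final class Task {
        final List<PartitionQueue> queues;
        long streamTime = UNKNOWN;

        Task(PartitionQueue... queues) {
            this.queues = List.of(queues);
        }

        // Dequeues the record with the smallest head timestamp; stream time is
        // the largest timestamp of any record dequeued so far.
        boolean nextRecord() {
            PartitionQueue best = null;
            for (PartitionQueue q : queues) {
                if (q.timestamps.isEmpty()) {
                    continue;
                }
                if (best == null || q.timestamps.peekFirst() < best.timestamps.peekFirst()) {
                    best = q;
                }
            }
            if (best == null) {
                return false;
            }
            streamTime = Math.max(streamTime, best.poll());
            return true;
        }

        long maxPartitionTime() {
            long max = UNKNOWN;
            for (PartitionQueue q : queues) {
                max = Math.max(max, q.partitionTime);
            }
            return max;
        }

        long minPartitionTime() {
            long min = Long.MAX_VALUE;
            for (PartitionQueue q : queues) {
                min = Math.min(min, q.partitionTime);
            }
            return min;
        }
    }

    public static void main(String[] args) {
        Task task = new Task(new PartitionQueue(10, 20, 30), new PartitionQueue(15, 40));
        task.nextRecord(); // dequeues 10
        task.nextRecord(); // dequeues 15
        System.out.println("stream time        = " + task.streamTime);
        System.out.println("max partition time = " + task.maxPartitionTime());
        System.out.println("min partition time = " + task.minPartitionTime());
    }
}
```

With these made-up inputs, the stream time after two dequeues is 15, while the partition times are 20 and 40. Restoring from the maximum (40) jumps far ahead of the stream time. Restoring from the minimum (20) is closer but still not equal, because it is the timestamp of a head enqueued record rather than of the last dequeued one.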



