This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch 2.2
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.2 by this push:
     new f1300c7  KAFKA-8615: Change to track partition time breaks TimestampExtractor (#7054)
f1300c7 is described below

commit f1300c71c7ba9d0ef6571949587923675109ba7b
Author: A. Sophie Blee-Goldman <sop...@confluent.io>
AuthorDate: Thu Jul 18 13:54:46 2019 -0700

    KAFKA-8615: Change to track partition time breaks TimestampExtractor (#7054)
    
    The timestamp extractor takes a previousTimestamp parameter which should be
    the partition time. This PR adds back partition time tracking for the
    extractor and renames previousTimestamp --> partitionTime.
    
    Reviewers: Guozhang Wang <wangg...@gmail.com>, Bill Bejeck <bbej...@gmail.com>, Matthias J. Sax <mj...@apache.org>
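
    For illustration only (this snippet is not part of the commit): under the
    renamed contract, a custom extractor that falls back to partition time for
    records without a valid timestamp might look like the following sketch.
    The class name is hypothetical.

        import org.apache.kafka.clients.consumer.ConsumerRecord;
        import org.apache.kafka.streams.processor.TimestampExtractor;

        public class PartitionTimeFallbackExtractor implements TimestampExtractor {
            @Override
            public long extract(final ConsumerRecord<Object, Object> record, final long partitionTime) {
                final long timestamp = record.timestamp();
                if (timestamp >= 0) {
                    // the record carries a valid (non-negative) timestamp; use it
                    return timestamp;
                }
                // otherwise fall back to the highest timestamp seen so far on this
                // partition; this is -1 (unknown) until a valid record has been
                // extracted, in which case the record is dropped silently
                return partitionTime;
            }
        }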
---
 .../examples/pageview/JsonTimestampExtractor.java  |   2 +-
 .../processor/ExtractRecordMetadataTimestamp.java  |  10 +-
 .../streams/processor/FailOnInvalidTimestamp.java  |   4 +-
 .../processor/LogAndSkipOnInvalidTimestamp.java    |   4 +-
 .../streams/processor/TimestampExtractor.java      |   4 +-
 .../UsePreviousTimeOnInvalidTimestamp.java         |  10 +-
 .../processor/WallclockTimestampExtractor.java     |   4 +-
 .../processor/internals/PartitionGroup.java        |  18 ++--
 .../streams/processor/internals/RecordQueue.java   |  23 ++++-
 .../streams/processor/internals/StreamTask.java    |  10 +-
 .../apache/kafka/streams/StreamsConfigTest.java    |   2 +-
 .../processor/internals/PartitionGroupTest.java    |   8 +-
 .../processor/internals/ProcessorTopologyTest.java |   2 +-
 .../processor/internals/RecordQueueTest.java       | 106 ++++++++++++++++++---
 .../apache/kafka/test/MockTimestampExtractor.java  |   2 +-
 15 files changed, 151 insertions(+), 58 deletions(-)

diff --git a/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/JsonTimestampExtractor.java b/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/JsonTimestampExtractor.java
index 4f6257a..d760183 100644
--- a/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/JsonTimestampExtractor.java
+++ b/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/JsonTimestampExtractor.java
@@ -27,7 +27,7 @@ import org.apache.kafka.streams.processor.TimestampExtractor;
 public class JsonTimestampExtractor implements TimestampExtractor {
 
     @Override
-    public long extract(final ConsumerRecord<Object, Object> record, final long previousTimestamp) {
+    public long extract(final ConsumerRecord<Object, Object> record, final long partitionTime) {
         if (record.value() instanceof PageViewTypedDemo.PageView) {
             return ((PageViewTypedDemo.PageView) record.value()).timestamp;
         }
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/ExtractRecordMetadataTimestamp.java b/streams/src/main/java/org/apache/kafka/streams/processor/ExtractRecordMetadataTimestamp.java
index 79c8dd3..3c7428a 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/ExtractRecordMetadataTimestamp.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/ExtractRecordMetadataTimestamp.java
@@ -50,15 +50,15 @@ abstract class ExtractRecordMetadataTimestamp implements TimestampExtractor {
      * Extracts the embedded metadata timestamp from the given {@link ConsumerRecord}.
      *
      * @param record a data record
-     * @param previousTimestamp the latest extracted valid timestamp of the current record's partition (could be -1 if unknown)
+     * @param partitionTime the highest extracted valid timestamp of the current record's partition (could be -1 if unknown)
      * @return the embedded metadata timestamp of the given {@link ConsumerRecord}
      */
     @Override
-    public long extract(final ConsumerRecord<Object, Object> record, final long previousTimestamp) {
+    public long extract(final ConsumerRecord<Object, Object> record, final long partitionTime) {
         final long timestamp = record.timestamp();
 
         if (timestamp < 0) {
-            return onInvalidTimestamp(record, timestamp, previousTimestamp);
+            return onInvalidTimestamp(record, timestamp, partitionTime);
         }
 
         return timestamp;
@@ -69,10 +69,10 @@ abstract class ExtractRecordMetadataTimestamp implements TimestampExtractor {
      *
      * @param record a data record
      * @param recordTimestamp the timestamp extracted from the record
-     * @param previousTimestamp the latest extracted valid timestamp of the current record's partition (could be -1 if unknown)
+     * @param partitionTime the highest extracted valid timestamp of the current record's partition (could be -1 if unknown)
      * @return a new timestamp for the record (if negative, record will not be processed but dropped silently)
      */
     public abstract long onInvalidTimestamp(final ConsumerRecord<Object, Object> record,
                                             final long recordTimestamp,
-                                            final long previousTimestamp);
+                                            final long partitionTime);
 }
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/FailOnInvalidTimestamp.java b/streams/src/main/java/org/apache/kafka/streams/processor/FailOnInvalidTimestamp.java
index 87cb0de..40d3e0e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/FailOnInvalidTimestamp.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/FailOnInvalidTimestamp.java
@@ -54,14 +54,14 @@ public class FailOnInvalidTimestamp extends ExtractRecordMetadataTimestamp {
      *
      * @param record a data record
      * @param recordTimestamp the timestamp extracted from the record
-     * @param previousTimestamp the latest extracted valid timestamp of the current record's partition (could be -1 if unknown)
+     * @param partitionTime the highest extracted valid timestamp of the current record's partition (could be -1 if unknown)
      * @return nothing; always raises an exception
      * @throws StreamsException on every invocation
      */
     @Override
     public long onInvalidTimestamp(final ConsumerRecord<Object, Object> record,
                                    final long recordTimestamp,
-                                   final long previousTimestamp)
+                                   final long partitionTime)
             throws StreamsException {
 
         final String message = "Input record " + record + " has invalid (negative) timestamp. " +
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/LogAndSkipOnInvalidTimestamp.java b/streams/src/main/java/org/apache/kafka/streams/processor/LogAndSkipOnInvalidTimestamp.java
index 0561e61..b759e5b 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/LogAndSkipOnInvalidTimestamp.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/LogAndSkipOnInvalidTimestamp.java
@@ -56,13 +56,13 @@ public class LogAndSkipOnInvalidTimestamp extends ExtractRecordMetadataTimestamp
      *
      * @param record a data record
      * @param recordTimestamp the timestamp extracted from the record
-     * @param previousTimestamp the latest extracted valid timestamp of the current record's partition (could be -1 if unknown)
+     * @param partitionTime the highest extracted valid timestamp of the current record's partition (could be -1 if unknown)
      * @return the originally extracted timestamp of the record
      */
     @Override
     public long onInvalidTimestamp(final ConsumerRecord<Object, Object> record,
                                    final long recordTimestamp,
-                                   final long previousTimestamp) {
+                                   final long partitionTime) {
         log.warn("Input record {} will be dropped because it has an invalid (negative) timestamp.", record);
         return recordTimestamp;
     }
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/TimestampExtractor.java b/streams/src/main/java/org/apache/kafka/streams/processor/TimestampExtractor.java
index 0780dc0..1e6d6cd 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/TimestampExtractor.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/TimestampExtractor.java
@@ -46,8 +46,8 @@ public interface TimestampExtractor {
      *
      *
      * @param record a data record
-     * @param previousTimestamp the latest extracted valid timestamp of the current record's partition (could be -1 if unknown)
+     * @param partitionTime the highest extracted valid timestamp of the current record's partition (could be -1 if unknown)
      * @return the timestamp of the record
      */
-    long extract(ConsumerRecord<Object, Object> record, long previousTimestamp);
+    long extract(ConsumerRecord<Object, Object> record, long partitionTime);
 }
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/UsePreviousTimeOnInvalidTimestamp.java b/streams/src/main/java/org/apache/kafka/streams/processor/UsePreviousTimeOnInvalidTimestamp.java
index dd952cc..89e2fd3 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/UsePreviousTimeOnInvalidTimestamp.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/UsePreviousTimeOnInvalidTimestamp.java
@@ -51,20 +51,20 @@ public class UsePreviousTimeOnInvalidTimestamp extends ExtractRecordMetadataTime
      *
      * @param record a data record
      * @param recordTimestamp the timestamp extracted from the record
-     * @param previousTimestamp the latest extracted valid timestamp of the current record's partition (could be -1 if unknown)
+     * @param partitionTime the highest extracted valid timestamp of the current record's partition (could be -1 if unknown)
      * @return the provided latest extracted valid timestamp as new timestamp for the record
      * @throws StreamsException if latest extracted valid timestamp is unknown
      */
     @Override
     public long onInvalidTimestamp(final ConsumerRecord<Object, Object> record,
                                    final long recordTimestamp,
-                                   final long previousTimestamp)
+                                   final long partitionTime)
             throws StreamsException {
-        if (previousTimestamp < 0) {
+        if (partitionTime < 0) {
             throw new StreamsException("Could not infer new timestamp for input record " + record
-                    + " because latest extracted valid timestamp is unknown.");
+                    + " because partition time is unknown.");
         }
-        return previousTimestamp;
+        return partitionTime;
     }
 
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/WallclockTimestampExtractor.java b/streams/src/main/java/org/apache/kafka/streams/processor/WallclockTimestampExtractor.java
index ad3b3bc..baa1cb6 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/WallclockTimestampExtractor.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/WallclockTimestampExtractor.java
@@ -38,11 +38,11 @@ public class WallclockTimestampExtractor implements TimestampExtractor {
      * Return the current wall clock time as timestamp.
      *
      * @param record a data record
-     * @param previousTimestamp the latest extracted valid timestamp of the current record's partition (could be -1 if unknown)
+     * @param partitionTime the highest extracted valid timestamp of the current record's partition (could be -1 if unknown)
      * @return the current wall clock time, expressed in milliseconds since midnight, January 1, 1970 UTC
      */
     @Override
-    public long extract(final ConsumerRecord<Object, Object> record, final long previousTimestamp) {
+    public long extract(final ConsumerRecord<Object, Object> record, final long partitionTime) {
         return System.currentTimeMillis();
     }
 }
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java
index fbafa73..83b3673 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java
@@ -41,8 +41,8 @@ import java.util.Set;
  *
  * PartitionGroup also maintains a stream-time for the group as a whole.
  * This is defined as the highest timestamp of any record yet polled from the PartitionGroup.
- * The PartitionGroup's stream-time is also the stream-time of its task and is used as the
- * stream-time for any computations that require it.
+ * Note however that any computation that depends on stream-time should track it on a per-operator basis to obtain an
+ * accurate view of the local time as seen by that processor.
  *
  * The PartitionGroup's stream-time is initially UNKNOWN (-1), and it is set to a known value upon first poll.
  * As a consequence of the definition, the PartitionGroup's stream-time is non-decreasing
@@ -76,7 +76,7 @@ public class PartitionGroup {
     }
 
     PartitionGroup(final Map<TopicPartition, RecordQueue> partitionQueues, final Sensor recordLatenessSensor) {
-        nonEmptyQueuesByTime = new PriorityQueue<>(partitionQueues.size(), Comparator.comparingLong(RecordQueue::timestamp));
+        nonEmptyQueuesByTime = new PriorityQueue<>(partitionQueues.size(), Comparator.comparingLong(RecordQueue::headRecordTimestamp));
         this.partitionQueues = partitionQueues;
         this.recordLatenessSensor = recordLatenessSensor;
         totalBuffered = 0;
@@ -109,7 +109,7 @@ public class PartitionGroup {
                     nonEmptyQueuesByTime.offer(queue);
                 }
 
-                // always update the stream time to the record's timestamp yet to be processed if it is larger
+                // always update the stream-time to the record's timestamp yet to be processed if it is larger
                 if (record.timestamp > streamTime) {
                     streamTime = record.timestamp;
                     recordLatenessSensor.record(0);
@@ -140,8 +140,8 @@ public class PartitionGroup {
             nonEmptyQueuesByTime.offer(recordQueue);
 
             // if all partitions now are non-empty, set the flag
-            // we do not need to update the stream time here since this task will definitely be
-            // processed next, and hence the stream time will be updated when we retrieved records by then
+            // we do not need to update the stream-time here since this task will definitely be
+            // processed next, and hence the stream-time will be updated when we retrieved records by then
             if (nonEmptyQueuesByTime.size() == this.partitionQueues.size()) {
                 allBuffered = true;
             }
@@ -157,10 +157,9 @@ public class PartitionGroup {
     }
 
     /**
-     * Return the timestamp of this partition group as the smallest
-     * partition timestamp among all its partitions
+     * Return the stream-time of this partition group defined as the largest timestamp seen across all partitions
      */
-    public long timestamp() {
+    public long streamTime() {
         return streamTime;
     }
 
@@ -192,6 +191,7 @@ public class PartitionGroup {
 
     public void clear() {
         nonEmptyQueuesByTime.clear();
+        streamTime = RecordQueue.UNKNOWN;
         for (final RecordQueue queue : partitionQueues.values()) {
             queue.clear();
         }
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java
index 7f3c08d..c182313 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java
@@ -31,8 +31,8 @@ import java.util.ArrayDeque;
 
 /**
  * RecordQueue is a FIFO queue of {@link StampedRecord} (ConsumerRecord + timestamp). It also keeps track of the
- * partition timestamp defined as the minimum timestamp of records in its queue; in addition, its partition
- * timestamp is monotonically increasing such that once it is advanced, it will not be decremented.
+ * partition timestamp defined as the largest timestamp seen on the partition so far; this is passed to the
+ * timestamp extractor.
  */
 public class RecordQueue {
 
@@ -47,6 +47,7 @@ public class RecordQueue {
     private final ArrayDeque<ConsumerRecord<byte[], byte[]>> fifoQueue;
 
     private StampedRecord headRecord = null;
+    private long partitionTime = RecordQueue.UNKNOWN;
 
     RecordQueue(final TopicPartition partition,
                 final SourceNode source,
@@ -136,20 +137,30 @@ public class RecordQueue {
     }
 
     /**
-     * Returns the tracked partition timestamp
+     * Returns the head record's timestamp
      *
      * @return timestamp
      */
-    public long timestamp() {
+    public long headRecordTimestamp() {
         return headRecord == null ? UNKNOWN : headRecord.timestamp;
     }
 
     /**
+     * Returns the tracked partition time
+     *
+     * @return partition time
+     */
+    long partitionTime() {
+        return partitionTime;
+    }
+
+    /**
      * Clear the fifo queue of its elements, also clear the time tracker's kept stamped elements
      */
     public void clear() {
         fifoQueue.clear();
         headRecord = null;
+        partitionTime = RecordQueue.UNKNOWN;
     }
 
     private void updateHead() {
@@ -164,7 +175,7 @@ public class RecordQueue {
 
             final long timestamp;
             try {
-                timestamp = timestampExtractor.extract(deserialized, timestamp());
+                timestamp = timestampExtractor.extract(deserialized, partitionTime);
             } catch (final StreamsException internalFatalExtractorException) {
                 throw internalFatalExtractorException;
             } catch (final Exception fatalUserException) {
@@ -185,6 +196,8 @@ public class RecordQueue {
             }
 
             headRecord = new StampedRecord(deserialized, timestamp);
+
+            partitionTime = Math.max(partitionTime, timestamp);
         }
     }
 }
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
index 539d116..3ab2524 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
@@ -816,14 +816,14 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator
      * @throws TaskMigratedException if the task producer got fenced (EOS only)
      */
     public boolean maybePunctuateStreamTime() {
-        final long timestamp = partitionGroup.timestamp();
+        final long streamTime = partitionGroup.streamTime();
 
         // if the timestamp is not known yet, meaning there is not enough data accumulated
         // to reason about stream partition time, then skip.
-        if (timestamp == RecordQueue.UNKNOWN) {
+        if (streamTime == RecordQueue.UNKNOWN) {
             return false;
         } else {
-            final boolean punctuated = streamTimePunctuationQueue.mayPunctuate(timestamp, PunctuationType.STREAM_TIME, this);
+            final boolean punctuated = streamTimePunctuationQueue.mayPunctuate(streamTime, PunctuationType.STREAM_TIME, this);
 
             if (punctuated) {
                 commitNeeded = true;
@@ -841,9 +841,9 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator
      * @throws TaskMigratedException if the task producer got fenced (EOS only)
      */
     public boolean maybePunctuateSystemTime() {
-        final long timestamp = time.milliseconds();
+        final long systemTime = time.milliseconds();
 
-        final boolean punctuated = systemTimePunctuationQueue.mayPunctuate(timestamp, PunctuationType.WALL_CLOCK_TIME, this);
+        final boolean punctuated = systemTimePunctuationQueue.mayPunctuate(systemTime, PunctuationType.WALL_CLOCK_TIME, this);
 
         if (punctuated) {
             commitNeeded = true;
diff --git a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
index 724cbb5..7221ec8 100644
--- a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
@@ -670,7 +670,7 @@ public class StreamsConfigTest {
     public static class MockTimestampExtractor implements TimestampExtractor {
 
         @Override
-        public long extract(final ConsumerRecord<Object, Object> record, final long previousTimestamp) {
+        public long extract(final ConsumerRecord<Object, Object> record, final long partitionTime) {
             return 0;
         }
     }
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/PartitionGroupTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/PartitionGroupTest.java
index 6b95bdf..cfc814f 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/PartitionGroupTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/PartitionGroupTest.java
@@ -107,7 +107,7 @@ public class PartitionGroupTest {
         // st: -1 since no records have been processed yet
 
         verifyBuffered(6, 3, 3);
-        assertEquals(-1L, group.timestamp());
+        assertEquals(-1L, group.streamTime());
         assertEquals(0.0, metrics.metric(lastLatenessValue).metricValue());
 
         StampedRecord record;
@@ -143,7 +143,7 @@ public class PartitionGroupTest {
         // 2:[4, 6]
         // st: 2 (just adding records shouldn't change it)
         verifyBuffered(6, 4, 2);
-        assertEquals(2L, group.timestamp());
+        assertEquals(2L, group.streamTime());
         assertEquals(0.0, metrics.metric(lastLatenessValue).metricValue());
 
         // get one record, time should be advanced
@@ -221,7 +221,7 @@ public class PartitionGroupTest {
         group.addRawRecords(partition1, list1);
 
         verifyBuffered(3, 3, 0);
-        assertEquals(-1L, group.timestamp());
+        assertEquals(-1L, group.streamTime());
         assertEquals(0.0, metrics.metric(lastLatenessValue).metricValue());
 
         StampedRecord record;
@@ -258,7 +258,7 @@ public class PartitionGroupTest {
 
     private void verifyTimes(final StampedRecord record, final long recordTime, final long streamTime) {
         assertEquals(recordTime, record.timestamp);
-        assertEquals(streamTime, group.timestamp());
+        assertEquals(streamTime, group.streamTime());
     }
 
     private void verifyBuffered(final int totalBuffered, final int partitionOneBuffered, final int partitionTwoBuffered) {
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
index f6736d2..0207a7b 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
@@ -731,7 +731,7 @@ public class ProcessorTopologyTest {
         private static final long DEFAULT_TIMESTAMP = 1000L;
 
         @Override
-        public long extract(final ConsumerRecord<Object, Object> record, final long previousTimestamp) {
+        public long extract(final ConsumerRecord<Object, Object> record, final long partitionTime) {
             if (record.value().toString().matches(".*@[0-9]+")) {
                 return Long.parseLong(record.value().toString().split("@")[1]);
             }
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordQueueTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordQueueTest.java
index c16cb2a..6dadb49 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordQueueTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordQueueTest.java
@@ -101,7 +101,7 @@ public class RecordQueueTest {
 
         assertTrue(queue.isEmpty());
         assertEquals(0, queue.size());
-        assertEquals(RecordQueue.UNKNOWN, queue.timestamp());
+        assertEquals(RecordQueue.UNKNOWN, queue.headRecordTimestamp());
 
         // add three out-of-order records with timestamps 2, 1, 3
         final List<ConsumerRecord<byte[], byte[]>> list1 = Arrays.asList(
@@ -112,17 +112,17 @@ public class RecordQueueTest {
         queue.addRawRecords(list1);
 
         assertEquals(3, queue.size());
-        assertEquals(2L, queue.timestamp());
+        assertEquals(2L, queue.headRecordTimestamp());
 
         // poll the first record, now with 1, 3
         assertEquals(2L, queue.poll().timestamp);
         assertEquals(2, queue.size());
-        assertEquals(1L, queue.timestamp());
+        assertEquals(1L, queue.headRecordTimestamp());
 
         // poll the second record, now with 3
         assertEquals(1L, queue.poll().timestamp);
         assertEquals(1, queue.size());
-        assertEquals(3L, queue.timestamp());
+        assertEquals(3L, queue.headRecordTimestamp());
 
         // add three out-of-order records with timestamps 4, 1, 2
         // now with 3, 4, 1, 2
@@ -134,24 +134,24 @@ public class RecordQueueTest {
         queue.addRawRecords(list2);
 
         assertEquals(4, queue.size());
-        assertEquals(3L, queue.timestamp());
+        assertEquals(3L, queue.headRecordTimestamp());
 
         // poll the third record, now with 4, 1, 2
         assertEquals(3L, queue.poll().timestamp);
         assertEquals(3, queue.size());
-        assertEquals(4L, queue.timestamp());
+        assertEquals(4L, queue.headRecordTimestamp());
 
         // poll the remaining records
         assertEquals(4L, queue.poll().timestamp);
-        assertEquals(1L, queue.timestamp());
+        assertEquals(1L, queue.headRecordTimestamp());
 
         assertEquals(1L, queue.poll().timestamp);
-        assertEquals(2L, queue.timestamp());
+        assertEquals(2L, queue.headRecordTimestamp());
 
         assertEquals(2L, queue.poll().timestamp);
         assertTrue(queue.isEmpty());
         assertEquals(0, queue.size());
-        assertEquals(RecordQueue.UNKNOWN, queue.timestamp());
+        assertEquals(RecordQueue.UNKNOWN, queue.headRecordTimestamp());
 
         // add three more records with 4, 5, 6
         final List<ConsumerRecord<byte[], byte[]>> list3 = Arrays.asList(
@@ -162,24 +162,51 @@ public class RecordQueueTest {
         queue.addRawRecords(list3);
 
         assertEquals(3, queue.size());
-        assertEquals(4L, queue.timestamp());
+        assertEquals(4L, queue.headRecordTimestamp());
 
         // poll one record again, the timestamp should advance now
         assertEquals(4L, queue.poll().timestamp);
         assertEquals(2, queue.size());
-        assertEquals(5L, queue.timestamp());
+        assertEquals(5L, queue.headRecordTimestamp());
 
         // clear the queue
         queue.clear();
         assertTrue(queue.isEmpty());
         assertEquals(0, queue.size());
-        assertEquals(RecordQueue.UNKNOWN, queue.timestamp());
+        assertEquals(RecordQueue.UNKNOWN, queue.headRecordTimestamp());
 
         // re-insert the three records with 4, 5, 6
         queue.addRawRecords(list3);
 
         assertEquals(3, queue.size());
-        assertEquals(4L, queue.timestamp());
+        assertEquals(4L, queue.headRecordTimestamp());
+    }
+
+    @Test
+    public void shouldTrackPartitionTimeAsMaxSeenTimestamp() {
+
+        assertTrue(queue.isEmpty());
+        assertEquals(0, queue.size());
+        assertEquals(RecordQueue.UNKNOWN, queue.headRecordTimestamp());
+
+        // add four out-of-order records with timestamps 2, 1, 3, 4
+        final List<ConsumerRecord<byte[], byte[]>> list1 = Arrays.asList(
+            new ConsumerRecord<>("topic", 1, 2, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue),
+            new ConsumerRecord<>("topic", 1, 1, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue),
+            new ConsumerRecord<>("topic", 1, 3, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue),
+            new ConsumerRecord<>("topic", 1, 4, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue));
+
+        assertEquals(queue.partitionTime(), RecordQueue.UNKNOWN);
+
+        queue.addRawRecords(list1);
+
+        assertEquals(queue.partitionTime(), 2L);
+
+        queue.poll();
+        assertEquals(queue.partitionTime(), 2L);
+
+        queue.poll();
+        assertEquals(queue.partitionTime(), 3L);
     }
 
     @Test(expected = StreamsException.class)
@@ -253,4 +280,57 @@ public class RecordQueueTest {
 
         assertEquals(0, queue.size());
     }
+
+    @Test
+    public void shouldPassPartitionTimeToTimestampExtractor() {
+
+        final PartitionTimeTrackingTimestampExtractor timestampExtractor = new PartitionTimeTrackingTimestampExtractor();
+        final RecordQueue queue = new RecordQueue(
+            new TopicPartition(topics[0], 1),
+            mockSourceNodeWithMetrics,
+            timestampExtractor,
+            new LogAndFailExceptionHandler(),
+            context,
+            new LogContext());
+
+        assertTrue(queue.isEmpty());
+        assertEquals(0, queue.size());
+        assertEquals(RecordQueue.UNKNOWN, queue.headRecordTimestamp());
+
+        // add four out-of-order records with timestamps 2, 1, 3, 4
+        final List<ConsumerRecord<byte[], byte[]>> list1 = Arrays.asList(
+            new ConsumerRecord<>("topic", 1, 2, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue),
+            new ConsumerRecord<>("topic", 1, 1, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue),
+            new ConsumerRecord<>("topic", 1, 3, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue),
+            new ConsumerRecord<>("topic", 1, 4, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue));
+
+        assertEquals(RecordQueue.UNKNOWN, timestampExtractor.partitionTime);
+
+        queue.addRawRecords(list1);
+
+        // no (known) timestamp has yet been passed to the timestamp extractor
+        assertEquals(RecordQueue.UNKNOWN, timestampExtractor.partitionTime);
+
+        queue.poll();
+        assertEquals(2L, timestampExtractor.partitionTime);
+
+        queue.poll();
+        assertEquals(2L, timestampExtractor.partitionTime);
+
+        queue.poll();
+        assertEquals(3L, timestampExtractor.partitionTime);
+
+    }
+
+    class PartitionTimeTrackingTimestampExtractor implements TimestampExtractor {
+        private long partitionTime = RecordQueue.UNKNOWN;
+
+        public long extract(final ConsumerRecord<Object, Object> record, final long partitionTime) {
+            if (partitionTime < this.partitionTime) {
+                throw new IllegalStateException("Partition time should not decrease");
+            }
+            }
+            this.partitionTime = partitionTime;
+            return record.offset();
+        }
+    }
 }
diff --git a/streams/src/test/java/org/apache/kafka/test/MockTimestampExtractor.java b/streams/src/test/java/org/apache/kafka/test/MockTimestampExtractor.java
index 1701164..f437772 100644
--- a/streams/src/test/java/org/apache/kafka/test/MockTimestampExtractor.java
+++ b/streams/src/test/java/org/apache/kafka/test/MockTimestampExtractor.java
@@ -23,7 +23,7 @@ import org.apache.kafka.streams.processor.TimestampExtractor;
 public class MockTimestampExtractor implements TimestampExtractor {
 
     @Override
-    public long extract(final ConsumerRecord<Object, Object> record, final long previousTimestamp) {
+    public long extract(final ConsumerRecord<Object, Object> record, final long partitionTime) {
         return record.offset();
     }
 }
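
For reference (not part of this commit): a custom extractor such as the hypothetical PartitionTimeFallbackExtractor sketched in the commit message above is wired in through the default.timestamp.extractor config. A minimal sketch; the application id and bootstrap servers are placeholders:

    import java.util.Properties;
    import org.apache.kafka.streams.StreamsConfig;

    final Properties props = new Properties();
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "partition-time-demo");
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    // applies to all source topics; per-source overrides are also possible via Consumed.with(...)
    props.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, PartitionTimeFallbackExtractor.class);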
