bbejeck commented on code in PR #19937:
URL: https://github.com/apache/kafka/pull/19937#discussion_r2503348748


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/ForwardingDisabledProcessorContext.java:
##########
@@ -93,7 +94,15 @@ public <S extends StateStore> S getStateStore(final String name) {
     public Cancellable schedule(final Duration interval,
                                 final PunctuationType type,
                                 final Punctuator callback) throws IllegalArgumentException {
-        return delegate.schedule(interval, type, callback);
+        return delegate.schedule(null, interval, type, callback);

Review Comment:
   Instead of passing `null`, maybe use `Instant.EPOCH` as a sentinel or take an `Optional<Instant>`.
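
A minimal sketch of the `Optional<Instant>` variant, to make the suggestion concrete. `ScheduleSketch` and both `schedule` overloads are hypothetical stand-ins, not the Kafka Streams API; the return type is a plain `String` only so the example runs without the library.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.Optional;

// Hypothetical stand-in for a pair of schedule() overloads; not Kafka's actual API.
final class ScheduleSketch {

    // Overload without a start time: delegates with Optional.empty() rather than null.
    static String schedule(final Duration interval) {
        return schedule(Optional.empty(), interval);
    }

    // Overload with an explicit, possibly absent, start time.
    static String schedule(final Optional<Instant> startTime, final Duration interval) {
        return startTime
            .map(start -> "anchored at " + start + " every " + interval)
            .orElseGet(() -> "unanchored every " + interval);
    }

    public static void main(final String[] args) {
        System.out.println(schedule(Duration.ofSeconds(30)));
        System.out.println(schedule(Optional.of(Instant.EPOCH), Duration.ofSeconds(30)));
    }
}
```

One trade-off to weigh: `Instant.EPOCH` is itself a valid instant, so a sentinel cannot distinguish "anchor at the epoch" from "no anchor", whereas `Optional.empty()` can.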
   



##########
streams/src/main/java/org/apache/kafka/streams/processor/api/ProcessingContext.java:
##########
@@ -153,6 +154,46 @@ Cancellable schedule(final Duration interval,
                          final PunctuationType type,
                          final Punctuator callback);
 
+    /**
+     * Schedule a periodic operation for processors. A processor may call this method during
+     * {@link Processor#init(ProcessorContext) initialization},
+     * {@link Processor#process(Record) processing},
+     * {@link FixedKeyProcessor#init(FixedKeyProcessorContext) initialization}, or
+     * {@link FixedKeyProcessor#process(FixedKeyRecord) processing} to
+     * schedule a periodic callback &mdash; called a punctuation &mdash; to {@link Punctuator#punctuate(long)}.
+     * The type parameter controls what notion of time is used for punctuation:
+     * <ul>
+     *   <li>{@link PunctuationType#STREAM_TIME} &mdash; uses "stream time", which is advanced by the processing of messages
+     *   in accordance with the timestamp as extracted by the {@link TimestampExtractor} in use.
+     *   The first punctuation will be triggered by the first record that is processed.
+     *   <b>NOTE:</b> Only advanced if messages arrive</li>

Review Comment:
   nit 
   
   ```suggestion
        *   <b>NOTE:</b> Only advances as messages arrive</li>
   ```



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java:
##########
@@ -1173,6 +1189,15 @@ private Cancellable schedule(final long startTime, final long interval, final Pu
         }
     }
 
+    // For anchored schedule, we want to have all punctuations to happen only on times based on combinations of startTime and interval

Review Comment:
   nit
   
   ```suggestion
       // For anchored schedule, we want to have all punctuations only fire on times based on combinations of startTime and interval
   ```
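
To illustrate what "times based on combinations of startTime and interval" could mean, here is a rough sketch that computes the next firing time of the form `startTime + k * interval`. `AnchoredScheduleSketch` and `nextAnchoredTime` are hypothetical names, and the boundary behavior (fire at `startTime` if it is still in the future, otherwise at the first slot strictly after `now`) is an assumption, not necessarily what the `StreamTask` change does.

```java
import java.time.Duration;
import java.time.Instant;

// Hypothetical helper illustrating anchored alignment; not the StreamTask code under review.
final class AnchoredScheduleSketch {

    // Returns startTime if now is before it; otherwise the earliest time of the
    // form startTime + k * interval (k >= 1) that is strictly after now.
    static Instant nextAnchoredTime(final Instant startTime, final Duration interval, final Instant now) {
        final long intervalMs = interval.toMillis();
        if (intervalMs <= 0) {
            throw new IllegalArgumentException("interval must be at least one millisecond");
        }
        if (now.isBefore(startTime)) {
            return startTime;
        }
        final long elapsedMs = Duration.between(startTime, now).toMillis();
        final long completedIntervals = elapsedMs / intervalMs;
        return startTime.plusMillis((completedIntervals + 1) * intervalMs);
    }

    public static void main(final String[] args) {
        final Instant start = Instant.parse("2024-01-01T00:00:00Z");
        final Instant now = Instant.parse("2024-01-01T02:20:00Z");
        // Hourly schedule anchored at midnight, evaluated at 02:20.
        System.out.println(nextAnchoredTime(start, Duration.ofHours(1), now));
    }
}
```

With this alignment, a restart at an arbitrary moment does not shift later punctuations: they stay on the grid defined by the anchor.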



##########
streams/src/test/java/org/apache/kafka/test/MockApiProcessor.java:
##########
@@ -46,14 +47,22 @@ public class MockApiProcessor<KIn, VIn, KOut, VOut> implements Processor<KIn, VI
     private Cancellable scheduleCancellable;
 
     private final PunctuationType punctuationType;
+    private final Instant startTime;
     private final long scheduleInterval;
 
     private boolean commitRequested = false;
     private ProcessorContext<KOut, VOut> context;
 
     public MockApiProcessor(final PunctuationType punctuationType,
                             final long scheduleInterval) {
+        this(punctuationType, null, scheduleInterval);
+    }
+
+    public MockApiProcessor(final PunctuationType punctuationType,
+                            final Instant startTime,

Review Comment:
   Same consideration here: make `startTime` an `Optional`, or use
   `Instant.EPOCH` as a sentinel value when calling, as at line 58 above



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImpl.java:
##########
@@ -130,6 +131,11 @@ public long currentStreamTimeMs() {
      */
     @Override
     public Cancellable schedule(final Duration interval, final PunctuationType type, final Punctuator callback) {
+        return schedule(null, interval, type, callback);

Review Comment:
   Same here: consider `Instant.EPOCH` or `Optional<Instant>` instead of `null`.



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java:
##########
@@ -1133,16 +1134,31 @@ public void updateNextOffsets(final TopicPartition partition, final OffsetAndMet
      * @throws IllegalStateException if the current node is not null
      */
     public Cancellable schedule(final long interval, final PunctuationType type, final Punctuator punctuator) {
-        switch (type) {
-            case STREAM_TIME:
-                // align punctuation to 0L, punctuate as soon as we have data
-                return schedule(0L, interval, type, punctuator);
-            case WALL_CLOCK_TIME:
-                // align punctuation to now, punctuate after interval has elapsed
-                return schedule(time.milliseconds() + interval, interval, type, punctuator);
-            default:
-                throw new IllegalArgumentException("Unrecognized PunctuationType: " + type);
+        return schedule(null, interval, type, punctuator);

Review Comment:
   Same here: consider `Instant.EPOCH` or `Optional<Instant>` instead of `null`.



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java:
##########
@@ -307,13 +308,22 @@ public void commit() {
     public Cancellable schedule(final Duration interval,
                                 final PunctuationType type,
                                 final Punctuator callback) throws IllegalArgumentException {
+        return schedule(null, interval, type, callback);

Review Comment:
   Same here: consider `Instant.EPOCH` or `Optional<Instant>` instead of `null`.



##########
streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java:
##########
@@ -150,6 +151,46 @@ Cancellable schedule(final Duration interval,
                          final PunctuationType type,
                          final Punctuator callback);
 
+    /**
+     * Schedule a periodic operation for processors. A processor may call this method during a
+     * {@link org.apache.kafka.streams.kstream.KTable#transformValues(ValueTransformerWithKeySupplier, String...)}'s
+     * {@link org.apache.kafka.streams.kstream.ValueTransformerWithKey#init(ProcessorContext) initialization} or
+     * {@link org.apache.kafka.streams.kstream.ValueTransformerWithKey#transform(Object, Object) processing} to
+     * schedule a periodic callback &mdash; called a punctuation &mdash; to {@link Punctuator#punctuate(long)}.
+     * The type parameter controls what notion of time is used for punctuation:
+     * <ul>
+     *   <li>{@link PunctuationType#STREAM_TIME} &mdash; uses "stream time", which is advanced by the processing of messages
+     *   in accordance with the timestamp as extracted by the {@link TimestampExtractor} in use.
+     *   The first punctuation will be triggered by the first record that is processed.
+     *   <b>NOTE:</b> Only advanced if messages arrive</li>

Review Comment:
   The same changes suggested above apply here and at line 167



##########
streams/src/main/java/org/apache/kafka/streams/processor/api/ProcessingContext.java:
##########
@@ -153,6 +154,46 @@ Cancellable schedule(final Duration interval,
                          final PunctuationType type,
                          final Punctuator callback);
 
+    /**
+     * Schedule a periodic operation for processors. A processor may call this method during
+     * {@link Processor#init(ProcessorContext) initialization},
+     * {@link Processor#process(Record) processing},
+     * {@link FixedKeyProcessor#init(FixedKeyProcessorContext) initialization}, or
+     * {@link FixedKeyProcessor#process(FixedKeyRecord) processing} to
+     * schedule a periodic callback &mdash; called a punctuation &mdash; to {@link Punctuator#punctuate(long)}.
+     * The type parameter controls what notion of time is used for punctuation:
+     * <ul>
+     *   <li>{@link PunctuationType#STREAM_TIME} &mdash; uses "stream time", which is advanced by the processing of messages
+     *   in accordance with the timestamp as extracted by the {@link TimestampExtractor} in use.
+     *   The first punctuation will be triggered by the first record that is processed.
+     *   <b>NOTE:</b> Only advanced if messages arrive</li>
+     *   <li>{@link PunctuationType#WALL_CLOCK_TIME} &mdash; uses system time (the wall-clock time),
+     *   which is advanced independent of whether new messages arrive.

Review Comment:
   nit
   
   ```suggestion
        *   which advances independent of whether new messages arrive.
   ```
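
For readers skimming the Javadoc wording, a small model of the distinction may help. This is only a sketch: `StreamTimeSketch` is a hypothetical class, and it assumes stream time behaves as the largest record timestamp observed so far, which Kafka Streams tracks internally.

```java
// Hypothetical model of "stream time"; not a Kafka Streams class.
final class StreamTimeSketch {
    private long streamTimeMs = -1L; // no records observed yet

    // Stream time moves only when a record arrives, and never moves backwards.
    long observe(final long recordTimestampMs) {
        streamTimeMs = Math.max(streamTimeMs, recordTimestampMs);
        return streamTimeMs;
    }

    long streamTimeMs() {
        return streamTimeMs;
    }

    public static void main(final String[] args) {
        final StreamTimeSketch tracker = new StreamTimeSketch();
        tracker.observe(100L);
        tracker.observe(50L); // out-of-order record: stream time stays at 100
        System.out.println("stream time: " + tracker.streamTimeMs());
        // Wall-clock time, by contrast, advances whether or not records arrive.
        System.out.println("wall clock:  " + System.currentTimeMillis());
    }
}
```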



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

