lihaosky commented on code in PR #11896:
URL: https://github.com/apache/kafka/pull/11896#discussion_r847582057


##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImpl.java:
##########
@@ -232,11 +247,19 @@
                     );
                     break;
                 case ROCKS_DB:
-                    supplier = Stores.persistentTimestampedWindowStore(
-                        materialized.storeName(),
-                        Duration.ofMillis(retentionPeriod),
-                        Duration.ofMillis(windows.size()),
-                        false
+                    supplier = emitStrategy.type() == StrategyType.ON_WINDOW_CLOSE ?
+                        RocksDbIndexedTimeOrderedWindowBytesStoreSupplier.create(
+                            materialized.storeName(),

Review Comment:
   If a user enables `emit final` on an existing topology that currently uses 
`emit eager`, will the processor pick up the existing state store even though 
its data format does not match the new store type?



##########
streams/src/main/java/org/apache/kafka/streams/kstream/EmitStrategy.java:
##########
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream;
+
+import org.apache.kafka.streams.kstream.internals.UnlimitedWindow;
+import org.apache.kafka.streams.kstream.internals.emitstrategy.WindowCloseStrategy;
+import org.apache.kafka.streams.kstream.internals.emitstrategy.WindowUpdateStrategy;
+
+/**
+ * This interface controls the strategy that can be used to control how we emit results in a processor.
+ */
+public interface EmitStrategy {
+
+    enum StrategyType {
+        ON_WINDOW_CLOSE,
+        ON_WINDOW_UPDATE
+    }
+
+    /**
+     * Returns the strategy type
+     * @return Emit strategy type
+     */
+    StrategyType type();
+
+    /**
+     * This strategy indicates that the aggregated result for a window will only be outputted when the
+     * window closes instead of when there's an update to the window.

Review Comment:
   Sure. I feel this is not related to caching.



##########
streams/src/main/java/org/apache/kafka/streams/kstream/EmitStrategy.java:
##########
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream;
+
+import org.apache.kafka.streams.kstream.internals.UnlimitedWindow;
+import 
org.apache.kafka.streams.kstream.internals.emitstrategy.WindowCloseStrategy;
+import 
org.apache.kafka.streams.kstream.internals.emitstrategy.WindowUpdateStrategy;
+
+/**
+ * This interface controls the strategy that can be used to control how we 
emit results in a processor.
+ */
+public interface EmitStrategy {
+
+    enum StrategyType {
+        ON_WINDOW_CLOSE,
+        ON_WINDOW_UPDATE
+    }
+
+    /**
+     * Returns the strategy type
+     * @return Emit strategy type
+     */
+    StrategyType type();
+
+    /**
+     * This strategy indicates that the aggregated result for a window will 
only be outputted when the
+     * window closes instead of when there's an update to the window.
+     *
+     * <p>This strategy should only be used for window which can close. For 
example, it doesn't make sense
+     * to be used with {@link UnlimitedWindow}

Review Comment:
   Will update with details.



##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java:
##########
@@ -80,22 +109,54 @@ public void enableSendingOldValues() {
         private TimestampedWindowStore<KIn, VAgg> windowStore;
         private TimestampedTupleForwarder<Windowed<KIn>, VAgg> tupleForwarder;
         private Sensor droppedRecordsSensor;
+        private Sensor emittedRecordsSensor;
+        private Sensor emitFinalLatencySensor;
         private long observedStreamTime = ConsumerRecord.NO_TIMESTAMP;
+        private long lastEmitCloseTime = ConsumerRecord.NO_TIMESTAMP;
+        private InternalProcessorContext<Windowed<KIn>, Change<VAgg>> 
internalProcessorContext;
+        private final TimeTracker timeTracker = new TimeTracker();
+        private final Time time = Time.SYSTEM;
 
         @Override
         public void init(final ProcessorContext<Windowed<KIn>, Change<VAgg>> 
context) {
             super.init(context);
-            final InternalProcessorContext<Windowed<KIn>, Change<VAgg>> 
internalProcessorContext =
-                (InternalProcessorContext<Windowed<KIn>, Change<VAgg>>) 
context;
+            internalProcessorContext = 
(InternalProcessorContext<Windowed<KIn>, Change<VAgg>>) context;
             final StreamsMetricsImpl metrics = 
internalProcessorContext.metrics();
             final String threadId = Thread.currentThread().getName();
             droppedRecordsSensor = droppedRecordsSensor(threadId, 
context.taskId().toString(), metrics);
+            emittedRecordsSensor = emittedRecordsSensor(threadId, 
context.taskId().toString(), metrics);
+            emitFinalLatencySensor = emitFinalLatencySensor(threadId, 
context.taskId().toString(), metrics);
             windowStore = context.getStateStore(storeName);
-            tupleForwarder = new TimestampedTupleForwarder<>(
-                windowStore,
-                context,
-                new TimestampedCacheFlushListener<>(context),
-                sendOldValues);
+
+            if (emitStrategy.type() == StrategyType.ON_WINDOW_CLOSE) {
+                // Don't set flush lister which emit cache results
+                tupleForwarder = new TimestampedTupleForwarder<>(
+                    windowStore,
+                    context,
+                    sendOldValues);
+            } else {
+                tupleForwarder = new TimestampedTupleForwarder<>(
+                    windowStore,
+                    context,
+                    new TimestampedCacheFlushListener<>(context),
+                    sendOldValues);
+            }
+
+            log.info("EmitStrategy=" + emitStrategy.type());
+            // Restore last emit close time for ON_WINDOW_CLOSE strategy
+            if (emitStrategy.type() == StrategyType.ON_WINDOW_CLOSE) {
+                final Long lastEmitTime = 
internalProcessorContext.processorMetadataForKey(storeName);
+                if (lastEmitTime != null) {
+                    lastEmitCloseTime = lastEmitTime;
+                }
+                final long emitInterval = StreamsConfig.InternalConfig.getLong(
+                    context.appConfigs(),
+                    EMIT_INTERVAL_MS_KSTREAMS_WINDOWED_AGGREGATION,
+                    1000L
+                );
+                timeTracker.setEmitInterval(emitInterval);
+                log.info("EmitInterval=" + emitInterval);

Review Comment:
   Similar to the previous comment: I think this won't be called many times 
(just once, when the processor is created?). Also, because this is an internal 
config, it is not printed when we print all configs, so logging it here is 
convenient for debugging.
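
   For illustration, a minimal sketch of the pattern described above: resolve an 
internal config with a default and log the resolved value once at init time. 
The class, the key name `EMIT_INTERVAL_KEY`, and the `getLong` helper are 
hypothetical stand-ins, not the actual `StreamsConfig.InternalConfig` code.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for an internal-config lookup; not the Kafka Streams implementation.
final class InternalConfigSketch {
    // Assumed key name, for illustration only.
    static final String EMIT_INTERVAL_KEY = "__emit.interval.ms.sketch__";

    // Returns the configured long value, or defaultValue when the key is absent.
    static long getLong(final Map<String, Object> configs, final String key, final long defaultValue) {
        final Object value = configs.get(key);
        if (value == null) {
            return defaultValue;
        }
        if (value instanceof Number) {
            return ((Number) value).longValue();
        }
        if (value instanceof String) {
            return Long.parseLong((String) value);
        }
        throw new IllegalArgumentException("Unsupported type for " + key + ": " + value.getClass());
    }

    public static void main(final String[] args) {
        final Map<String, Object> configs = new HashMap<>();
        // Logged once at init time: an internal key would not show up in the regular config dump.
        System.out.println("EmitInterval=" + getLong(configs, EMIT_INTERVAL_KEY, 1000L));
        configs.put(EMIT_INTERVAL_KEY, 0);
        System.out.println("EmitInterval=" + getLong(configs, EMIT_INTERVAL_KEY, 1000L));
    }
}
```

   Since the lookup falls back to a default silently, a single log line at 
init is the only place the effective value becomes visible.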



##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImpl.java:
##########
@@ -202,12 +207,22 @@
         return aggregateBuilder.build(
                 new NamedInternal(reduceName),
                 materialize(materializedInternal),
-                new KStreamWindowAggregate<>(windows, materializedInternal.storeName(), aggregateBuilder.reduceInitializer, aggregatorForReducer(reducer)),
+                new KStreamWindowAggregate<>(windows, materializedInternal.storeName(), emitStrategy, aggregateBuilder.reduceInitializer, aggregatorForReducer(reducer)),
                 materializedInternal.queryableStoreName(),
                 materializedInternal.keySerde() != null ? new FullTimeWindowedSerde<>(materializedInternal.keySerde(), windows.size()) : null,
                 materializedInternal.valueSerde());
     }
 
+    //@Override

Review Comment:
   This method is not added to the `TimeWindowedKStream` interface yet because 
`SlidingWindowedKStreamImpl` also implements that interface. I plan to add it to 
the interface together with the `SlidingWindowedKStreamImpl` implementation.



##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java:
##########
@@ -80,22 +109,54 @@ public void enableSendingOldValues() {
         private TimestampedWindowStore<KIn, VAgg> windowStore;
         private TimestampedTupleForwarder<Windowed<KIn>, VAgg> tupleForwarder;
         private Sensor droppedRecordsSensor;
+        private Sensor emittedRecordsSensor;
+        private Sensor emitFinalLatencySensor;
         private long observedStreamTime = ConsumerRecord.NO_TIMESTAMP;
+        private long lastEmitCloseTime = ConsumerRecord.NO_TIMESTAMP;
+        private InternalProcessorContext<Windowed<KIn>, Change<VAgg>> 
internalProcessorContext;
+        private final TimeTracker timeTracker = new TimeTracker();
+        private final Time time = Time.SYSTEM;
 
         @Override
         public void init(final ProcessorContext<Windowed<KIn>, Change<VAgg>> 
context) {
             super.init(context);
-            final InternalProcessorContext<Windowed<KIn>, Change<VAgg>> 
internalProcessorContext =
-                (InternalProcessorContext<Windowed<KIn>, Change<VAgg>>) 
context;
+            internalProcessorContext = 
(InternalProcessorContext<Windowed<KIn>, Change<VAgg>>) context;
             final StreamsMetricsImpl metrics = 
internalProcessorContext.metrics();
             final String threadId = Thread.currentThread().getName();
             droppedRecordsSensor = droppedRecordsSensor(threadId, 
context.taskId().toString(), metrics);
+            emittedRecordsSensor = emittedRecordsSensor(threadId, 
context.taskId().toString(), metrics);
+            emitFinalLatencySensor = emitFinalLatencySensor(threadId, 
context.taskId().toString(), metrics);
             windowStore = context.getStateStore(storeName);
-            tupleForwarder = new TimestampedTupleForwarder<>(
-                windowStore,
-                context,
-                new TimestampedCacheFlushListener<>(context),
-                sendOldValues);
+
+            if (emitStrategy.type() == StrategyType.ON_WINDOW_CLOSE) {
+                // Don't set flush lister which emit cache results
+                tupleForwarder = new TimestampedTupleForwarder<>(
+                    windowStore,
+                    context,
+                    sendOldValues);
+            } else {
+                tupleForwarder = new TimestampedTupleForwarder<>(
+                    windowStore,
+                    context,
+                    new TimestampedCacheFlushListener<>(context),
+                    sendOldValues);
+            }
+
+            log.info("EmitStrategy=" + emitStrategy.type());

Review Comment:
   I guess this won't be called many times, so it won't flood the log, and it 
is helpful information. Will it be printed somewhere else (e.g. with the 
topology)?



##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java:
##########
@@ -80,22 +109,54 @@ public void enableSendingOldValues() {
         private TimestampedWindowStore<KIn, VAgg> windowStore;
         private TimestampedTupleForwarder<Windowed<KIn>, VAgg> tupleForwarder;
         private Sensor droppedRecordsSensor;
+        private Sensor emittedRecordsSensor;
+        private Sensor emitFinalLatencySensor;
         private long observedStreamTime = ConsumerRecord.NO_TIMESTAMP;
+        private long lastEmitCloseTime = ConsumerRecord.NO_TIMESTAMP;
+        private InternalProcessorContext<Windowed<KIn>, Change<VAgg>> 
internalProcessorContext;
+        private final TimeTracker timeTracker = new TimeTracker();
+        private final Time time = Time.SYSTEM;
 
         @Override
         public void init(final ProcessorContext<Windowed<KIn>, Change<VAgg>> 
context) {
             super.init(context);
-            final InternalProcessorContext<Windowed<KIn>, Change<VAgg>> 
internalProcessorContext =
-                (InternalProcessorContext<Windowed<KIn>, Change<VAgg>>) 
context;
+            internalProcessorContext = 
(InternalProcessorContext<Windowed<KIn>, Change<VAgg>>) context;
             final StreamsMetricsImpl metrics = 
internalProcessorContext.metrics();
             final String threadId = Thread.currentThread().getName();
             droppedRecordsSensor = droppedRecordsSensor(threadId, 
context.taskId().toString(), metrics);
+            emittedRecordsSensor = emittedRecordsSensor(threadId, 
context.taskId().toString(), metrics);

Review Comment:
   I'm ok removing this if the process involves another round of discussion and 
voting 😃 



##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java:
##########
@@ -184,6 +247,75 @@ public void process(final Record<KIn, VIn> record) {
                     droppedRecordsSensor.record();
                 }
             }
+
+            maybeMeasureLatency(() -> tryEmitFinalResult(record, closeTime), time, emitFinalLatencySensor);
+        }
+
+        private void tryEmitFinalResult(final Record<KIn, VIn> record, final long closeTime) {
+            if (emitStrategy.type() != StrategyType.ON_WINDOW_CLOSE) {
+                return;
+            }
+
+            final long now = internalProcessorContext.currentSystemTimeMs();
+            // Throttle emit frequency
+            if (now < timeTracker.nextTimeToEmit) {
+                return;
+            }
+
+            // Schedule next emit time based on now to avoid the case that if system time jumps a lot,
+            // this can be triggered everytime
+            timeTracker.nextTimeToEmit = now;
+            timeTracker.advanceNextTimeToEmit();
+
+            // Close time does not progress
+            if (lastEmitCloseTime != ConsumerRecord.NO_TIMESTAMP && lastEmitCloseTime >= closeTime) {
+                return;
+            }
+
+            final long emitWindowStart = closeTime - windows.size();
+            if (emitWindowStart < 0) {
+                // If emitWindowStart is 0, it means first window closes since windowEndTime
+                // is exclusive
+                return;
+            }
+
+            // Because we only get here when emitWindowStart > 0 which means closeTime > windows.size()
+            // Since we set lastEmitCloseTime to closeTime before storing to processor metadata
+            // lastEmitCloseTime - windows.size() is always > 0
Review Comment:
   The comment is trying to say that `lastEmitCloseTime - windows.size()` won't 
be less than or equal to 0, and to give the reason.
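
   To make the arithmetic concrete, here is a small sketch of the range 
computation from the quoted hunk. It is a simplified stand-in, not the PR's 
code: `EmitRangeSketch`, `emitRange`, and the local `NO_TIMESTAMP` constant 
(assumed to be -1, like `ConsumerRecord.NO_TIMESTAMP`) are illustrative names, 
and the `windowsFor` comparison is left out. It uses the `emitWindowStart < 0` 
guard exactly as quoted, under which the shifted value is never negative.

```java
// Hypothetical sketch of the range arithmetic in the quoted hunk; not the PR's code.
final class EmitRangeSketch {
    // Plays the role of ConsumerRecord.NO_TIMESTAMP (assumed to be -1 here).
    static final long NO_TIMESTAMP = -1L;

    // Returns {fetchFrom, fetchTo} as window start times, or null when there is nothing to emit.
    static long[] emitRange(final long lastEmitCloseTime, final long closeTime, final long windowSize) {
        // Close time did not progress since the last emit.
        if (lastEmitCloseTime != NO_TIMESTAMP && lastEmitCloseTime >= closeTime) {
            return null;
        }
        final long emitWindowStart = closeTime - windowSize;
        if (emitWindowStart < 0) {
            // No window has closed yet.
            return null;
        }
        // lastEmitCloseTime is only ever set to a closeTime that passed the guard above,
        // so lastEmitCloseTime - windowSize cannot be negative once it is set.
        final long lastEmitWindowStart =
            lastEmitCloseTime == NO_TIMESTAMP ? -1L : lastEmitCloseTime - windowSize;
        return new long[] {lastEmitWindowStart + 1, emitWindowStart};
    }

    public static void main(final String[] args) {
        System.out.println(java.util.Arrays.toString(emitRange(NO_TIMESTAMP, 5L, 10L)));
        System.out.println(java.util.Arrays.toString(emitRange(NO_TIMESTAMP, 10L, 10L)));
        System.out.println(java.util.Arrays.toString(emitRange(10L, 25L, 10L)));
    }
}
```

   With a window size of 10, a previous emit at close time 10 and a new close 
time of 25 give a fetch range of window starts from 1 to 15.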



##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java:
##########
@@ -184,6 +247,75 @@ public void process(final Record<KIn, VIn> record) {
                     droppedRecordsSensor.record();
                 }
             }
+
+            maybeMeasureLatency(() -> tryEmitFinalResult(record, closeTime), 
time, emitFinalLatencySensor);
+        }
+
+        private void tryEmitFinalResult(final Record<KIn, VIn> record, final 
long closeTime) {
+            if (emitStrategy.type() != StrategyType.ON_WINDOW_CLOSE) {
+                return;
+            }
+
+            final long now = internalProcessorContext.currentSystemTimeMs();
+            // Throttle emit frequency
+            if (now < timeTracker.nextTimeToEmit) {
+                return;
+            }
+
+            // Schedule next emit time based on now to avoid the case that if 
system time jumps a lot,
+            // this can be triggered everytime
+            timeTracker.nextTimeToEmit = now;
+            timeTracker.advanceNextTimeToEmit();
+
+            // Close time does not progress
+            if (lastEmitCloseTime != ConsumerRecord.NO_TIMESTAMP && 
lastEmitCloseTime >= closeTime) {

Review Comment:
   If a task has multiple source topics, and `lastEmitCloseTime` is committed 
successfully for some sources while the offset commit fails for others, is it 
possible that we restore `lastEmitCloseTime` and then start processing from an 
earlier time? In that case, can `lastEmitCloseTime` be larger than `closeTime`? 
Maybe log a warning if `lastEmitCloseTime` is larger?
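
   One way the suggested warning could look, as a sketch only. The class 
`CloseTimeGuardSketch`, the `Decision` enum, and the local `NO_TIMESTAMP` 
constant are hypothetical names, and whether a restored `lastEmitCloseTime` can 
actually run ahead of `closeTime` is exactly the open question above.

```java
// Hypothetical sketch of the suggested warning; names are illustrative, not from the PR.
final class CloseTimeGuardSketch {
    // Plays the role of ConsumerRecord.NO_TIMESTAMP (assumed to be -1 here).
    static final long NO_TIMESTAMP = -1L;

    enum Decision { PROCEED, SKIP_NO_PROGRESS, SKIP_AND_WARN }

    // Splits the single ">=" check into "equal" (no progress) and "larger" (unexpected).
    static Decision check(final long lastEmitCloseTime, final long closeTime) {
        if (lastEmitCloseTime == NO_TIMESTAMP || lastEmitCloseTime < closeTime) {
            return Decision.PROCEED;
        }
        if (lastEmitCloseTime == closeTime) {
            return Decision.SKIP_NO_PROGRESS;
        }
        return Decision.SKIP_AND_WARN;
    }

    public static void main(final String[] args) {
        final long restored = 100L;
        final long closeTime = 40L;
        if (check(restored, closeTime) == Decision.SKIP_AND_WARN) {
            System.err.println("lastEmitCloseTime=" + restored + " is ahead of closeTime=" + closeTime);
        }
    }
}
```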



##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java:
##########
@@ -184,6 +247,75 @@ public void process(final Record<KIn, VIn> record) {
                     droppedRecordsSensor.record();
                 }
             }
+
+            maybeMeasureLatency(() -> tryEmitFinalResult(record, closeTime), 
time, emitFinalLatencySensor);
+        }
+
+        private void tryEmitFinalResult(final Record<KIn, VIn> record, final 
long closeTime) {
+            if (emitStrategy.type() != StrategyType.ON_WINDOW_CLOSE) {
+                return;
+            }
+
+            final long now = internalProcessorContext.currentSystemTimeMs();
+            // Throttle emit frequency
+            if (now < timeTracker.nextTimeToEmit) {
+                return;
+            }
+
+            // Schedule next emit time based on now to avoid the case that if 
system time jumps a lot,
+            // this can be triggered everytime
+            timeTracker.nextTimeToEmit = now;
+            timeTracker.advanceNextTimeToEmit();
+
+            // Close time does not progress
+            if (lastEmitCloseTime != ConsumerRecord.NO_TIMESTAMP && 
lastEmitCloseTime >= closeTime) {
+                return;
+            }
+
+            final long emitWindowStart = closeTime - windows.size();
+            if (emitWindowStart < 0) {
+                // If emitWindowStart is 0, it means first window closes since 
windowEndTime
+                // is exclusive
+                return;
+            }
+
+            // Because we only get here when emitWindowStart > 0 which means 
closeTime > windows.size()
+            // Since we set lastEmitCloseTime to closeTime before storing to 
processor metadata
+            // lastEmitCloseTime - windows.size() is always > 0
+            // Set lastEmitWindowStart to -1L if not set so that when we 
fetchAll, we fetch from 0L
+            final long lastEmitWindowStart = lastEmitCloseTime == 
ConsumerRecord.NO_TIMESTAMP ?
+                -1L : lastEmitCloseTime - windows.size();
+
+            if (lastEmitCloseTime != ConsumerRecord.NO_TIMESTAMP) {
+                final Map<Long, W> matchedCloseWindows = 
windows.windowsFor(emitWindowStart);
+                final Map<Long, W> matchedEmitWindows = 
windows.windowsFor(lastEmitWindowStart);
+
+                // Don't fetch store if the new emit window close time doesn't 
progress enough to cover next
+                // window
+                if (matchedCloseWindows.equals(matchedEmitWindows)) {
+                    log.debug("no new windows to emit. LastEmitCloseTime={}, 
newCloseTime={}",
+                        lastEmitCloseTime, closeTime);
+                    return;
+                }
+            }
+
+            final KeyValueIterator<Windowed<KIn>, ValueAndTimestamp<VAgg>> 
windowToEmit =  windowStore
+                .fetchAll(lastEmitWindowStart + 1, emitWindowStart);
+
+            int emittedCount = 0;
+            while (windowToEmit.hasNext()) {
+                emittedCount++;
+                final KeyValue<Windowed<KIn>, ValueAndTimestamp<VAgg>> kv = 
windowToEmit.next();
+                tupleForwarder.maybeForward(
+                    record.withKey(kv.key)
+                        .withValue(new Change<>(kv.value.value(), null))
+                        .withTimestamp(kv.value.timestamp())
+                        .withHeaders(null)); // Don't set header
+            }
+            emittedRecordsSensor.record(emittedCount);
+
+            lastEmitCloseTime = closeTime;
+            internalProcessorContext.addProcessorMetadataKeyValue(storeName, 
closeTime);

Review Comment:
   I thought about this, and I think it is more consistent to shift both 
`lastEmitCloseTime` and `closeTime`. If we store only `lastEmitStartTime`, we 
shift only `closeTime` when processing. I also find it easier to store 
`closeTime`, because you can tell whether a window has closed without having to 
shift it first.



##########
streams/src/test/java/org/apache/kafka/streams/integration/TimeWindowedKStreamIntegrationTest.java:
##########
@@ -0,0 +1,497 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.integration;
+
+import java.util.Collection;
+import java.util.Optional;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.Serdes.StringSerde;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.KeyValueTimestamp;
+import org.apache.kafka.streams.StreamsConfig.InternalConfig;
+import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
+import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
+import org.apache.kafka.streams.kstream.Consumed;
+import org.apache.kafka.streams.kstream.EmitStrategy;
+import org.apache.kafka.streams.kstream.EmitStrategy.StrategyType;
+import org.apache.kafka.streams.kstream.JoinWindows;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.Materialized;
+import org.apache.kafka.streams.kstream.Produced;
+import org.apache.kafka.streams.kstream.SessionWindowedDeserializer;
+import org.apache.kafka.streams.kstream.TimeWindowedDeserializer;
+import org.apache.kafka.streams.kstream.TimeWindows;
+import org.apache.kafka.streams.kstream.UnlimitedWindows;
+import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.kstream.WindowedSerdes;
+import org.apache.kafka.streams.kstream.internals.TimeWindow;
+import org.apache.kafka.streams.kstream.internals.TimeWindowedKStreamImpl;
+import org.apache.kafka.test.IntegrationTest;
+import org.apache.kafka.test.MockAggregator;
+import org.apache.kafka.test.MockInitializer;
+import org.apache.kafka.test.TestUtils;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Properties;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameter;
+
+import static java.time.Duration.ofMillis;
+import static java.time.Instant.ofEpochMilli;
+import static java.util.Arrays.asList;
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.common.utils.Utils.mkMap;
+import static org.apache.kafka.common.utils.Utils.mkProperties;
+import static 
org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.assertThrows;
+
+@SuppressWarnings({"unchecked"})
+@Category({IntegrationTest.class})
+@RunWith(Parameterized.class)
+public class TimeWindowedKStreamIntegrationTest {
+    private static final int NUM_BROKERS = 1;
+
+    public static final EmbeddedKafkaCluster CLUSTER = new 
EmbeddedKafkaCluster(NUM_BROKERS,
+        mkProperties(
+            mkMap(mkEntry("log.retention.hours", "-1"), 
mkEntry("log.retention.bytes", "-1")) // Don't expire records since we 
manipulate timestamp
+        )
+    );
+
+    @BeforeClass
+    public static void startCluster() throws IOException {
+        CLUSTER.start();
+    }
+
+    @AfterClass
+    public static void closeCluster() {
+        CLUSTER.stop();
+    }
+
+
+    private StreamsBuilder builder;
+    private Properties streamsConfiguration;
+    private KafkaStreams kafkaStreams;
+    private String streamOneInput;
+    private String streamTwoInput;
+    private String outputTopic;
+
+    @Rule
+    public TestName testName = new TestName();
+
+    @Parameter
+    public StrategyType type;
+
+    @Parameter(1)
+    public EmitStrategy emitStrategy;
+
+    private boolean emitFinal;
+
+    @Parameterized.Parameters(name = "{0}")
+    public static Collection<Object[]> getEmitStrategy() {
+        return asList(new Object[][] {
+            {StrategyType.ON_WINDOW_UPDATE, EmitStrategy.onWindowUpdate()},
+            {StrategyType.ON_WINDOW_CLOSE, EmitStrategy.onWindowClose()}
+        });
+    }
+
+    @Before
+    public void before() throws InterruptedException {
+        builder = new StreamsBuilder();
+        createTopics();
+        streamsConfiguration = new Properties();
+        final String safeTestName = safeUniqueTestName(getClass(), testName);
+        streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "app-" + safeTestName);
+        streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
+        streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+        streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
+        streamsConfiguration.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 0);
+        streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100L);
+        streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
+        streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
+        streamsConfiguration.put(InternalConfig.EMIT_INTERVAL_MS_KSTREAMS_WINDOWED_AGGREGATION, 0); // Always process
+        streamsConfiguration.put(StreamsConfig.WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_CONFIG, Long.MAX_VALUE); // Don't expire changelog
+
+        emitFinal = emitStrategy.type() == StrategyType.ON_WINDOW_CLOSE;
+    }
+
+    @After
+    public void whenShuttingDown() throws IOException {
+        if (kafkaStreams != null) {
+            kafkaStreams.close();
+            kafkaStreams.cleanUp();
+        }
+        IntegrationTestUtils.purgeLocalStreamsState(streamsConfiguration);
+    }
+
+    @Test
+    public void shouldAggregateWindowedWithNoGrace() throws Exception {
+        produceMessages(
+            streamOneInput,
+            new KeyValueTimestamp<>("A", "1", 0),
+            new KeyValueTimestamp<>("A", "1", 5),
+            new KeyValueTimestamp<>("A", "1", 10), // close [0, 10)
+            new KeyValueTimestamp<>("B", "2", 6),  // late and skip
+            new KeyValueTimestamp<>("B", "2", 11), // close [0, 10)

Review Comment:
   Yeah, it's already closed at that point. Will update the comment.
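
   As a worked example of why `[0, 10)` is already closed by the time the 
record at timestamp 11 arrives, here is a small simulation of tumbling windows 
with zero grace. It is a sketch under assumptions, not Kafka Streams code: 
`NoGraceWindowSketch` and `droppedFlags` are hypothetical names, the window 
size of 10 is inferred from the `[0, 10)` annotations in the test, and stream 
time is modeled as the maximum timestamp seen so far.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of tumbling-window closing with zero grace; not Kafka Streams code.
final class NoGraceWindowSketch {
    // Feeds timestamps in arrival order and returns, per record, whether it is dropped as late.
    static List<Boolean> droppedFlags(final long[] timestamps, final long windowSize, final long graceMs) {
        final List<Boolean> dropped = new ArrayList<>();
        long streamTime = -1L;
        for (final long ts : timestamps) {
            // Stream time only moves forward.
            streamTime = Math.max(streamTime, ts);
            final long closeTime = streamTime - graceMs;
            final long windowStart = ts - (ts % windowSize);
            final long windowEnd = windowStart + windowSize; // exclusive end
            // A window whose end is at or before the close time no longer accepts records.
            dropped.add(windowEnd <= closeTime);
        }
        return dropped;
    }

    public static void main(final String[] args) {
        // Same timestamps as the test input: 0, 5, 10, then 6 (late), then 11.
        System.out.println(droppedFlags(new long[] {0L, 5L, 10L, 6L, 11L}, 10L, 0L));
    }
}
```

   In this model the record at timestamp 10 is the one that closes `[0, 10)`; 
the record at 6 is then dropped, and the record at 11 lands in `[10, 20)` 
without closing anything new.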



##########
streams/src/test/java/org/apache/kafka/streams/integration/TimeWindowedKStreamIntegrationTest.java:
##########
@@ -0,0 +1,497 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.integration;
+
+import java.util.Collection;
+import java.util.Optional;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.Serdes.StringSerde;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.KeyValueTimestamp;
+import org.apache.kafka.streams.StreamsConfig.InternalConfig;
+import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
+import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
+import org.apache.kafka.streams.kstream.Consumed;
+import org.apache.kafka.streams.kstream.EmitStrategy;
+import org.apache.kafka.streams.kstream.EmitStrategy.StrategyType;
+import org.apache.kafka.streams.kstream.JoinWindows;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.Materialized;
+import org.apache.kafka.streams.kstream.Produced;
+import org.apache.kafka.streams.kstream.SessionWindowedDeserializer;
+import org.apache.kafka.streams.kstream.TimeWindowedDeserializer;
+import org.apache.kafka.streams.kstream.TimeWindows;
+import org.apache.kafka.streams.kstream.UnlimitedWindows;
+import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.kstream.WindowedSerdes;
+import org.apache.kafka.streams.kstream.internals.TimeWindow;
+import org.apache.kafka.streams.kstream.internals.TimeWindowedKStreamImpl;
+import org.apache.kafka.test.IntegrationTest;
+import org.apache.kafka.test.MockAggregator;
+import org.apache.kafka.test.MockInitializer;
+import org.apache.kafka.test.TestUtils;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Properties;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameter;
+
+import static java.time.Duration.ofMillis;
+import static java.time.Instant.ofEpochMilli;
+import static java.util.Arrays.asList;
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.common.utils.Utils.mkMap;
+import static org.apache.kafka.common.utils.Utils.mkProperties;
+import static 
org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.assertThrows;
+
+@SuppressWarnings({"unchecked"})
+@Category({IntegrationTest.class})
+@RunWith(Parameterized.class)
+public class TimeWindowedKStreamIntegrationTest {
+    private static final int NUM_BROKERS = 1;
+
+    public static final EmbeddedKafkaCluster CLUSTER = new 
EmbeddedKafkaCluster(NUM_BROKERS,

Review Comment:
   From what I saw, `EmbeddedKafkaCluster` is always used for integration tests and 
`TopologyTestDriver` is used for unit tests?



##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java:
##########
@@ -184,6 +247,75 @@ public void process(final Record<KIn, VIn> record) {
                     droppedRecordsSensor.record();
                 }
             }
+
+            maybeMeasureLatency(() -> tryEmitFinalResult(record, closeTime), 
time, emitFinalLatencySensor);
+        }
+
+        private void tryEmitFinalResult(final Record<KIn, VIn> record, final 
long closeTime) {
+            if (emitStrategy.type() != StrategyType.ON_WINDOW_CLOSE) {
+                return;
+            }
+
+            final long now = internalProcessorContext.currentSystemTimeMs();
+            // Throttle emit frequency
+            if (now < timeTracker.nextTimeToEmit) {
+                return;
+            }
+
+            // Schedule next emit time based on now to avoid the case that if system time jumps a lot,
+            // this can be triggered everytime
+            timeTracker.nextTimeToEmit = now;
+            timeTracker.advanceNextTimeToEmit();
+
+            // Close time does not progress
+            if (lastEmitCloseTime != ConsumerRecord.NO_TIMESTAMP && 
lastEmitCloseTime >= closeTime) {
+                return;
+            }
+
+            final long emitWindowStart = closeTime - windows.size();

Review Comment:
   Yeah. Will rename. Naming is hard...



##########
streams/src/test/java/org/apache/kafka/streams/integration/TimeWindowedKStreamIntegrationTest.java:
##########
@@ -0,0 +1,497 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.integration;
+
+import java.util.Collection;
+import java.util.Optional;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.Serdes.StringSerde;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.KeyValueTimestamp;
+import org.apache.kafka.streams.StreamsConfig.InternalConfig;
+import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
+import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
+import org.apache.kafka.streams.kstream.Consumed;
+import org.apache.kafka.streams.kstream.EmitStrategy;
+import org.apache.kafka.streams.kstream.EmitStrategy.StrategyType;
+import org.apache.kafka.streams.kstream.JoinWindows;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.Materialized;
+import org.apache.kafka.streams.kstream.Produced;
+import org.apache.kafka.streams.kstream.SessionWindowedDeserializer;
+import org.apache.kafka.streams.kstream.TimeWindowedDeserializer;
+import org.apache.kafka.streams.kstream.TimeWindows;
+import org.apache.kafka.streams.kstream.UnlimitedWindows;
+import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.kstream.WindowedSerdes;
+import org.apache.kafka.streams.kstream.internals.TimeWindow;
+import org.apache.kafka.streams.kstream.internals.TimeWindowedKStreamImpl;
+import org.apache.kafka.test.IntegrationTest;
+import org.apache.kafka.test.MockAggregator;
+import org.apache.kafka.test.MockInitializer;
+import org.apache.kafka.test.TestUtils;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Properties;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameter;
+
+import static java.time.Duration.ofMillis;
+import static java.time.Instant.ofEpochMilli;
+import static java.util.Arrays.asList;
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.common.utils.Utils.mkMap;
+import static org.apache.kafka.common.utils.Utils.mkProperties;
+import static 
org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.assertThrows;
+
+@SuppressWarnings({"unchecked"})
+@Category({IntegrationTest.class})
+@RunWith(Parameterized.class)
+public class TimeWindowedKStreamIntegrationTest {
+    private static final int NUM_BROKERS = 1;
+
+    public static final EmbeddedKafkaCluster CLUSTER = new 
EmbeddedKafkaCluster(NUM_BROKERS,
+        mkProperties(
+            mkMap(mkEntry("log.retention.hours", "-1"), mkEntry("log.retention.bytes", "-1")) // Don't expire records since we manipulate timestamp
+        )
+    );
+
+    @BeforeClass
+    public static void startCluster() throws IOException {
+        CLUSTER.start();
+    }
+
+    @AfterClass
+    public static void closeCluster() {
+        CLUSTER.stop();
+    }
+
+
+    private StreamsBuilder builder;
+    private Properties streamsConfiguration;
+    private KafkaStreams kafkaStreams;
+    private String streamOneInput;
+    private String streamTwoInput;
+    private String outputTopic;
+
+    @Rule
+    public TestName testName = new TestName();
+
+    @Parameter
+    public StrategyType type;
+
+    @Parameter(1)
+    public EmitStrategy emitStrategy;
+
+    private boolean emitFinal;
+
+    @Parameterized.Parameters(name = "{0}")
+    public static Collection<Object[]> getEmitStrategy() {
+        return asList(new Object[][] {
+            {StrategyType.ON_WINDOW_UPDATE, EmitStrategy.onWindowUpdate()},
+            {StrategyType.ON_WINDOW_CLOSE, EmitStrategy.onWindowClose()}
+        });
+    }
+
+    @Before
+    public void before() throws InterruptedException {
+        builder = new StreamsBuilder();
+        createTopics();
+        streamsConfiguration = new Properties();
+        final String safeTestName = safeUniqueTestName(getClass(), testName);
+        streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "app-" + 
safeTestName);
+        streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, 
CLUSTER.bootstrapServers());
+        streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, 
"earliest");
+        streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, 
TestUtils.tempDirectory().getPath());
+        
streamsConfiguration.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 0);
+        streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 
100L);
+        streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, 
Serdes.String().getClass());
+        
streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, 
Serdes.String().getClass());
+        streamsConfiguration.put(InternalConfig.EMIT_INTERVAL_MS_KSTREAMS_WINDOWED_AGGREGATION, 0); // Always process
+        streamsConfiguration.put(StreamsConfig.WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_CONFIG, Long.MAX_VALUE); // Don't expire changelog
+
+        emitFinal = emitStrategy.type() == StrategyType.ON_WINDOW_CLOSE;
+    }
+
+    @After
+    public void whenShuttingDown() throws IOException {
+        if (kafkaStreams != null) {
+            kafkaStreams.close();
+            kafkaStreams.cleanUp();
+        }
+        IntegrationTestUtils.purgeLocalStreamsState(streamsConfiguration);
+    }
+
+    @Test
+    public void shouldAggregateWindowedWithNoGrace() throws Exception {
+        produceMessages(
+            streamOneInput,
+            new KeyValueTimestamp<>("A", "1", 0),
+            new KeyValueTimestamp<>("A", "1", 5),
+            new KeyValueTimestamp<>("A", "1", 10), // close [0, 10)
+            new KeyValueTimestamp<>("B", "2", 6),  // late and skip

Review Comment:
   Yeah. It does go to the second window. Will update the comment.
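
For reference, the reason the record at timestamp 6 still lands in the second window can be sketched as below. This is a standalone illustration, not the Kafka Streams implementation: `HoppingWindowSketch`, `windowStartsFor`, and `isOpen` are hypothetical names, and it assumes a window accepts records only while its exclusive end is greater than the close time (stream time minus grace).

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, standalone sketch; these are not the Kafka Streams classes.
final class HoppingWindowSketch {
    // Start times of all hopping windows [start, start + size) that contain the timestamp.
    static List<Long> windowStartsFor(final long timestamp, final long size, final long advance) {
        final List<Long> starts = new ArrayList<>();
        long start = (Math.max(0L, timestamp - size + advance) / advance) * advance;
        while (start <= timestamp) {
            starts.add(start);
            start += advance;
        }
        return starts;
    }

    // Assumption: a window still accepts records while its exclusive end is after closeTime.
    static boolean isOpen(final long windowStart, final long size, final long closeTime) {
        return windowStart + size > closeTime;
    }

    public static void main(final String[] args) {
        final long size = 10L;
        final long advance = 5L;
        final long closeTime = 10L; // stream time is 10 and there is no grace period
        for (final long start : windowStartsFor(6L, size, advance)) {
            System.out.println("[" + start + ", " + (start + size) + ") open=" + isOpen(start, size, closeTime));
        }
    }
}
```

With size 10 and advance 5, timestamp 6 belongs to [0, 10) and [5, 15); under the stated assumption only [5, 15) is still open once stream time has reached 10.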



##########
streams/src/test/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImplTest.java:
##########
@@ -158,21 +235,23 @@ public void shouldMaterializeCount() {
                 final List<KeyValue<Windowed<String>, Long>> data =
                     StreamsTestUtils.toList(windowStore.fetch("1", "2", 
ofEpochMilli(0), ofEpochMilli(1000L)));
 
-                assertThat(data, equalTo(Arrays.asList(
+                assertThat(data, equalTo(asList(
                     KeyValue.pair(new Windowed<>("1", new TimeWindow(0, 500)), 
2L),
                     KeyValue.pair(new Windowed<>("1", new TimeWindow(500, 
1000)), 1L),
-                    KeyValue.pair(new Windowed<>("2", new TimeWindow(500, 
1000)), 2L))));
+                    KeyValue.pair(new Windowed<>("2", new TimeWindow(500, 
1000)), 2L),
+                    KeyValue.pair(new Windowed<>("2", new TimeWindow(1000, 
1500)), 1L))));

Review Comment:
   I added one more input in `processData`: `inputTopic.pipeInput("2", "30", 
1000L);`



##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java:
##########
@@ -184,6 +247,75 @@ public void process(final Record<KIn, VIn> record) {
                     droppedRecordsSensor.record();
                 }
             }
+
+            maybeMeasureLatency(() -> tryEmitFinalResult(record, closeTime), 
time, emitFinalLatencySensor);
+        }
+
+        private void tryEmitFinalResult(final Record<KIn, VIn> record, final 
long closeTime) {
+            if (emitStrategy.type() != StrategyType.ON_WINDOW_CLOSE) {
+                return;
+            }
+
+            final long now = internalProcessorContext.currentSystemTimeMs();
+            // Throttle emit frequency
+            if (now < timeTracker.nextTimeToEmit) {
+                return;
+            }
+
+            // Schedule next emit time based on now to avoid the case that if system time jumps a lot,
+            // this can be triggered everytime
+            timeTracker.nextTimeToEmit = now;

Review Comment:
   stream-stream join didn't do this. Created 
https://issues.apache.org/jira/browse/KAFKA-13817 to track
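
The re-basing on `now` in the quoted hunk can be sketched roughly as below. This is a minimal standalone illustration, not the PR's code: `EmitThrottleSketch` and `tryEmit` are hypothetical names, and it assumes that advancing the next emit time simply adds a fixed interval.

```java
// Hypothetical, standalone sketch; these are not the Kafka Streams classes.
final class EmitThrottleSketch {
    private final long emitIntervalMs;
    private long nextTimeToEmit;

    EmitThrottleSketch(final long emitIntervalMs, final long startMs) {
        this.emitIntervalMs = emitIntervalMs;
        this.nextTimeToEmit = startMs + emitIntervalMs;
    }

    // Returns true if an emit attempt is allowed at wall-clock time `now`.
    boolean tryEmit(final long now) {
        if (now < nextTimeToEmit) {
            return false; // throttled
        }
        // Re-base on `now`: after a large clock jump the next attempt is one full
        // interval away, instead of being allowed on every call while
        // nextTimeToEmit catches up one interval at a time.
        nextTimeToEmit = now + emitIntervalMs;
        return true;
    }

    public static void main(final String[] args) {
        final EmitThrottleSketch throttle = new EmitThrottleSketch(1000L, 0L);
        System.out.println(throttle.tryEmit(500L));     // throttled
        System.out.println(throttle.tryEmit(100_000L)); // allowed after a clock jump
        System.out.println(throttle.tryEmit(100_001L)); // throttled again
    }
}
```

If the next emit time were advanced from its previous value rather than from `now`, the third call in `main` would also be allowed, which is the situation the code comment in the hunk describes.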



##########
streams/src/test/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImplTest.java:
##########
@@ -42,84 +47,142 @@
 import org.junit.Before;
 import org.junit.Test;
 
-import java.util.Arrays;
 import java.util.List;
 import java.util.Properties;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameter;
 
 import static java.time.Duration.ofMillis;
 import static java.time.Instant.ofEpochMilli;
+import static java.util.Arrays.asList;
 import static org.hamcrest.CoreMatchers.equalTo;
 import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertThrows;
 
+@RunWith(Parameterized.class)
 public class TimeWindowedKStreamImplTest {
     private static final String TOPIC = "input";
+    private static final Windowed<String> KEY_1_WINDOW_0 = new Windowed<>("1", 
new TimeWindow(0L, 500L));
+    private static final Windowed<String> KEY_1_WINDOW_1 = new Windowed<>("1", 
new TimeWindow(500L, 1000L));
+    private static final Windowed<String> KEY_2_WINDOW_1 = new Windowed<>("2", 
new TimeWindow(500L, 1000L));
+    private static final Windowed<String> KEY_2_WINDOW_2 = new Windowed<>("2", 
new TimeWindow(1000L, 1500L));
+
     private final StreamsBuilder builder = new StreamsBuilder();
     private final Properties props = 
StreamsTestUtils.getStreamsConfig(Serdes.String(), Serdes.String());
-    private TimeWindowedKStream<String, String> windowedStream;
+    private TimeWindowedKStreamImpl<String, String, TimeWindow> windowedStream;
+
+    @Parameter
+    public StrategyType type;
+
+    @Parameter(1)
+    public EmitStrategy emitStrategy;
+
+    private boolean emitFinal;
 
+    @Parameterized.Parameters(name = "{0}")
+    public static Collection<Object[]> getKeySchema() {
+        return asList(new Object[][] {
+            {StrategyType.ON_WINDOW_UPDATE, EmitStrategy.onWindowUpdate()},
+            {StrategyType.ON_WINDOW_CLOSE, EmitStrategy.onWindowClose()}
+        });
+    }
+
+    @SuppressWarnings("unchecked")
     @Before
     public void before() {
+        emitFinal = type.equals(StrategyType.ON_WINDOW_CLOSE);
+        // Set interval to 0 so that it always tries to emit
+        
props.setProperty(InternalConfig.EMIT_INTERVAL_MS_KSTREAMS_WINDOWED_AGGREGATION,
 "0");
         final KStream<String, String> stream = builder.stream(TOPIC, 
Consumed.with(Serdes.String(), Serdes.String()));
-        windowedStream = stream.
+        // TODO: remove this cast https://issues.apache.org/jira/browse/KAFKA-13800
+        windowedStream = (TimeWindowedKStreamImpl<String, String, TimeWindow>) 
(stream.
             groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
-            .windowedBy(TimeWindows.ofSizeWithNoGrace(ofMillis(500L)));
+            .windowedBy(TimeWindows.ofSizeWithNoGrace(ofMillis(500L))));
     }
 
     @Test
     public void shouldCountWindowed() {
         final MockApiProcessorSupplier<Windowed<String>, Long, Void, Void> 
supplier = new MockApiProcessorSupplier<>();
         windowedStream
+            .emitStrategy(emitStrategy)
             .count()
             .toStream()
             .process(supplier);
 
         try (final TopologyTestDriver driver = new 
TopologyTestDriver(builder.build(), props)) {
             processData(driver);
         }
-        assertThat(
-            supplier.theCapturedProcessor().lastValueAndTimestampPerKey()
-                .get(new Windowed<>("1", new TimeWindow(0L, 500L))),
-            equalTo(ValueAndTimestamp.make(2L, 15L)));
-        assertThat(
-            supplier.theCapturedProcessor().lastValueAndTimestampPerKey()
-                .get(new Windowed<>("2", new TimeWindow(500L, 1000L))),
-            equalTo(ValueAndTimestamp.make(2L, 550L)));
-        assertThat(
-            supplier.theCapturedProcessor().lastValueAndTimestampPerKey()
-                .get(new Windowed<>("1", new TimeWindow(500L, 1000L))),
-            equalTo(ValueAndTimestamp.make(1L, 500L)));
+        final ArrayList<KeyValueTimestamp<Windowed<String>, Long>> processed = 
supplier.theCapturedProcessor().processed();
+
+        if (emitFinal) {
+            assertEquals(
+                asList(
+                    new KeyValueTimestamp<>(KEY_1_WINDOW_0, 2L, 15L),
+                    new KeyValueTimestamp<>(KEY_1_WINDOW_1, 1L, 500L),
+                    new KeyValueTimestamp<>(KEY_2_WINDOW_1, 2L, 550L)

Review Comment:
   Didn't get your question. The fetch for emit final is based on `windowStart` 
order.
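
As a rough illustration of what window-start ordering means for the expected results, the sketch below sorts windowed keys by window start first. It is standalone and hypothetical (`WindowStartOrderSketch`, `WindowedKey`, and `inEmitOrder` are made-up names); breaking ties by key is an assumption made for the example, not something stated in this thread.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

// Hypothetical, standalone sketch; these are not the Kafka Streams classes.
final class WindowStartOrderSketch {
    static final class WindowedKey {
        final String key;
        final long windowStart;

        WindowedKey(final String key, final long windowStart) {
            this.key = key;
            this.windowStart = windowStart;
        }

        @Override
        public String toString() {
            return key + "@" + windowStart;
        }
    }

    // Orders by window start first; ties are broken by key (an assumption for this sketch).
    static List<String> inEmitOrder(final List<WindowedKey> closedWindows) {
        final List<WindowedKey> sorted = new ArrayList<>(closedWindows);
        sorted.sort(Comparator
            .comparingLong((WindowedKey w) -> w.windowStart)
            .thenComparing((WindowedKey w) -> w.key));
        final List<String> labels = new ArrayList<>();
        for (final WindowedKey w : sorted) {
            labels.add(w.toString());
        }
        return labels;
    }

    public static void main(final String[] args) {
        final List<WindowedKey> closed = Arrays.asList(
            new WindowedKey("A", 0L), new WindowedKey("A", 5L), new WindowedKey("A", 10L),
            new WindowedKey("B", 5L), new WindowedKey("B", 10L), new WindowedKey("B", 15L));
        System.out.println(inEmitOrder(closed));
    }
}
```

The keys come out interleaved by window start (A@0, A@5, B@5, A@10, ...) rather than grouped per key.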



##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java:
##########
@@ -184,6 +247,75 @@ public void process(final Record<KIn, VIn> record) {
                     droppedRecordsSensor.record();
                 }
             }
+
+            maybeMeasureLatency(() -> tryEmitFinalResult(record, closeTime), 
time, emitFinalLatencySensor);
+        }
+
+        private void tryEmitFinalResult(final Record<KIn, VIn> record, final 
long closeTime) {
+            if (emitStrategy.type() != StrategyType.ON_WINDOW_CLOSE) {
+                return;
+            }
+
+            final long now = internalProcessorContext.currentSystemTimeMs();
+            // Throttle emit frequency
+            if (now < timeTracker.nextTimeToEmit) {
+                return;
+            }
+
+            // Schedule next emit time based on now to avoid the case that if system time jumps a lot,
+            // this can be triggered everytime
+            timeTracker.nextTimeToEmit = now;
+            timeTracker.advanceNextTimeToEmit();
+
+            // Close time does not progress
+            if (lastEmitCloseTime != ConsumerRecord.NO_TIMESTAMP && 
lastEmitCloseTime >= closeTime) {
+                return;
+            }
+
+            final long emitWindowStart = closeTime - windows.size();
+            if (emitWindowStart < 0) {
+                // If emitWindowStart is 0, it means first window closes since windowEndTime
+                // is exclusive
+                return;
+            }
+
+            // Because we only get here when emitWindowStart > 0 which means closeTime > windows.size()
+            // Since we set lastEmitCloseTime to closeTime before storing to processor metadata
+            // lastEmitCloseTime - windows.size() is always > 0
+            // Set lastEmitWindowStart to -1L if not set so that when we fetchAll, we fetch from 0L
+            final long lastEmitWindowStart = lastEmitCloseTime == 
ConsumerRecord.NO_TIMESTAMP ?
+                -1L : lastEmitCloseTime - windows.size();
+
+            if (lastEmitCloseTime != ConsumerRecord.NO_TIMESTAMP) {
+                final Map<Long, W> matchedCloseWindows = 
windows.windowsFor(emitWindowStart);
+                final Map<Long, W> matchedEmitWindows = 
windows.windowsFor(lastEmitWindowStart);
+
+                // Don't fetch store if the new emit window close time doesn't progress enough to cover next
+                // window
+                if (matchedCloseWindows.equals(matchedEmitWindows)) {

Review Comment:
   > Is this the same as emitWindowStart == lastEmitWindowStart
   
   Nope. They may not be the same. `emitWindowStart` is based on the current `closeTime`, while `lastEmitWindowStart` is based on the last emitted close time. The logic here detects whether `closeTime` has progressed far enough to emit new windows.
   
   For example, for a tumbling window of length 10 sec and close time 11, window [0, 10) will be emitted; if the close time progresses to 12, the window to emit is still [0, 10).
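
The example above can be sketched for tumbling windows as follows. This is a simplified standalone illustration, not the PR's code: `CloseTimeProgressSketch` and its methods are hypothetical names, it assumes non-negative timestamps, and it replaces the `windows.windowsFor(...)` map comparison with a comparison of the single tumbling window start.

```java
// Hypothetical, standalone sketch; these are not the Kafka Streams classes.
final class CloseTimeProgressSketch {
    // Start of the tumbling window [start, start + size) that contains the timestamp.
    static long tumblingWindowStartFor(final long timestamp, final long size) {
        return timestamp - (timestamp % size);
    }

    // True if the new close time still maps to the same window as the last emit,
    // in which case there is nothing new to fetch and emit.
    static boolean sameEmitWindow(final long lastEmitCloseTime, final long closeTime, final long size) {
        final long lastEmitWindowStart = lastEmitCloseTime - size;
        final long emitWindowStart = closeTime - size;
        return tumblingWindowStartFor(lastEmitWindowStart, size)
            == tumblingWindowStartFor(emitWindowStart, size);
    }

    public static void main(final String[] args) {
        // Close time moves from 11 to 12: both still map to window [0, 10).
        System.out.println(sameEmitWindow(11L, 12L, 10L));
        // Close time moves from 11 to 20: window [10, 20) is now covered.
        System.out.println(sameEmitWindow(11L, 20L, 10L));
    }
}
```

Here `emitWindowStart` (2) and `lastEmitWindowStart` (1) differ in the first case, yet they map to the same window, which is why a plain equality check on the two values would not be enough.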
   
   



##########
streams/src/test/java/org/apache/kafka/streams/integration/TimeWindowedKStreamIntegrationTest.java:
##########
@@ -0,0 +1,497 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.integration;
+
+import java.util.Collection;
+import java.util.Optional;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.Serdes.StringSerde;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.KeyValueTimestamp;
+import org.apache.kafka.streams.StreamsConfig.InternalConfig;
+import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
+import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
+import org.apache.kafka.streams.kstream.Consumed;
+import org.apache.kafka.streams.kstream.EmitStrategy;
+import org.apache.kafka.streams.kstream.EmitStrategy.StrategyType;
+import org.apache.kafka.streams.kstream.JoinWindows;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.Materialized;
+import org.apache.kafka.streams.kstream.Produced;
+import org.apache.kafka.streams.kstream.SessionWindowedDeserializer;
+import org.apache.kafka.streams.kstream.TimeWindowedDeserializer;
+import org.apache.kafka.streams.kstream.TimeWindows;
+import org.apache.kafka.streams.kstream.UnlimitedWindows;
+import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.kstream.WindowedSerdes;
+import org.apache.kafka.streams.kstream.internals.TimeWindow;
+import org.apache.kafka.streams.kstream.internals.TimeWindowedKStreamImpl;
+import org.apache.kafka.test.IntegrationTest;
+import org.apache.kafka.test.MockAggregator;
+import org.apache.kafka.test.MockInitializer;
+import org.apache.kafka.test.TestUtils;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Properties;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameter;
+
+import static java.time.Duration.ofMillis;
+import static java.time.Instant.ofEpochMilli;
+import static java.util.Arrays.asList;
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.common.utils.Utils.mkMap;
+import static org.apache.kafka.common.utils.Utils.mkProperties;
+import static 
org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.assertThrows;
+
+@SuppressWarnings({"unchecked"})
+@Category({IntegrationTest.class})
+@RunWith(Parameterized.class)
+public class TimeWindowedKStreamIntegrationTest {
+    private static final int NUM_BROKERS = 1;
+
+    public static final EmbeddedKafkaCluster CLUSTER = new 
EmbeddedKafkaCluster(NUM_BROKERS,
+        mkProperties(
+            mkMap(mkEntry("log.retention.hours", "-1"), mkEntry("log.retention.bytes", "-1")) // Don't expire records since we manipulate timestamp
+        )
+    );
+
+    @BeforeClass
+    public static void startCluster() throws IOException {
+        CLUSTER.start();
+    }
+
+    @AfterClass
+    public static void closeCluster() {
+        CLUSTER.stop();
+    }
+
+
+    private StreamsBuilder builder;
+    private Properties streamsConfiguration;
+    private KafkaStreams kafkaStreams;
+    private String streamOneInput;
+    private String streamTwoInput;
+    private String outputTopic;
+
+    @Rule
+    public TestName testName = new TestName();
+
+    @Parameter
+    public StrategyType type;
+
+    @Parameter(1)
+    public EmitStrategy emitStrategy;
+
+    private boolean emitFinal;
+
+    @Parameterized.Parameters(name = "{0}")
+    public static Collection<Object[]> getEmitStrategy() {
+        return asList(new Object[][] {
+            {StrategyType.ON_WINDOW_UPDATE, EmitStrategy.onWindowUpdate()},
+            {StrategyType.ON_WINDOW_CLOSE, EmitStrategy.onWindowClose()}
+        });
+    }
+
+    @Before
+    public void before() throws InterruptedException {
+        builder = new StreamsBuilder();
+        createTopics();
+        streamsConfiguration = new Properties();
+        final String safeTestName = safeUniqueTestName(getClass(), testName);
+        streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "app-" + 
safeTestName);
+        streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, 
CLUSTER.bootstrapServers());
+        streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, 
"earliest");
+        streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, 
TestUtils.tempDirectory().getPath());
+        
streamsConfiguration.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 0);
+        streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 
100L);
+        streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, 
Serdes.String().getClass());
+        
streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, 
Serdes.String().getClass());
+        streamsConfiguration.put(InternalConfig.EMIT_INTERVAL_MS_KSTREAMS_WINDOWED_AGGREGATION, 0); // Always process
+        streamsConfiguration.put(StreamsConfig.WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_CONFIG, Long.MAX_VALUE); // Don't expire changelog
+
+        emitFinal = emitStrategy.type() == StrategyType.ON_WINDOW_CLOSE;
+    }
+
+    @After
+    public void whenShuttingDown() throws IOException {
+        if (kafkaStreams != null) {
+            kafkaStreams.close();
+            kafkaStreams.cleanUp();
+        }
+        IntegrationTestUtils.purgeLocalStreamsState(streamsConfiguration);
+    }
+
+    @Test
+    public void shouldAggregateWindowedWithNoGrace() throws Exception {
+        produceMessages(
+            streamOneInput,
+            new KeyValueTimestamp<>("A", "1", 0),
+            new KeyValueTimestamp<>("A", "1", 5),
+            new KeyValueTimestamp<>("A", "1", 10), // close [0, 10)
+            new KeyValueTimestamp<>("B", "2", 6),  // late and skip
+            new KeyValueTimestamp<>("B", "2", 11), // close [0, 10)
+            new KeyValueTimestamp<>("B", "2", 15), // close [5, 15)
+            new KeyValueTimestamp<>("C", "3", 25)  // close [10, 20), [15, 25)
+        );
+
+        final Serde<Windowed<String>> windowedSerde = 
WindowedSerdes.timeWindowedSerdeFrom(String.class, 10L);
+        // TODO: remove this cast https://issues.apache.org/jira/browse/KAFKA-13800
+        final TimeWindowedKStreamImpl<String, String, TimeWindow> 
windowedStream = (TimeWindowedKStreamImpl<String, String, TimeWindow>) builder
+            .stream(streamOneInput, Consumed.with(Serdes.String(), 
Serdes.String()))
+            .groupByKey()
+            
.windowedBy(TimeWindows.ofSizeWithNoGrace(ofMillis(10L)).advanceBy(ofMillis(5L)));
+        windowedStream.emitStrategy(emitStrategy)
+            .aggregate(
+                MockInitializer.STRING_INIT,
+                MockAggregator.TOSTRING_ADDER,
+                Materialized.with(null, new StringSerde())
+            )
+            .toStream()
+            .to(outputTopic, Produced.with(windowedSerde, new StringSerde()));
+
+        startStreams();
+
+        final List<KeyValueTimestamp<Windowed<String>, String>> 
windowedMessages = receiveMessagesWithTimestamp(
+            new TimeWindowedDeserializer<>(new StringDeserializer(), 10L),
+            new StringDeserializer(),
+            10L,
+            String.class,
+            emitFinal ? 6 : 12);
+
+        final List<KeyValueTimestamp<Windowed<String>, String>> expectResult;
+        if (emitFinal) {
+            expectResult = asList(
+                new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(0L, 
10L)), "0+1+1", 5),
+                new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(5L, 
15L)), "0+1+1", 10),
+                new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(5L, 
15L)), "0+2+2", 11),
+                new KeyValueTimestamp<>(new Windowed<>("A", new 
TimeWindow(10L, 20L)), "0+1", 10),
+                new KeyValueTimestamp<>(new Windowed<>("B", new 
TimeWindow(10L, 20L)), "0+2+2", 15),
+                new KeyValueTimestamp<>(new Windowed<>("B", new 
TimeWindow(15L, 25L)), "0+2", 15)
+            );
+        } else {
+            expectResult = asList(
+                new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(0L, 
10L)), "0+1", 0),
+                new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(0L, 
10L)), "0+1+1", 5),
+                new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(5L, 
15L)), "0+1", 5),
+                new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(5L, 
15L)), "0+1+1", 10),
+                new KeyValueTimestamp<>(new Windowed<>("A", new 
TimeWindow(10L, 20L)), "0+1", 10),
+                new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(5L, 
15L)), "0+2", 6),
+                new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(5L, 
15L)), "0+2+2", 11),
+                new KeyValueTimestamp<>(new Windowed<>("B", new 
TimeWindow(10L, 20L)), "0+2", 11),
+                new KeyValueTimestamp<>(new Windowed<>("B", new 
TimeWindow(10L, 20L)), "0+2+2", 15),
+                new KeyValueTimestamp<>(new Windowed<>("B", new 
TimeWindow(15L, 25L)), "0+2", 15),
+                new KeyValueTimestamp<>(new Windowed<>("C", new 
TimeWindow(20L, 30L)), "0+3", 25),
+                new KeyValueTimestamp<>(new Windowed<>("C", new 
TimeWindow(25L, 35L)), "0+3", 25)
+            );
+        }
+
+        assertThat(windowedMessages, is(expectResult));
+    }
+
+    @Test
+    public void shouldAggregateWindowedWithGrace() throws Exception {
+        produceMessages(
+            streamOneInput,
+            new KeyValueTimestamp<>("A", "1", 0),
+            new KeyValueTimestamp<>("A", "1", 5),
+            new KeyValueTimestamp<>("A", "1", 10), // close [-5, 5)

Review Comment:
   Not really 🥲 . Will update



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
