This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new ababc4261b [9/N][Emit final] Emit final for session window 
aggregations (#12204)
ababc4261b is described below

commit ababc4261bfa03ee9d29ae7254ddd0ba988f826d
Author: Guozhang Wang <wangg...@gmail.com>
AuthorDate: Wed Jun 29 09:22:37 2022 -0700

    [9/N][Emit final] Emit final for session window aggregations (#12204)
    
    * Add a new API to range-query session windows by end time (KIP related).
    * Augment session window aggregator with emit strategy.
    * Minor: consolidate some duplicate classes.
    * Test: unit test on session window aggregator.
    
    Reviewers: Guozhang Wang <wangg...@gmail.com>
---
 .../streams/kstream/SessionWindowedKStream.java    |   5 +-
 .../kafka/streams/kstream/TimeWindowedKStream.java |   1 +
 ...bstractKStreamTimeWindowAggregateProcessor.java |  11 +-
 .../internals/CogroupedStreamAggregateBuilder.java |   2 +
 .../internals/KStreamSessionWindowAggregate.java   | 272 ++++++++++++++++-----
 .../kstream/internals/KStreamWindowAggregate.java  |   7 -
 .../kstream/internals/SessionTupleForwarder.java   |  56 -----
 .../internals/SessionWindowedKStreamImpl.java      |  29 ++-
 .../kstream/internals/TimeWindowedKStreamImpl.java |  55 +++--
 .../internals/TimestampedTupleForwarder.java       |   3 +-
 .../internals/AbstractReadWriteDecorator.java      |   6 +
 .../apache/kafka/streams/state/SessionStore.java   |  13 +
 ...tractRocksDBTimeOrderedSegmentedBytesStore.java |   6 +-
 .../internals/ChangeLoggingSessionBytesStore.java  |  12 +-
 .../state/internals/InMemorySessionStore.java      |  21 +-
 .../state/internals/MeteredSessionStore.java       |  12 +
 .../state/internals/PrefixedSessionKeySchemas.java |  13 +-
 ...cksDBTimeOrderedSessionSegmentedBytesStore.java |  33 ++-
 .../internals/RocksDBTimeOrderedSessionStore.java  |   7 +
 .../streams/state/internals/SegmentIterator.java   |   2 +-
 .../state/internals/SegmentedBytesStore.java       |   4 +-
 .../streams/state/internals/SessionKeySchema.java  |   2 +-
 ...KStreamSessionWindowAggregateProcessorTest.java | 219 ++++++++++++-----
 .../internals/KStreamWindowAggregateTest.java      |   2 +-
 .../internals/SessionTupleForwarderTest.java       | 108 --------
 .../internals/SessionWindowedKStreamImplTest.java  | 171 +++++++++----
 .../internals/TimeWindowedKStreamImplTest.java     |   2 +-
 .../internals/graph/GraphGraceSearchUtilTest.java  |   8 +-
 28 files changed, 676 insertions(+), 406 deletions(-)

diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindowedKStream.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindowedKStream.java
index c561b62abf..fe897515a9 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindowedKStream.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindowedKStream.java
@@ -39,7 +39,7 @@ import java.time.Duration;
  * materialized view) that can be queried using the name provided in the 
{@link Materialized} instance.
  * Furthermore, updates to the store are sent downstream into a windowed 
{@link KTable} changelog stream, where
  * "windowed" implies that the {@link KTable} key is a combined key of the 
original record key and a window ID.
- * New events are added to sessions until their grace period ends (see {@link 
SessionWindows#grace(Duration)}).
+ * New events are added to sessions until their grace period ends (see {@link 
SessionWindows#ofInactivityGapAndGrace(Duration, Duration)}).
  * <p>
  * A {@code SessionWindowedKStream} must be obtained from a {@link 
KGroupedStream} via
  * {@link KGroupedStream#windowedBy(SessionWindows)}.
@@ -643,4 +643,7 @@ public interface SessionWindowedKStream<K, V> {
     KTable<Windowed<K>, V> reduce(final Reducer<V> reducer,
                                   final Named named,
                                   final Materialized<K, V, SessionStore<Bytes, 
byte[]>> materialized);
+
+    // TODO: add javadoc
+    SessionWindowedKStream<K, V> emitStrategy(final EmitStrategy emitStrategy);
 }
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindowedKStream.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindowedKStream.java
index 46ebd267f9..3f36838f20 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindowedKStream.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindowedKStream.java
@@ -649,5 +649,6 @@ public interface TimeWindowedKStream<K, V> {
                                   final Named named,
                                   final Materialized<K, V, WindowStore<Bytes, 
byte[]>> materialized);
 
+    // TODO: add javadoc
     TimeWindowedKStream<K, V> emitStrategy(final EmitStrategy emitStrategy);
 }
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/AbstractKStreamTimeWindowAggregateProcessor.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/AbstractKStreamTimeWindowAggregateProcessor.java
index d39dad1f79..a081a280ba 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/AbstractKStreamTimeWindowAggregateProcessor.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/AbstractKStreamTimeWindowAggregateProcessor.java
@@ -73,11 +73,10 @@ public abstract class 
AbstractKStreamTimeWindowAggregateProcessor<KIn, VIn, VAgg
         internalProcessorContext = (InternalProcessorContext<Windowed<KIn>, 
Change<VAgg>>) context;
         final StreamsMetricsImpl metrics = internalProcessorContext.metrics();
         final String threadId = Thread.currentThread().getName();
+        final String processorName = 
internalProcessorContext.currentNode().name();
         droppedRecordsSensor = droppedRecordsSensor(threadId, 
context.taskId().toString(), metrics);
-        emittedRecordsSensor = emittedRecordsSensor(threadId, 
context.taskId().toString(),
-            internalProcessorContext.currentNode().name(), metrics);
-        emitFinalLatencySensor = emitFinalLatencySensor(threadId, 
context.taskId().toString(),
-            internalProcessorContext.currentNode().name(), metrics);
+        emittedRecordsSensor = emittedRecordsSensor(threadId, 
context.taskId().toString(), processorName, metrics);
+        emitFinalLatencySensor = emitFinalLatencySensor(threadId, 
context.taskId().toString(), processorName, metrics);
         windowStore = context.getStateStore(storeName);
 
         if (emitStrategy.type() == StrategyType.ON_WINDOW_CLOSE) {
@@ -175,7 +174,7 @@ public abstract class 
AbstractKStreamTimeWindowAggregateProcessor<KIn, VIn, VAgg
         observedStreamTime = Math.max(observedStreamTime, timestamp);
     }
 
-    private boolean shouldEmitFinal(final long closeTime) {
+    private boolean shouldEmitFinal(final long windowCloseTime) {
         if (emitStrategy.type() != StrategyType.ON_WINDOW_CLOSE) {
             return false;
         }
@@ -192,7 +191,7 @@ public abstract class 
AbstractKStreamTimeWindowAggregateProcessor<KIn, VIn, VAgg
         timeTracker.advanceNextTimeToEmit();
 
         // Only EMIT if the window close time does progress
-        return lastEmitWindowCloseTime == ConsumerRecord.NO_TIMESTAMP || 
lastEmitWindowCloseTime < closeTime;
+        return lastEmitWindowCloseTime == ConsumerRecord.NO_TIMESTAMP || 
lastEmitWindowCloseTime < windowCloseTime;
     }
 
     private void fetchAndEmit(final Record<KIn, VIn> record,
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/CogroupedStreamAggregateBuilder.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/CogroupedStreamAggregateBuilder.java
index 17dd413c45..3adc8beec8 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/CogroupedStreamAggregateBuilder.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/CogroupedStreamAggregateBuilder.java
@@ -100,6 +100,7 @@ class CogroupedStreamAggregateBuilder<K, VOut> {
                 (KStreamAggProcessorSupplier<K, ?, K, ?>) new 
KStreamWindowAggregate<K, K, VOut, W>(
                     windows,
                     storeBuilder.name(),
+                    EmitStrategy.onWindowUpdate(),
                     initializer,
                     kGroupedStream.getValue());
             parentProcessors.add(parentProcessor);
@@ -138,6 +139,7 @@ class CogroupedStreamAggregateBuilder<K, VOut> {
                 (KStreamAggProcessorSupplier<K, ?, K, ?>) new 
KStreamSessionWindowAggregate<K, K, VOut>(
                     sessionWindows,
                     storeBuilder.name(),
+                    EmitStrategy.onWindowUpdate(),
                     initializer,
                     kGroupedStream.getValue(),
                     sessionMerger);
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregate.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregate.java
index eff7ac327a..f8252358b0 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregate.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregate.java
@@ -18,8 +18,11 @@ package org.apache.kafka.streams.kstream.internals;
 
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.kstream.Aggregator;
+import org.apache.kafka.streams.kstream.EmitStrategy;
 import org.apache.kafka.streams.kstream.Initializer;
 import org.apache.kafka.streams.kstream.Merger;
 import org.apache.kafka.streams.kstream.SessionWindows;
@@ -29,6 +32,7 @@ import org.apache.kafka.streams.processor.api.Processor;
 import org.apache.kafka.streams.processor.api.ProcessorContext;
 import org.apache.kafka.streams.processor.api.Record;
 import org.apache.kafka.streams.processor.api.RecordMetadata;
+import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
 import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
 import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.SessionStore;
@@ -39,6 +43,9 @@ import org.slf4j.LoggerFactory;
 import java.util.ArrayList;
 import java.util.List;
 
+import static 
org.apache.kafka.streams.StreamsConfig.InternalConfig.EMIT_INTERVAL_MS_KSTREAMS_WINDOWED_AGGREGATION;
+import static 
org.apache.kafka.streams.processor.internals.metrics.ProcessorNodeMetrics.emitFinalLatencySensor;
+import static 
org.apache.kafka.streams.processor.internals.metrics.ProcessorNodeMetrics.emittedRecordsSensor;
 import static 
org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensor;
 
 public class KStreamSessionWindowAggregate<KIn, VIn, VAgg> implements 
KStreamAggProcessorSupplier<KIn, VIn, Windowed<KIn>, VAgg> {
@@ -50,16 +57,19 @@ public class KStreamSessionWindowAggregate<KIn, VIn, VAgg> 
implements KStreamAgg
     private final Initializer<VAgg> initializer;
     private final Aggregator<? super KIn, ? super VIn, VAgg> aggregator;
     private final Merger<? super KIn, VAgg> sessionMerger;
+    private final EmitStrategy emitStrategy;
 
     private boolean sendOldValues = false;
 
     public KStreamSessionWindowAggregate(final SessionWindows windows,
-        final String storeName,
-        final Initializer<VAgg> initializer,
-        final Aggregator<? super KIn, ? super VIn, VAgg> aggregator,
-        final Merger<? super KIn, VAgg> sessionMerger) {
+                                         final String storeName,
+                                         final EmitStrategy emitStrategy,
+                                         final Initializer<VAgg> initializer,
+                                         final Aggregator<? super KIn, ? super 
VIn, VAgg> aggregator,
+                                         final Merger<? super KIn, VAgg> 
sessionMerger) {
         this.windows = windows;
         this.storeName = storeName;
+        this.emitStrategy = emitStrategy;
         this.initializer = initializer;
         this.aggregator = aggregator;
         this.sessionMerger = sessionMerger;
@@ -83,24 +93,50 @@ public class KStreamSessionWindowAggregate<KIn, VIn, VAgg> 
implements KStreamAgg
         ContextualProcessor<KIn, VIn, Windowed<KIn>, Change<VAgg>> {
 
         private SessionStore<KIn, VAgg> store;
-        private SessionTupleForwarder<KIn, VAgg> tupleForwarder;
+        private TimestampedTupleForwarder<Windowed<KIn>, VAgg> tupleForwarder;
         private Sensor droppedRecordsSensor;
+        private Sensor emittedRecordsSensor;
+        private Sensor emitFinalLatencySensor;
+        private long lastEmitWindowCloseTime = ConsumerRecord.NO_TIMESTAMP;
         private long observedStreamTime = ConsumerRecord.NO_TIMESTAMP;
+        private InternalProcessorContext<Windowed<KIn>, Change<VAgg>> 
internalProcessorContext;
+
+        private final Time time = Time.SYSTEM;
+        protected final KStreamImplJoin.TimeTracker timeTracker = new 
KStreamImplJoin.TimeTracker();
 
         @Override
         public void init(final ProcessorContext<Windowed<KIn>, Change<VAgg>> 
context) {
             super.init(context);
+            internalProcessorContext = 
(InternalProcessorContext<Windowed<KIn>, Change<VAgg>>) context;
             final StreamsMetricsImpl metrics = (StreamsMetricsImpl) 
context.metrics();
             final String threadId = Thread.currentThread().getName();
-            droppedRecordsSensor = droppedRecordsSensor(threadId, 
context.taskId().toString(),
-                metrics);
+            final String processorName = 
internalProcessorContext.currentNode().name();
+            droppedRecordsSensor = droppedRecordsSensor(threadId, 
context.taskId().toString(), metrics);
+            emittedRecordsSensor = emittedRecordsSensor(threadId, 
context.taskId().toString(), processorName, metrics);
+            emitFinalLatencySensor = emitFinalLatencySensor(threadId, 
context.taskId().toString(), processorName, metrics);
             store = context.getStateStore(storeName);
-            tupleForwarder = new SessionTupleForwarder<>(
-                store,
-                context,
-                new SessionCacheFlushListener<>(context),
-                sendOldValues
-            );
+
+            if (emitStrategy.type() == 
EmitStrategy.StrategyType.ON_WINDOW_CLOSE) {
+                // Restore last emit close time for ON_WINDOW_CLOSE strategy
+                final Long lastEmitWindowCloseTime = 
internalProcessorContext.processorMetadataForKey(storeName);
+                if (lastEmitWindowCloseTime != null) {
+                    this.lastEmitWindowCloseTime = lastEmitWindowCloseTime;
+                }
+                final long emitInterval = StreamsConfig.InternalConfig.getLong(
+                        context.appConfigs(),
+                        EMIT_INTERVAL_MS_KSTREAMS_WINDOWED_AGGREGATION,
+                        1000L
+                );
+                timeTracker.setEmitInterval(emitInterval);
+
+                tupleForwarder = new TimestampedTupleForwarder<>(context, 
sendOldValues);
+            } else {
+                tupleForwarder = new TimestampedTupleForwarder<>(
+                    store,
+                    context,
+                    new SessionCacheFlushListener<>(context),
+                    sendOldValues);
+            }
         }
 
         @Override
@@ -108,25 +144,13 @@ public class KStreamSessionWindowAggregate<KIn, VIn, 
VAgg> implements KStreamAgg
             // if the key is null, we do not need proceed aggregating
             // the record with the table
             if (record.key() == null) {
-                if (context().recordMetadata().isPresent()) {
-                    final RecordMetadata recordMetadata = 
context().recordMetadata().get();
-                    LOG.warn(
-                        "Skipping record due to null key. "
-                            + "topic=[{}] partition=[{}] offset=[{}]",
-                        recordMetadata.topic(), recordMetadata.partition(), 
recordMetadata.offset()
-                    );
-                } else {
-                    LOG.warn(
-                        "Skipping record due to null key. Topic, partition, 
and offset not known."
-                    );
-                }
-                droppedRecordsSensor.record();
+                logSkippedRecordForNullKey();
                 return;
             }
 
             final long timestamp = record.timestamp();
             observedStreamTime = Math.max(observedStreamTime, timestamp);
-            final long closeTime = observedStreamTime - 
windows.gracePeriodMs() - windows.inactivityGap();
+            final long windowCloseTime = observedStreamTime - 
windows.gracePeriodMs() - windows.inactivityGap();
 
             final List<KeyValue<Windowed<KIn>, VAgg>> merged = new 
ArrayList<>();
             final SessionWindow newSessionWindow = new 
SessionWindow(timestamp, timestamp);
@@ -148,55 +172,174 @@ public class KStreamSessionWindowAggregate<KIn, VIn, 
VAgg> implements KStreamAgg
                 }
             }
 
-            if (mergedWindow.end() < closeTime) {
-                if (context().recordMetadata().isPresent()) {
-                    final RecordMetadata recordMetadata = 
context().recordMetadata().get();
-                    LOG.warn(
-                        "Skipping record for expired window. " +
-                            "topic=[{}] " +
-                            "partition=[{}] " +
-                            "offset=[{}] " +
-                            "timestamp=[{}] " +
-                            "window=[{},{}] " +
-                            "expiration=[{}] " +
-                            "streamTime=[{}]",
-                        recordMetadata.topic(), recordMetadata.partition(), 
recordMetadata.offset(),
-                        timestamp,
-                        mergedWindow.start(), mergedWindow.end(),
-                        closeTime,
-                        observedStreamTime
-                    );
-                } else {
-                    LOG.warn(
-                        "Skipping record for expired window. Topic, partition, 
and offset not known. " +
-                            "timestamp=[{}] " +
-                            "window=[{},{}] " +
-                            "expiration=[{}] " +
-                            "streamTime=[{}]",
-                        timestamp,
-                        mergedWindow.start(), mergedWindow.end(),
-                        closeTime,
-                        observedStreamTime
-                    );
-                }
-                droppedRecordsSensor.record();
+            if (mergedWindow.end() < windowCloseTime) {
+                logSkippedRecordForExpiredWindow(timestamp, windowCloseTime, 
mergedWindow);
             } else {
                 if (!mergedWindow.equals(newSessionWindow)) {
                     for (final KeyValue<Windowed<KIn>, VAgg> session : merged) 
{
                         store.remove(session.key);
-                        tupleForwarder.maybeForward(
-                            record.withKey(session.key)
-                                .withValue(new Change<>(null, session.value)));
+
+                        maybeForwardUpdate(session.key, session.value, null);
                     }
                 }
 
                 agg = aggregator.apply(record.key(), record.value(), agg);
                 final Windowed<KIn> sessionKey = new Windowed<>(record.key(), 
mergedWindow);
                 store.put(sessionKey, agg);
+
+                maybeForwardUpdate(sessionKey, null, agg);
+            }
+
+            maybeForwardFinalResult(record, windowCloseTime);
+        }
+
+        private void maybeForwardUpdate(final Windowed<KIn> windowedkey,
+                                        final VAgg oldAgg,
+                                        final VAgg newAgg) {
+            if (emitStrategy.type() == 
EmitStrategy.StrategyType.ON_WINDOW_CLOSE) {
+                return;
+            }
+
+            // Update the sent record timestamp to the window end time if 
possible
+            final long newTimestamp = windowedkey.window().end();
+            tupleForwarder.maybeForward(new Record<>(windowedkey, new 
Change<>(newAgg, sendOldValues ? oldAgg : null), newTimestamp));
+        }
+
+        // TODO: consolidate SessionWindow with TimeWindow to merge common 
functions
+        private void maybeForwardFinalResult(final Record<KIn, VIn> record, 
final long windowCloseTime) {
+            if (shouldEmitFinal(windowCloseTime)) {
+                final long emitRangeUpperBound = 
emitRangeUpperBound(windowCloseTime);
+
+                // if the upper bound is smaller than 0, then there's no 
window closed ever;
+                // and we can skip range fetching
+                if (emitRangeUpperBound >= 0) {
+                    final long emitRangeLowerBound = emitRangeLowerBound();
+
+                    if (shouldRangeFetch(emitRangeLowerBound, 
emitRangeUpperBound)) {
+                        fetchAndEmit(record, windowCloseTime, 
emitRangeLowerBound, emitRangeUpperBound);
+                    }
+                }
+            }
+        }
+
+        private boolean shouldEmitFinal(final long windowCloseTime) {
+            if (emitStrategy.type() != 
EmitStrategy.StrategyType.ON_WINDOW_CLOSE) {
+                return false;
+            }
+
+            final long now = internalProcessorContext.currentSystemTimeMs();
+            // Throttle emit frequency
+            if (now < timeTracker.nextTimeToEmit) {
+                return false;
+            }
+
+            // Schedule next emit time based on now to avoid the case that if 
system time jumps a lot,
+            // this can be triggered every time
+            timeTracker.nextTimeToEmit = now;
+            timeTracker.advanceNextTimeToEmit();
+
+            // Only EMIT if the window close time does progress
+            return lastEmitWindowCloseTime == ConsumerRecord.NO_TIMESTAMP || 
lastEmitWindowCloseTime < windowCloseTime;
+        }
+
+        private long emitRangeLowerBound() {
+            return Math.max(0L, lastEmitWindowCloseTime);
+        }
+
+        private long emitRangeUpperBound(final long windowCloseTime) {
+            // Session window's start and end timestamps are inclusive, so
+            // we subtract 1 to get the inclusive upper bound on closed window ends
+            return windowCloseTime - 1;
+        }
+
+        private boolean shouldRangeFetch(final long emitRangeLowerBound, final 
long emitRangeUpperBound) {
+            // since a session window could be a single point (i.e. [t, t]),
+            // we need to range fetch and emit even if the upper and lower 
bound are the same
+            return emitRangeUpperBound >= emitRangeLowerBound;
+        }
+
+        private void fetchAndEmit(final Record<KIn, VIn> record,
+                                  final long windowCloseTime,
+                                  final long emitRangeLowerBound,
+                                  final long emitRangeUpperBound) {
+            final long startMs = time.milliseconds();
+
+            // Only a time-ordered (indexed) session store is expected to implement
+            // this function; otherwise a not-supported exception would be thrown
+            final KeyValueIterator<Windowed<KIn>, VAgg> windowToEmit = store
+                .findSessions(emitRangeLowerBound, emitRangeUpperBound);
+
+            int emittedCount = 0;
+            while (windowToEmit.hasNext()) {
+                emittedCount++;
+                final KeyValue<Windowed<KIn>, VAgg> kv = windowToEmit.next();
+
                 tupleForwarder.maybeForward(
-                    record.withKey(sessionKey)
-                        .withValue(new Change<>(agg, null)));
+                    record.withKey(kv.key)
+                        .withValue(new Change<>(kv.value, null))
+                        // set the timestamp as the window end timestamp
+                        .withTimestamp(kv.key.window().end())
+                        .withHeaders(record.headers()));
             }
+            emittedRecordsSensor.record(emittedCount);
+            emitFinalLatencySensor.record(time.milliseconds() - startMs);
+
+            lastEmitWindowCloseTime = windowCloseTime;
+            internalProcessorContext.addProcessorMetadataKeyValue(storeName, 
windowCloseTime);
+        }
+
+        private void logSkippedRecordForNullKey() {
+            if (context().recordMetadata().isPresent()) {
+                final RecordMetadata recordMetadata = 
context().recordMetadata().get();
+                LOG.warn(
+                        "Skipping record due to null key. "
+                                + "topic=[{}] partition=[{}] offset=[{}]",
+                        recordMetadata.topic(), recordMetadata.partition(), 
recordMetadata.offset()
+                );
+            } else {
+                LOG.warn(
+                        "Skipping record due to null key. Topic, partition, 
and offset not known."
+                );
+            }
+            droppedRecordsSensor.record();
+        }
+
+        private void logSkippedRecordForExpiredWindow(final long timestamp,
+                                                      final long windowExpire,
+                                                      final SessionWindow 
window) {
+            final String windowString = "[" + window.start() + "," + 
window.end() + "]";
+
+            if (context().recordMetadata().isPresent()) {
+                final RecordMetadata recordMetadata = 
context().recordMetadata().get();
+                LOG.warn("Skipping record for expired window. " +
+                                "topic=[{}] " +
+                                "partition=[{}] " +
+                                "offset=[{}] " +
+                                "timestamp=[{}] " +
+                                "window={} " +
+                                "expiration=[{}] " +
+                                "streamTime=[{}]",
+                        recordMetadata.topic(),
+                        recordMetadata.partition(),
+                        recordMetadata.offset(),
+                        timestamp,
+                        windowString,
+                        windowExpire,
+                        observedStreamTime
+                );
+            } else {
+                LOG.warn("Skipping record for expired window. Topic, 
partition, and offset not known. " +
+                                "timestamp=[{}] " +
+                                "window={} " +
+                                "expiration=[{}] " +
+                                "streamTime=[{}]",
+                        timestamp,
+                        windowString,
+                        windowExpire,
+                        observedStreamTime
+                );
+            }
+            droppedRecordsSensor.record();
         }
     }
 
@@ -237,5 +380,4 @@ public class KStreamSessionWindowAggregate<KIn, VIn, VAgg> 
implements KStreamAgg
                 key.window().end());
         }
     }
-
 }
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java
index 81687cb9e0..561524f87e 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java
@@ -49,13 +49,6 @@ public class KStreamWindowAggregate<KIn, VIn, VAgg, W 
extends Window> implements
 
     private boolean sendOldValues = false;
 
-    public KStreamWindowAggregate(final Windows<W> windows,
-                                  final String storeName,
-                                  final Initializer<VAgg> initializer,
-                                  final Aggregator<? super KIn, ? super VIn, 
VAgg> aggregator) {
-        this(windows, storeName, EmitStrategy.onWindowUpdate(), initializer, 
aggregator);
-    }
-
     public KStreamWindowAggregate(final Windows<W> windows,
                                   final String storeName,
                                   final EmitStrategy emitStrategy,
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionTupleForwarder.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionTupleForwarder.java
deleted file mode 100644
index e1c302f875..0000000000
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionTupleForwarder.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.streams.kstream.internals;
-
-import org.apache.kafka.streams.kstream.Windowed;
-import org.apache.kafka.streams.processor.StateStore;
-import org.apache.kafka.streams.processor.api.ProcessorContext;
-import org.apache.kafka.streams.processor.api.Record;
-import org.apache.kafka.streams.state.internals.CacheFlushListener;
-import org.apache.kafka.streams.state.internals.WrappedStateStore;
-
-/**
- * This class is used to determine if a processor should forward values to 
child nodes.
- * Forwarding by this class only occurs when caching is not enabled. If 
caching is enabled,
- * forwarding occurs in the flush listener when the cached store flushes.
- *
- * @param <K>
- * @param <V>
- */
-class SessionTupleForwarder<K, V> {
-    private final ProcessorContext<Windowed<K>, Change<V>> context;
-    private final boolean sendOldValues;
-    private final boolean cachingEnabled;
-
-    @SuppressWarnings("unchecked")
-    SessionTupleForwarder(final StateStore store,
-                          final ProcessorContext<Windowed<K>, Change<V>> 
context,
-                          final CacheFlushListener<Windowed<K>, V> 
flushListener,
-                          final boolean sendOldValues) {
-        this.context = context;
-        this.sendOldValues = sendOldValues;
-        cachingEnabled = ((WrappedStateStore) 
store).setFlushListener(flushListener, sendOldValues);
-    }
-
-    public void maybeForward(final Record<Windowed<K>, Change<V>> record) {
-        if (!cachingEnabled) {
-            context.forward(
-                record.withValue(new Change<>(record.value().newValue, 
sendOldValues ? record.value().oldValue : null))
-                    .withTimestamp(record.key() != null ? 
record.key().window().end() : record.timestamp()));
-        }
-    }
-}
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionWindowedKStreamImpl.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionWindowedKStreamImpl.java
index f18c6ef568..c3b05cb118 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionWindowedKStreamImpl.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionWindowedKStreamImpl.java
@@ -20,6 +20,7 @@ import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.streams.kstream.Aggregator;
+import org.apache.kafka.streams.kstream.EmitStrategy;
 import org.apache.kafka.streams.kstream.Initializer;
 import org.apache.kafka.streams.kstream.KTable;
 import org.apache.kafka.streams.kstream.Materialized;
@@ -35,6 +36,7 @@ import 
org.apache.kafka.streams.state.SessionBytesStoreSupplier;
 import org.apache.kafka.streams.state.SessionStore;
 import org.apache.kafka.streams.state.StoreBuilder;
 import org.apache.kafka.streams.state.Stores;
+import 
org.apache.kafka.streams.state.internals.RocksDbTimeOrderedSessionBytesStoreSupplier;
 
 import java.time.Duration;
 import java.util.Objects;
@@ -48,6 +50,8 @@ public class SessionWindowedKStreamImpl<K, V> extends 
AbstractStream<K, V> imple
     private final GroupedStreamAggregateBuilder<K, V> aggregateBuilder;
     private final Merger<K, Long> countMerger = (aggKey, aggOne, aggTwo) -> 
aggOne + aggTwo;
 
+    private EmitStrategy emitStrategy = EmitStrategy.onWindowUpdate();
+
     SessionWindowedKStreamImpl(final SessionWindows windows,
                                final InternalStreamsBuilder builder,
                                final Set<String> subTopologySourceNodes,
@@ -90,6 +94,12 @@ public class SessionWindowedKStreamImpl<K, V> extends 
AbstractStream<K, V> imple
         return doCount(named, materialized);
     }
 
+    @Override
+    public SessionWindowedKStream<K, V> emitStrategy(final EmitStrategy 
emitStrategy) {
+        this.emitStrategy = emitStrategy;
+        return this;
+    }
+
     private KTable<Windowed<K>, Long> doCount(final Named named,
                                               final Materialized<K, Long, 
SessionStore<Bytes, byte[]>> materialized) {
         final MaterializedInternal<K, Long, SessionStore<Bytes, byte[]>> 
materializedInternal =
@@ -109,6 +119,7 @@ public class SessionWindowedKStreamImpl<K, V> extends 
AbstractStream<K, V> imple
             new KStreamSessionWindowAggregate<>(
                 windows,
                 materializedInternal.storeName(),
+                emitStrategy,
                 aggregateBuilder.countInitializer,
                 aggregateBuilder.countAggregator,
                 countMerger),
@@ -157,6 +168,7 @@ public class SessionWindowedKStreamImpl<K, V> extends 
AbstractStream<K, V> imple
             new KStreamSessionWindowAggregate<>(
                 windows,
                 materializedInternal.storeName(),
+                emitStrategy,
                 aggregateBuilder.reduceInitializer,
                 reduceAggregator,
                 mergerForAggregator(reduceAggregator)
@@ -214,6 +226,7 @@ public class SessionWindowedKStreamImpl<K, V> extends 
AbstractStream<K, V> imple
             new KStreamSessionWindowAggregate<>(
                 windows,
                 materializedInternal.storeName(),
+                emitStrategy,
                 initializer,
                 aggregator,
                 sessionMerger),
@@ -246,10 +259,15 @@ public class SessionWindowedKStreamImpl<K, V> extends 
AbstractStream<K, V> imple
                     );
                     break;
                 case ROCKS_DB:
-                    supplier = Stores.persistentSessionStore(
-                        materialized.storeName(),
-                        Duration.ofMillis(retentionPeriod)
-                    );
+                    supplier = emitStrategy.type() == 
EmitStrategy.StrategyType.ON_WINDOW_CLOSE ?
+                        new RocksDbTimeOrderedSessionBytesStoreSupplier(
+                            materialized.storeName(),
+                            retentionPeriod,
+                            true) :
+                        Stores.persistentSessionStore(
+                            materialized.storeName(),
+                            Duration.ofMillis(retentionPeriod)
+                        );
                     break;
                 default:
                     throw new IllegalStateException("Unknown store type: " + 
materialized.storeType());
@@ -268,7 +286,8 @@ public class SessionWindowedKStreamImpl<K, V> extends 
AbstractStream<K, V> imple
             builder.withLoggingDisabled();
         }
 
-        if (materialized.cachingEnabled()) {
+        // do not enable cache if the emit final strategy is used
+        if (materialized.cachingEnabled() && emitStrategy.type() != 
EmitStrategy.StrategyType.ON_WINDOW_CLOSE) {
             builder.withCachingEnabled();
         }
         return builder;
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImpl.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImpl.java
index 1f2c732878..c07b783978 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImpl.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImpl.java
@@ -110,14 +110,17 @@ public class TimeWindowedKStreamImpl<K, V, W extends 
Window> extends AbstractStr
         final String aggregateName = new 
NamedInternal(named).orElseGenerateWithPrefix(builder, AGGREGATE_NAME);
 
         return aggregateBuilder.build(
-                new NamedInternal(aggregateName),
-                materialize(materializedInternal),
-                new KStreamWindowAggregate<>(windows, 
materializedInternal.storeName(), emitStrategy, 
aggregateBuilder.countInitializer, aggregateBuilder.countAggregator),
-                materializedInternal.queryableStoreName(),
-                materializedInternal.keySerde() != null ? new 
FullTimeWindowedSerde<>(materializedInternal.keySerde(), windows.size()) : null,
-                materializedInternal.valueSerde());
-
-
+            new NamedInternal(aggregateName),
+            materialize(materializedInternal),
+            new KStreamWindowAggregate<>(
+                windows,
+                materializedInternal.storeName(),
+                emitStrategy,
+                aggregateBuilder.countInitializer,
+                aggregateBuilder.countAggregator),
+            materializedInternal.queryableStoreName(),
+            materializedInternal.keySerde() != null ? new 
FullTimeWindowedSerde<>(materializedInternal.keySerde(), windows.size()) : null,
+            materializedInternal.valueSerde());
     }
 
     @Override
@@ -158,14 +161,17 @@ public class TimeWindowedKStreamImpl<K, V, W extends 
Window> extends AbstractStr
         final String aggregateName = new 
NamedInternal(named).orElseGenerateWithPrefix(builder, AGGREGATE_NAME);
 
         return aggregateBuilder.build(
-                new NamedInternal(aggregateName),
-                materialize(materializedInternal),
-                new KStreamWindowAggregate<>(windows, 
materializedInternal.storeName(), emitStrategy, initializer, aggregator),
-                materializedInternal.queryableStoreName(),
-                materializedInternal.keySerde() != null ? new 
FullTimeWindowedSerde<>(materializedInternal.keySerde(), windows.size()) : null,
-                materializedInternal.valueSerde());
-
-
+            new NamedInternal(aggregateName),
+            materialize(materializedInternal),
+            new KStreamWindowAggregate<>(
+                windows,
+                materializedInternal.storeName(),
+                emitStrategy,
+                initializer,
+                aggregator),
+            materializedInternal.queryableStoreName(),
+            materializedInternal.keySerde() != null ? new 
FullTimeWindowedSerde<>(materializedInternal.keySerde(), windows.size()) : null,
+            materializedInternal.valueSerde());
     }
 
     @Override
@@ -205,12 +211,17 @@ public class TimeWindowedKStreamImpl<K, V, W extends 
Window> extends AbstractStr
         final String reduceName = new 
NamedInternal(named).orElseGenerateWithPrefix(builder, REDUCE_NAME);
 
         return aggregateBuilder.build(
-                new NamedInternal(reduceName),
-                materialize(materializedInternal),
-                new KStreamWindowAggregate<>(windows, 
materializedInternal.storeName(), emitStrategy, 
aggregateBuilder.reduceInitializer, aggregatorForReducer(reducer)),
-                materializedInternal.queryableStoreName(),
-                materializedInternal.keySerde() != null ? new 
FullTimeWindowedSerde<>(materializedInternal.keySerde(), windows.size()) : null,
-                materializedInternal.valueSerde());
+            new NamedInternal(reduceName),
+            materialize(materializedInternal),
+            new KStreamWindowAggregate<>(
+                windows,
+                materializedInternal.storeName(),
+                emitStrategy,
+                aggregateBuilder.reduceInitializer,
+                aggregatorForReducer(reducer)),
+            materializedInternal.queryableStoreName(),
+            materializedInternal.keySerde() != null ? new 
FullTimeWindowedSerde<>(materializedInternal.keySerde(), windows.size()) : null,
+            materializedInternal.valueSerde());
     }
 
     @Override
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimestampedTupleForwarder.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimestampedTupleForwarder.java
index 34acbd99bd..bc686ada72 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimestampedTupleForwarder.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimestampedTupleForwarder.java
@@ -20,6 +20,7 @@ import 
org.apache.kafka.streams.processor.api.ProcessorContext;
 import org.apache.kafka.streams.processor.api.Record;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
+import org.apache.kafka.streams.state.internals.CacheFlushListener;
 import org.apache.kafka.streams.state.internals.WrappedStateStore;
 
 /**
@@ -38,7 +39,7 @@ class TimestampedTupleForwarder<K, V> {
     @SuppressWarnings({"unchecked", "rawtypes"})
     TimestampedTupleForwarder(final StateStore store,
                               final ProcessorContext<K, Change<V>> context,
-                              final TimestampedCacheFlushListener<K, V> 
flushListener,
+                              final CacheFlushListener<K, ?> flushListener,
                               final boolean sendOldValues) {
         this.context = (InternalProcessorContext<K, Change<V>>) context;
         this.sendOldValues = sendOldValues;
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractReadWriteDecorator.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractReadWriteDecorator.java
index aff099af5f..3c7f70ea07 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractReadWriteDecorator.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractReadWriteDecorator.java
@@ -259,6 +259,12 @@ abstract class AbstractReadWriteDecorator<T extends 
StateStore, K, V> extends Wr
             return wrapped().findSessions(keyFrom, keyTo, 
earliestSessionEndTime, latestSessionStartTime);
         }
 
+        @Override
+        public KeyValueIterator<Windowed<K>, AGG> findSessions(final long 
earliestSessionEndTime,
+                                                               final long 
latestSessionEndTime) {
+            return wrapped().findSessions(earliestSessionEndTime, 
latestSessionEndTime);
+        }
+
         @Override
         public void remove(final Windowed<K> sessionKey) {
             wrapped().remove(sessionKey);
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/SessionStore.java 
b/streams/src/main/java/org/apache/kafka/streams/state/SessionStore.java
index cbc1cc5b96..76a4317394 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/SessionStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/SessionStore.java
@@ -39,6 +39,19 @@ import static 
org.apache.kafka.streams.internals.ApiUtils.prepareMillisCheckFail
  */
 public interface SessionStore<K, AGG> extends StateStore, 
ReadOnlySessionStore<K, AGG> {
 
+    /**
+     * Return all the session window entries that end within the specified 
range (both ends are inclusive).
+     * This function would be used to retrieve all closed and immutable 
windows.
+     *
+     * @param earliestSessionEndTime earliest session end time to search from, 
inclusive
+     * @param latestSessionEndTime latest session end time to search to, 
inclusive
+     */
+    default KeyValueIterator<Windowed<K>, AGG> findSessions(final long 
earliestSessionEndTime,
+                                                            final long 
latestSessionEndTime) {
+        throw new UnsupportedOperationException(
+                "This API is not supported by this implementation of 
SessionStore.");
+    }
+
     @Override
     default KeyValueIterator<Windowed<K>, AGG> findSessions(final K key,
                                                             final Instant 
earliestSessionEndTime,
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractRocksDBTimeOrderedSegmentedBytesStore.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractRocksDBTimeOrderedSegmentedBytesStore.java
index f7216412f0..0398f0ca06 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractRocksDBTimeOrderedSegmentedBytesStore.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractRocksDBTimeOrderedSegmentedBytesStore.java
@@ -142,8 +142,7 @@ public abstract class 
AbstractRocksDBTimeOrderedSegmentedBytesStore extends Abst
                                           final long to,
                                           final boolean forward) {
         if (indexKeySchema.isPresent()) {
-            final List<KeyValueSegment> searchSpace = 
indexKeySchema.get().segmentsToSearch(segments, from, to,
-                forward);
+            final List<KeyValueSegment> searchSpace = 
indexKeySchema.get().segmentsToSearch(segments, from, to, forward);
 
             final Bytes binaryFrom = 
indexKeySchema.get().lowerRangeFixedSize(key, from);
             final Bytes binaryTo = 
indexKeySchema.get().upperRangeFixedSize(key, to);
@@ -156,8 +155,7 @@ public abstract class 
AbstractRocksDBTimeOrderedSegmentedBytesStore extends Abst
                 forward));
         }
 
-        final List<KeyValueSegment> searchSpace = 
baseKeySchema.segmentsToSearch(segments, from, to,
-            forward);
+        final List<KeyValueSegment> searchSpace = 
baseKeySchema.segmentsToSearch(segments, from, to, forward);
 
         final Bytes binaryFrom = baseKeySchema.lowerRangeFixedSize(key, from);
         final Bytes binaryTo = baseKeySchema.upperRangeFixedSize(key, to);
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingSessionBytesStore.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingSessionBytesStore.java
index ff387ef38e..fd32798801 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingSessionBytesStore.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingSessionBytesStore.java
@@ -31,9 +31,9 @@ import static 
org.apache.kafka.streams.processor.internals.ProcessorContextUtils
  * Simple wrapper around a {@link SessionStore} to support writing
  * updates to a changelog
  */
-class ChangeLoggingSessionBytesStore
-        extends WrappedStateStore<SessionStore<Bytes, byte[]>, byte[], byte[]>
-        implements SessionStore<Bytes, byte[]> {
+public class ChangeLoggingSessionBytesStore
+    extends WrappedStateStore<SessionStore<Bytes, byte[]>, byte[], byte[]>
+    implements SessionStore<Bytes, byte[]> {
 
     private InternalProcessorContext context;
 
@@ -95,6 +95,12 @@ class ChangeLoggingSessionBytesStore
         return wrapped().fetchSession(key, earliestSessionEndTime, 
latestSessionStartTime);
     }
 
+    @Override
+    public KeyValueIterator<Windowed<Bytes>, byte[]> findSessions(final long 
earliestSessionEndTime,
+                                                                  final long 
latestSessionEndTime) {
+        return wrapped().findSessions(earliestSessionEndTime, 
latestSessionEndTime);
+    }
+
     @Override
     public KeyValueIterator<Windowed<Bytes>, byte[]> backwardFetch(final Bytes 
key) {
         return wrapped().backwardFetch(key);
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemorySessionStore.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemorySessionStore.java
index 97984dd156..579abc3678 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemorySessionStore.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemorySessionStore.java
@@ -202,25 +202,36 @@ public class InMemorySessionStore implements 
SessionStore<Bytes, byte[]> {
 
     @Override
     public byte[] fetchSession(final Bytes key,
-                               final long earliestSessionEndTime,
-                               final long latestSessionStartTime) {
+                               final long sessionStartTime,
+                               final long sessionEndTime) {
         removeExpiredSegments();
 
         Objects.requireNonNull(key, "key cannot be null");
 
         // Only need to search if the record hasn't expired yet
-        if (latestSessionStartTime > observedStreamTime - retentionPeriod) {
-            final ConcurrentNavigableMap<Bytes, ConcurrentNavigableMap<Long, 
byte[]>> keyMap = endTimeMap.get(latestSessionStartTime);
+        if (sessionEndTime > observedStreamTime - retentionPeriod) {
+            final ConcurrentNavigableMap<Bytes, ConcurrentNavigableMap<Long, 
byte[]>> keyMap = endTimeMap.get(sessionEndTime);
             if (keyMap != null) {
                 final ConcurrentNavigableMap<Long, byte[]> startTimeMap = 
keyMap.get(key);
                 if (startTimeMap != null) {
-                    return startTimeMap.get(earliestSessionEndTime);
+                    return startTimeMap.get(sessionStartTime);
                 }
             }
         }
         return null;
     }
 
+    @Override
+    public KeyValueIterator<Windowed<Bytes>, byte[]> findSessions(final long 
earliestSessionEndTime,
+                                                                  final long 
latestSessionEndTime) {
+        removeExpiredSegments();
+
+        final ConcurrentNavigableMap<Long, ConcurrentNavigableMap<Bytes, 
ConcurrentNavigableMap<Long, byte[]>>> endTimSubMap
+            = endTimeMap.subMap(earliestSessionEndTime, true, 
latestSessionEndTime, true);
+
+        return registerNewIterator(null, null, Long.MAX_VALUE, 
endTimSubMap.entrySet().iterator(), true);
+    }
+
     @Override
     public KeyValueIterator<Windowed<Bytes>, byte[]> findSessions(final Bytes 
key,
                                                                   final long 
earliestSessionEndTime,
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java
index ad75e35e7a..bc4f2169b3 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java
@@ -346,6 +346,18 @@ public class MeteredSessionStore<K, V>
             time);
     }
 
+    @Override
+    public KeyValueIterator<Windowed<K>, V> findSessions(final long 
earliestSessionEndTime,
+                                                         final long 
latestSessionEndTime) {
+        return new MeteredWindowedKeyValueIterator<>(
+                wrapped().findSessions(earliestSessionEndTime, 
latestSessionEndTime),
+                fetchSensor,
+                streamsMetrics,
+                serdes::keyFrom,
+                serdes::valueFrom,
+                time);
+    }
+
     @Override
     public KeyValueIterator<Windowed<K>, V> backwardFindSessions(final K 
keyFrom,
                                                                  final K keyTo,
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/PrefixedSessionKeySchemas.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/PrefixedSessionKeySchemas.java
index c98ae83390..2ac25277ba 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/PrefixedSessionKeySchemas.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/PrefixedSessionKeySchemas.java
@@ -77,10 +77,8 @@ public class PrefixedSessionKeySchemas {
         }
 
         /**
-         *
          * @param key the key in the range
          * @param to the latest start time
-         * @return
          */
         @Override
         public Bytes upperRangeFixedSize(final Bytes key, final long to) {
@@ -88,10 +86,8 @@ public class PrefixedSessionKeySchemas {
         }
 
         /**
-         *
          * @param key the key in the range
          * @param from the earliest end timestamp in the range
-         * @return
          */
         @Override
         public Bytes lowerRangeFixedSize(final Bytes key, final long from) {
@@ -105,7 +101,10 @@ public class PrefixedSessionKeySchemas {
 
         @Override
         public HasNextCondition hasNextCondition(final Bytes binaryKeyFrom,
-            final Bytes binaryKeyTo, final long from, final long to, final 
boolean forward) {
+                                                 final Bytes binaryKeyTo,
+                                                 final long from,
+                                                 final long to,
+                                                 final boolean forward) {
             return iterator -> {
                 while (iterator.hasNext()) {
                     final Bytes bytes = iterator.peekNextKey();
@@ -204,7 +203,9 @@ public class PrefixedSessionKeySchemas {
                                        final long endTime) {
             buf.putLong(endTime);
             buf.putLong(startTime);
-            buf.put(key.get());
+            if (key != null) {
+                buf.put(key.get());
+            }
         }
 
         public static Bytes toBinary(final Bytes key,
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedSessionSegmentedBytesStore.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedSessionSegmentedBytesStore.java
index 4265150eb9..172d321881 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedSessionSegmentedBytesStore.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedSessionSegmentedBytesStore.java
@@ -18,6 +18,7 @@ package org.apache.kafka.streams.state.internals;
 
 import java.util.Collection;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
@@ -38,7 +39,7 @@ import org.rocksdb.WriteBatch;
  */
 public class RocksDBTimeOrderedSessionSegmentedBytesStore extends 
AbstractRocksDBTimeOrderedSegmentedBytesStore {
 
-    private class SessionKeySchemaIndexToBaseStoreIterator  extends 
IndexToBaseStoreIterator {
+    private class SessionKeySchemaIndexToBaseStoreIterator extends 
IndexToBaseStoreIterator {
         SessionKeySchemaIndexToBaseStoreIterator(final KeyValueIterator<Bytes, 
byte[]> indexIterator) {
             super(indexIterator);
         }
@@ -71,6 +72,36 @@ public class RocksDBTimeOrderedSessionSegmentedBytesStore 
extends AbstractRocksD
         ));
     }
 
+    public KeyValueIterator<Bytes, byte[]> fetchSessions(final long 
earliestSessionEndTime,
+                                                         final long 
latestSessionEndTime) {
+        final List<KeyValueSegment> searchSpace = 
segments.segments(earliestSessionEndTime, latestSessionEndTime, true);
+
+        // here we want [0, latestSE, FF] as the upper bound to cover any 
possible keys,
+        // but since we can only get upper bound based on timestamps, we use a 
slightly larger upper bound as [0, latestSE+1]
+        final Bytes binaryFrom = baseKeySchema.lowerRangeFixedSize(null, 
earliestSessionEndTime);
+        final Bytes binaryTo = baseKeySchema.lowerRangeFixedSize(null, 
latestSessionEndTime + 1);
+
+        return new SegmentIterator<>(
+                searchSpace.iterator(),
+                iterator -> {
+                    while (iterator.hasNext()) {
+                        final Bytes bytes = iterator.peekNextKey();
+
+                        final Windowed<Bytes> windowedKey = 
TimeFirstSessionKeySchema.from(bytes);
+                        final long endTime = windowedKey.window().end();
+
+                        if (endTime <= latestSessionEndTime && endTime >= 
earliestSessionEndTime) {
+                            return true;
+                        }
+                        iterator.next();
+                    }
+                    return false;
+                },
+                binaryFrom,
+                binaryTo,
+                true);
+    }
+
     public void remove(final Windowed<Bytes> key) {
         remove(TimeFirstSessionKeySchema.toBinary(key));
     }
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedSessionStore.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedSessionStore.java
index 5b72163757..deb6028ef6 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedSessionStore.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedSessionStore.java
@@ -61,6 +61,13 @@ public class RocksDBTimeOrderedSessionStore
         );
     }
 
+    @Override
+    public KeyValueIterator<Windowed<Bytes>, byte[]> findSessions(final long 
earliestSessionEndTime,
+                                                                  final long 
latestSessionEndTime) {
+        final KeyValueIterator<Bytes, byte[]> bytesIterator = 
wrapped().fetchSessions(earliestSessionEndTime, latestSessionEndTime);
+        return new WrappedSessionStoreIterator(bytesIterator, 
TimeFirstSessionKeySchema::from);
+    }
+
     @Override
     public KeyValueIterator<Windowed<Bytes>, byte[]> findSessions(final Bytes 
key,
                                                                   final long 
earliestSessionEndTime,
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/SegmentIterator.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/SegmentIterator.java
index 6191c49888..9aabc787c8 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/SegmentIterator.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/SegmentIterator.java
@@ -91,7 +91,7 @@ class SegmentIterator<S extends Segment> implements 
KeyValueIterator<Bytes, byte
         try {
             hasNext = hasNextCondition.hasNext(currentIterator);
         } catch (final InvalidStateStoreException e) {
-            //already closed so ignore
+            // already closed so ignore
         }
         return hasNext;
     }
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/SegmentedBytesStore.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/SegmentedBytesStore.java
index 80b5a91ffa..1ef6a932f9 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/SegmentedBytesStore.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/SegmentedBytesStore.java
@@ -91,8 +91,8 @@ public interface SegmentedBytesStore extends StateStore {
     /**
      * Gets all the key-value pairs that belong to the windows within in the 
given time range.
      *
-     * @param from the beginning of the time slot from which to search
-     * @param to   the end of the time slot from which to search
+     * @param from the beginning of the time slot from which to search 
(inclusive)
+     * @param to   the end of the time slot from which to search (inclusive)
      * @return an iterator over windowed key-value pairs {@code <Windowed<K>, 
value>}
      * @throws InvalidStateStoreException if the store is not initialized
      * @throws NullPointerException if null is used for any key
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/SessionKeySchema.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/SessionKeySchema.java
index 505bbddc80..f21e47fd87 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/SessionKeySchema.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/SessionKeySchema.java
@@ -35,7 +35,7 @@ public class SessionKeySchema implements 
SegmentedBytesStore.KeySchema {
     private static final byte[] MIN_SUFFIX = new byte[SUFFIX_SIZE];
 
     public static int keyByteLength(final Bytes key) {
-        return key.get().length + 2 * TIMESTAMP_SIZE;
+        return (key == null ? 0 : key.get().length) + 2 * TIMESTAMP_SIZE;
     }
 
     @Override
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java
index 21c6e6af12..fc993b63a9 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java
@@ -22,11 +22,11 @@ import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.MockTime;
-import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.KeyValueTimestamp;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.kstream.Aggregator;
+import org.apache.kafka.streams.kstream.EmitStrategy;
 import org.apache.kafka.streams.kstream.Initializer;
 import org.apache.kafka.streams.kstream.Merger;
 import org.apache.kafka.streams.kstream.SessionWindows;
@@ -40,9 +40,11 @@ import 
org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
 import 
org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender;
 import 
org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender.Event;
 import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.SessionBytesStoreSupplier;
 import org.apache.kafka.streams.state.SessionStore;
 import org.apache.kafka.streams.state.StoreBuilder;
 import org.apache.kafka.streams.state.Stores;
+import 
org.apache.kafka.streams.state.internals.RocksDbTimeOrderedSessionBytesStoreSupplier;
 import org.apache.kafka.streams.state.internals.ThreadCache;
 import org.apache.kafka.test.InternalMockProcessorContext;
 import org.apache.kafka.test.MockRecordCollector;
@@ -51,13 +53,18 @@ import org.apache.kafka.test.TestUtils;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
 
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.List;
+import java.util.Properties;
 import java.util.stream.Collectors;
 
 import static java.time.Duration.ofMillis;
+import static java.util.Arrays.asList;
 import static org.apache.kafka.common.utils.Utils.mkEntry;
 import static org.apache.kafka.common.utils.Utils.mkMap;
 import static org.apache.kafka.test.StreamsTestUtils.getMetricByName;
@@ -69,29 +76,39 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
-
+@RunWith(Parameterized.class)
 public class KStreamSessionWindowAggregateProcessorTest {
 
     private static final long GAP_MS = 5 * 60 * 1000L;
     private static final String STORE_NAME = "session-store";
 
+    private final MockTime time = new MockTime();
+    private final Metrics metrics = new Metrics();
+    private final StreamsMetricsImpl streamsMetrics = new 
StreamsMetricsImpl(metrics, "test", StreamsConfig.METRICS_LATEST, time);
     private final String threadId = Thread.currentThread().getName();
     private final Initializer<Long> initializer = () -> 0L;
     private final Aggregator<String, String, Long> aggregator = (aggKey, 
value, aggregate) -> aggregate + 1;
     private final Merger<String, Long> sessionMerger = (aggKey, aggOne, 
aggTwo) -> aggOne + aggTwo;
-    private final KStreamSessionWindowAggregate<String, String, Long> 
sessionAggregator =
-        new KStreamSessionWindowAggregate<>(
-            SessionWindows.ofInactivityGapWithNoGrace(ofMillis(GAP_MS)),
-            STORE_NAME,
-            initializer,
-            aggregator,
-            sessionMerger);
-
     private final List<KeyValueTimestamp<Windowed<String>, Change<Long>>> 
results = new ArrayList<>();
-    private final Processor<String, String, Windowed<String>, Change<Long>> 
processor = sessionAggregator.get();
-    private SessionStore<String, Long> sessionStore;
+
     private InternalMockProcessorContext<Windowed<String>, Change<Long>> 
context;
-    private final Metrics metrics = new Metrics();
+    private KStreamSessionWindowAggregate<String, String, Long> 
sessionAggregator;
+    private Processor<String, String, Windowed<String>, Change<Long>> 
processor;
+    private SessionStore<String, Long> sessionStore;
+
+    @Parameterized.Parameter
+    public EmitStrategy.StrategyType type;
+
+    @Parameterized.Parameters(name = "{0}")
+    public static Collection<Object[]> data() {
+        return asList(new Object[][] {
+            {EmitStrategy.StrategyType.ON_WINDOW_UPDATE},
+            {EmitStrategy.StrategyType.ON_WINDOW_CLOSE}
+        });
+    }
+
+    private EmitStrategy emitStrategy;
+    private boolean emitFinal;
 
     @Before
     public void setup() {
@@ -99,23 +116,44 @@ public class KStreamSessionWindowAggregateProcessorTest {
     }
 
     private void setup(final boolean enableCache) {
-        final StreamsMetricsImpl streamsMetrics =
-            new StreamsMetricsImpl(metrics, "test", 
StreamsConfig.METRICS_LATEST, new MockTime());
+        // Always process
+        final Properties prop = StreamsTestUtils.getStreamsConfig();
+        
prop.put(StreamsConfig.InternalConfig.EMIT_INTERVAL_MS_KSTREAMS_WINDOWED_AGGREGATION,
 0);
+        final StreamsConfig config = new StreamsConfig(prop);
+
         context = new InternalMockProcessorContext<Windowed<String>, 
Change<Long>>(
             TestUtils.tempDirectory(),
             Serdes.String(),
             Serdes.String(),
             streamsMetrics,
-            new StreamsConfig(StreamsTestUtils.getStreamsConfig()),
+            config,
             MockRecordCollector::new,
             new ThreadCache(new LogContext("testCache "), 100000, 
streamsMetrics),
-            Time.SYSTEM
+            time
         ) {
             @Override
             public <K extends Windowed<String>, V extends Change<Long>> void 
forward(final Record<K, V> record) {
                 results.add(new KeyValueTimestamp<>(record.key(), 
record.value(), record.timestamp()));
             }
         };
+
+
+        emitFinal = type.equals(EmitStrategy.StrategyType.ON_WINDOW_CLOSE);
+        emitStrategy = EmitStrategy.StrategyType.forType(type);
+
+        sessionAggregator = new KStreamSessionWindowAggregate<>(
+            SessionWindows.ofInactivityGapWithNoGrace(ofMillis(GAP_MS)),
+            STORE_NAME,
+            emitStrategy,
+            initializer,
+            aggregator,
+            sessionMerger);
+
+        if (processor != null) {
+            processor.close();
+        }
+        processor = sessionAggregator.get();
+
         // Set initial timestamp for CachingSessionStore to prepare entry from 
as default
         // InternalMockProcessorContext#timestamp returns -1.
         context.setTime(0L);
@@ -126,14 +164,14 @@ public class KStreamSessionWindowAggregateProcessorTest {
     }
 
     private void initStore(final boolean enableCaching) {
-        final StoreBuilder<SessionStore<String, Long>> storeBuilder =
-            Stores.sessionStoreBuilder(
-                Stores.persistentSessionStore(STORE_NAME, ofMillis(GAP_MS * 
3)),
-                Serdes.String(),
-                Serdes.Long())
+        final SessionBytesStoreSupplier supplier = emitStrategy.type() == 
EmitStrategy.StrategyType.ON_WINDOW_CLOSE ?
+            new RocksDbTimeOrderedSessionBytesStoreSupplier(STORE_NAME, GAP_MS 
* 3, true) :
+            Stores.persistentSessionStore(STORE_NAME, ofMillis(GAP_MS * 3));
+
+        final StoreBuilder<SessionStore<String, Long>> storeBuilder = 
Stores.sessionStoreBuilder(supplier, Serdes.String(), Serdes.Long())
             .withLoggingDisabled();
 
-        if (enableCaching) {
+        if (enableCaching && emitStrategy.type() != 
EmitStrategy.StrategyType.ON_WINDOW_CLOSE) {
             storeBuilder.withCachingEnabled();
         }
 
@@ -147,6 +185,7 @@ public class KStreamSessionWindowAggregateProcessorTest {
     @After
     public void closeStore() {
         sessionStore.close();
+        processor.close();
     }
 
     @Test
@@ -198,35 +237,51 @@ public class KStreamSessionWindowAggregateProcessorTest {
     @Test
     public void 
shouldHaveMultipleSessionsForSameIdWhenTimestampApartBySessionGap() {
         final String sessionId = "mel";
-        long time = 0;
-        processor.process(new Record<>(sessionId, "first", time));
-        final long time1 = time += GAP_MS + 1;
-        processor.process(new Record<>(sessionId, "second", time1));
-        processor.process(new Record<>(sessionId, "second", time1));
-        final long time2 = time += GAP_MS + 1;
-        processor.process(new Record<>(sessionId, "third", time2));
-        processor.process(new Record<>(sessionId, "third", time2));
-        processor.process(new Record<>(sessionId, "third", time2));
+        long now = 0;
+        processor.process(new Record<>(sessionId, "first", now));
+        now += GAP_MS + 1;
+        processor.process(new Record<>(sessionId, "second", now));
+        processor.process(new Record<>(sessionId, "second", now));
+        now += GAP_MS + 1;
+        processor.process(new Record<>(sessionId, "third", now));
+        processor.process(new Record<>(sessionId, "third", now));
+        processor.process(new Record<>(sessionId, "third", now));
 
         sessionStore.flush();
-        assertEquals(
-            Arrays.asList(
-                new KeyValueTimestamp<>(
-                    new Windowed<>(sessionId, new SessionWindow(0, 0)),
-                    new Change<>(1L, null),
-                    0L),
-                new KeyValueTimestamp<>(
-                    new Windowed<>(sessionId, new SessionWindow(GAP_MS + 1, 
GAP_MS + 1)),
-                    new Change<>(2L, null),
-                    GAP_MS + 1),
-                new KeyValueTimestamp<>(
-                    new Windowed<>(sessionId, new SessionWindow(time, time)),
-                    new Change<>(3L, null),
-                    time)
-            ),
-            results
-        );
 
+        if (emitFinal) {
+            assertEquals(
+                Arrays.asList(
+                    new KeyValueTimestamp<>(
+                        new Windowed<>(sessionId, new SessionWindow(0, 0)),
+                        new Change<>(1L, null),
+                        0L),
+                    new KeyValueTimestamp<>(
+                        new Windowed<>(sessionId, new SessionWindow(GAP_MS + 
1, GAP_MS + 1)),
+                        new Change<>(2L, null),
+                        GAP_MS + 1)
+                ),
+                results
+            );
+        } else {
+            assertEquals(
+                Arrays.asList(
+                    new KeyValueTimestamp<>(
+                        new Windowed<>(sessionId, new SessionWindow(0, 0)),
+                        new Change<>(1L, null),
+                        0L),
+                    new KeyValueTimestamp<>(
+                        new Windowed<>(sessionId, new SessionWindow(GAP_MS + 
1, GAP_MS + 1)),
+                        new Change<>(2L, null),
+                        GAP_MS + 1),
+                    new KeyValueTimestamp<>(
+                        new Windowed<>(sessionId, new SessionWindow(now, now)),
+                        new Change<>(3L, null),
+                        now)
+                ),
+                results
+            );
+        }
     }
 
     @Test
@@ -264,8 +319,8 @@ public class KStreamSessionWindowAggregateProcessorTest {
 
         sessionStore.flush();
 
-        assertEquals(
-            Arrays.asList(
+        if (emitFinal) {
+            assertEquals(Arrays.asList(
                 new KeyValueTimestamp<>(
                     new Windowed<>("a", new SessionWindow(0, 0)),
                     new Change<>(1L, null),
@@ -281,22 +336,44 @@ public class KStreamSessionWindowAggregateProcessorTest {
                 new KeyValueTimestamp<>(
                     new Windowed<>("d", new SessionWindow(0, GAP_MS / 2)),
                     new Change<>(2L, null),
-                    GAP_MS / 2),
-                new KeyValueTimestamp<>(
-                    new Windowed<>("b", new SessionWindow(GAP_MS + 1, GAP_MS + 
1)),
-                    new Change<>(1L, null),
-                    GAP_MS + 1),
-                new KeyValueTimestamp<>(
-                    new Windowed<>("a", new SessionWindow(GAP_MS + 1, GAP_MS + 
1 + GAP_MS / 2)),
-                    new Change<>(2L, null),
-                    GAP_MS + 1 + GAP_MS / 2),
-                new KeyValueTimestamp<>(new Windowed<>(
-                    "c",
-                    new SessionWindow(GAP_MS + 1 + GAP_MS / 2, GAP_MS + 1 + 
GAP_MS / 2)), new Change<>(1L, null),
-                    GAP_MS + 1 + GAP_MS / 2)
-            ),
-            results
-        );
+                    GAP_MS / 2)
+                ),
+                results);
+        } else {
+            assertEquals(
+                Arrays.asList(
+                    new KeyValueTimestamp<>(
+                        new Windowed<>("a", new SessionWindow(0, 0)),
+                        new Change<>(1L, null),
+                        0L),
+                    new KeyValueTimestamp<>(
+                        new Windowed<>("b", new SessionWindow(0, 0)),
+                        new Change<>(1L, null),
+                        0L),
+                    new KeyValueTimestamp<>(
+                        new Windowed<>("c", new SessionWindow(0, 0)),
+                        new Change<>(1L, null),
+                       0L),
+                    new KeyValueTimestamp<>(
+                        new Windowed<>("d", new SessionWindow(0, GAP_MS / 2)),
+                        new Change<>(2L, null),
+                        GAP_MS / 2),
+                    new KeyValueTimestamp<>(
+                        new Windowed<>("b", new SessionWindow(GAP_MS + 1, 
GAP_MS + 1)),
+                        new Change<>(1L, null),
+                        GAP_MS + 1),
+                    new KeyValueTimestamp<>(
+                        new Windowed<>("a", new SessionWindow(GAP_MS + 1, 
GAP_MS + 1 + GAP_MS / 2)),
+                        new Change<>(2L, null),
+                        GAP_MS + 1 + GAP_MS / 2),
+                    new KeyValueTimestamp<>(new Windowed<>(
+                        "c",
+                        new SessionWindow(GAP_MS + 1 + GAP_MS / 2, GAP_MS + 1 
+ GAP_MS / 2)), new Change<>(1L, null),
+                        GAP_MS + 1 + GAP_MS / 2)
+                    ),
+                    results
+            );
+        }
     }
 
     @Test
@@ -314,6 +391,9 @@ public class KStreamSessionWindowAggregateProcessorTest {
 
     @Test
     public void shouldImmediatelyForwardNewSessionWhenNonCachedStore() {
+        if (emitFinal)
+            return;
+
         initStore(false);
         processor.init(context);
 
@@ -342,6 +422,9 @@ public class KStreamSessionWindowAggregateProcessorTest {
 
     @Test
     public void shouldImmediatelyForwardRemovedSessionsWhenMerging() {
+        if (emitFinal)
+            return;
+
         initStore(false);
         processor.init(context);
 
@@ -399,6 +482,7 @@ public class KStreamSessionWindowAggregateProcessorTest {
         final Processor<String, String, Windowed<String>, Change<Long>> 
processor = new KStreamSessionWindowAggregate<>(
             SessionWindows.ofInactivityGapAndGrace(ofMillis(10L), 
ofMillis(0L)),
             STORE_NAME,
+            EmitStrategy.onWindowUpdate(),
             initializer,
             aggregator,
             sessionMerger
@@ -464,6 +548,7 @@ public class KStreamSessionWindowAggregateProcessorTest {
         final Processor<String, String, Windowed<String>, Change<Long>> 
processor = new KStreamSessionWindowAggregate<>(
             SessionWindows.ofInactivityGapAndGrace(ofMillis(10L), 
ofMillis(1L)),
             STORE_NAME,
+            EmitStrategy.onWindowUpdate(),
             initializer,
             aggregator,
             sessionMerger
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java
index 484ad1f059..8af320ae70 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java
@@ -99,7 +99,7 @@ public class KStreamWindowAggregateTest {
     @Parameter(1)
     public boolean withCache;
 
-    public EmitStrategy emitStrategy;
+    private EmitStrategy emitStrategy;
 
     private boolean emitFinal;
 
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionTupleForwarderTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionTupleForwarderTest.java
deleted file mode 100644
index 60b37bb052..0000000000
--- 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionTupleForwarderTest.java
+++ /dev/null
@@ -1,108 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.streams.kstream.internals;
-
-import org.apache.kafka.streams.kstream.Windowed;
-import org.apache.kafka.streams.processor.StateStore;
-import org.apache.kafka.streams.processor.api.ProcessorContext;
-import org.apache.kafka.streams.processor.api.Record;
-import org.apache.kafka.streams.state.internals.WrappedStateStore;
-import org.junit.Test;
-
-import static org.easymock.EasyMock.expect;
-import static org.easymock.EasyMock.expectLastCall;
-import static org.easymock.EasyMock.mock;
-import static org.easymock.EasyMock.replay;
-import static org.easymock.EasyMock.verify;
-
-public class SessionTupleForwarderTest {
-
-    @Test
-    public void shouldSetFlushListenerOnWrappedStateStore() {
-        setFlushListener(true);
-        setFlushListener(false);
-    }
-
-    private void setFlushListener(final boolean sendOldValues) {
-        final WrappedStateStore<StateStore, Windowed<Object>, Object> store = 
mock(WrappedStateStore.class);
-        final SessionCacheFlushListener<Object, Object> flushListener = 
mock(SessionCacheFlushListener.class);
-
-        expect(store.setFlushListener(flushListener, 
sendOldValues)).andReturn(false);
-        replay(store);
-
-        new SessionTupleForwarder<>(store, null, flushListener, sendOldValues);
-
-        verify(store);
-    }
-
-    @Test
-    public void shouldForwardRecordsIfWrappedStateStoreDoesNotCache() {
-        shouldForwardRecordsIfWrappedStateStoreDoesNotCache(false);
-        shouldForwardRecordsIfWrappedStateStoreDoesNotCache(true);
-    }
-
-    private void shouldForwardRecordsIfWrappedStateStoreDoesNotCache(final 
boolean sendOldValued) {
-        final WrappedStateStore<StateStore, String, String> store = 
mock(WrappedStateStore.class);
-        final ProcessorContext<Windowed<String>, Change<String>> context = 
mock(
-            ProcessorContext.class);
-
-        expect(store.setFlushListener(null, sendOldValued)).andReturn(false);
-        if (sendOldValued) {
-            context.forward(
-                new Record<>(
-                    new Windowed<>("key", new SessionWindow(21L, 42L)),
-                    new Change<>("value", "oldValue"),
-                    42L));
-        } else {
-            context.forward(
-                new Record<>(
-                    new Windowed<>("key", new SessionWindow(21L, 42L)),
-                    new Change<>("value", null),
-                    42L));
-        }
-        expectLastCall();
-        replay(store, context);
-
-        new SessionTupleForwarder<>(store, context, null, sendOldValued)
-            .maybeForward(
-                new Record<>(
-                    new Windowed<>("key", new SessionWindow(21L, 42L)),
-                    new Change<>("value", "oldValue"),
-                    42L));
-
-        verify(store, context);
-    }
-
-    @Test
-    public void shouldNotForwardRecordsIfWrappedStateStoreDoesCache() {
-        final WrappedStateStore<StateStore, String, String> store = 
mock(WrappedStateStore.class);
-        final ProcessorContext<Windowed<String>, Change<String>> context = 
mock(ProcessorContext.class);
-
-        expect(store.setFlushListener(null, false)).andReturn(true);
-        replay(store, context);
-
-        new SessionTupleForwarder<>(store, context, null, false)
-            .maybeForward(
-                new Record<>(
-                    new Windowed<>("key", new SessionWindow(21L, 42L)),
-                    new Change<>("value", "oldValue"),
-                    42L));
-
-        verify(store, context);
-    }
-
-}
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionWindowedKStreamImplTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionWindowedKStreamImplTest.java
index a77dcdb0a2..41876581b3 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionWindowedKStreamImplTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionWindowedKStreamImplTest.java
@@ -21,10 +21,12 @@ import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.KeyValueTimestamp;
 import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.TopologyTestDriver;
 import org.apache.kafka.streams.kstream.Consumed;
+import org.apache.kafka.streams.kstream.EmitStrategy;
 import org.apache.kafka.streams.kstream.Grouped;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.Materialized;
@@ -34,8 +36,11 @@ import 
org.apache.kafka.streams.kstream.SessionWindowedKStream;
 import org.apache.kafka.streams.kstream.SessionWindows;
 import org.apache.kafka.streams.kstream.Windowed;
 import org.apache.kafka.streams.state.SessionStore;
-import org.apache.kafka.streams.state.ValueAndTimestamp;
 import org.apache.kafka.streams.TestInputTopic;
+import org.apache.kafka.streams.state.internals.ChangeLoggingSessionBytesStore;
+import org.apache.kafka.streams.state.internals.MeteredSessionStore;
+import org.apache.kafka.streams.state.internals.RocksDBTimeOrderedSessionStore;
+import org.apache.kafka.streams.state.internals.WrappedStateStore;
 import org.apache.kafka.test.MockAggregator;
 import org.apache.kafka.test.MockApiProcessorSupplier;
 import org.apache.kafka.test.MockInitializer;
@@ -43,29 +48,58 @@ import org.apache.kafka.test.MockReducer;
 import org.apache.kafka.test.StreamsTestUtils;
 import org.junit.Before;
 import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
 
+import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
 import java.util.List;
-import java.util.Map;
 import java.util.Properties;
 
 import static java.time.Duration.ofMillis;
+import static java.util.Arrays.asList;
 import static org.hamcrest.CoreMatchers.equalTo;
 import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.core.IsInstanceOf.instanceOf;
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertThrows;
 
+@RunWith(Parameterized.class)
 public class SessionWindowedKStreamImplTest {
     private static final String TOPIC = "input";
     private final StreamsBuilder builder = new StreamsBuilder();
     private final Properties props = 
StreamsTestUtils.getStreamsConfig(Serdes.String(), Serdes.String());
     private final Merger<String, String> sessionMerger = (aggKey, aggOne, 
aggTwo) -> aggOne + "+" + aggTwo;
+
     private SessionWindowedKStream<String, String> stream;
 
+    @Parameterized.Parameter
+    public EmitStrategy.StrategyType type;
+
+    private boolean emitFinal;
+
+    @Parameterized.Parameters(name = "{0}")
+    public static Collection<Object[]> data() {
+        return asList(new Object[][] {
+            {EmitStrategy.StrategyType.ON_WINDOW_UPDATE},
+            {EmitStrategy.StrategyType.ON_WINDOW_CLOSE}
+        });
+    }
+
     @Before
     public void before() {
+        final EmitStrategy emitStrategy = 
EmitStrategy.StrategyType.forType(type);
+        emitFinal = type.equals(EmitStrategy.StrategyType.ON_WINDOW_CLOSE);
+
+        // Set interval to 0 so that it always tries to emit
+        
props.setProperty(StreamsConfig.InternalConfig.EMIT_INTERVAL_MS_KSTREAMS_WINDOWED_AGGREGATION,
 "0");
+
         final KStream<String, String> stream = builder.stream(TOPIC, 
Consumed.with(Serdes.String(), Serdes.String()));
         this.stream = stream.groupByKey(Grouped.with(Serdes.String(), 
Serdes.String()))
-                
.windowedBy(SessionWindows.ofInactivityGapWithNoGrace(ofMillis(500)));
+            
.windowedBy(SessionWindows.ofInactivityGapWithNoGrace(ofMillis(500)))
+            .emitStrategy(emitStrategy);
     }
 
     @Test
@@ -89,19 +123,30 @@ public class SessionWindowedKStreamImplTest {
             processData(driver);
         }
 
-        final Map<Windowed<String>, ValueAndTimestamp<Long>> result =
-            supplier.theCapturedProcessor().lastValueAndTimestampPerKey();
-
-        assertThat(result.size(), equalTo(3));
-        assertThat(
-            result.get(new Windowed<>("1", new SessionWindow(10L, 15L))),
-            equalTo(ValueAndTimestamp.make(2L, 15L)));
-        assertThat(
-            result.get(new Windowed<>("2", new SessionWindow(599L, 600L))),
-            equalTo(ValueAndTimestamp.make(2L, 600L)));
-        assertThat(
-            result.get(new Windowed<>("1", new SessionWindow(600L, 600L))),
-            equalTo(ValueAndTimestamp.make(1L, 600L)));
+        final ArrayList<KeyValueTimestamp<Windowed<String>, Long>> processed =
+            supplier.theCapturedProcessor().processed();
+
+        if (emitFinal) {
+            assertEquals(
+                Collections.singletonList(
+                    new KeyValueTimestamp<>(new Windowed<>("1", new 
SessionWindow(10L, 15L)), 2L, 15L)
+                ),
+                processed
+            );
+        } else {
+            assertEquals(
+                asList(
+                    new KeyValueTimestamp<>(new Windowed<>("1", new 
SessionWindow(10L, 10L)), 1L, 10L),
+                    new KeyValueTimestamp<>(new Windowed<>("1", new 
SessionWindow(10L, 10L)), null, 10L),
+                    new KeyValueTimestamp<>(new Windowed<>("1", new 
SessionWindow(10L, 15L)), 2L, 15L),
+                    new KeyValueTimestamp<>(new Windowed<>("1", new 
SessionWindow(600L, 600L)), 1L, 600L),
+                    new KeyValueTimestamp<>(new Windowed<>("2", new 
SessionWindow(600L, 600L)), 1L, 600L),
+                    new KeyValueTimestamp<>(new Windowed<>("2", new 
SessionWindow(600L, 600L)), null, 600L),
+                    new KeyValueTimestamp<>(new Windowed<>("2", new 
SessionWindow(599L, 600L)), 2L, 600L)
+                ),
+                processed
+            );
+        }
     }
 
     @Test
@@ -115,19 +160,30 @@ public class SessionWindowedKStreamImplTest {
             processData(driver);
         }
 
-        final Map<Windowed<String>, ValueAndTimestamp<String>> result =
-            supplier.theCapturedProcessor().lastValueAndTimestampPerKey();
-
-        assertThat(result.size(), equalTo(3));
-        assertThat(
-            result.get(new Windowed<>("1", new SessionWindow(10, 15))),
-            equalTo(ValueAndTimestamp.make("1+2", 15L)));
-        assertThat(
-            result.get(new Windowed<>("2", new SessionWindow(599L, 600))),
-            equalTo(ValueAndTimestamp.make("1+2", 600L)));
-        assertThat(
-            result.get(new Windowed<>("1", new SessionWindow(600, 600))),
-            equalTo(ValueAndTimestamp.make("3", 600L)));
+        final ArrayList<KeyValueTimestamp<Windowed<String>, String>> processed 
=
+                supplier.theCapturedProcessor().processed();
+
+        if (emitFinal) {
+            assertEquals(
+                Collections.singletonList(
+                    new KeyValueTimestamp<>(new Windowed<>("1", new 
SessionWindow(10L, 15L)), "1+2", 15L)
+                ),
+                processed
+            );
+        } else {
+            assertEquals(
+                asList(
+                    new KeyValueTimestamp<>(new Windowed<>("1", new 
SessionWindow(10L, 10L)), "1", 10L),
+                    new KeyValueTimestamp<>(new Windowed<>("1", new 
SessionWindow(10L, 10L)), null, 10L),
+                    new KeyValueTimestamp<>(new Windowed<>("1", new 
SessionWindow(10L, 15L)), "1+2", 15L),
+                    new KeyValueTimestamp<>(new Windowed<>("1", new 
SessionWindow(600L, 600L)), "3", 600L),
+                    new KeyValueTimestamp<>(new Windowed<>("2", new 
SessionWindow(600L, 600L)), "1", 600L),
+                    new KeyValueTimestamp<>(new Windowed<>("2", new 
SessionWindow(600L, 600L)), null, 600L),
+                    new KeyValueTimestamp<>(new Windowed<>("2", new 
SessionWindow(599L, 600L)), "1+2", 600L)
+                ),
+                processed
+            );
+        }
     }
 
     @Test
@@ -143,19 +199,30 @@ public class SessionWindowedKStreamImplTest {
             processData(driver);
         }
 
-        final Map<Windowed<String>, ValueAndTimestamp<String>> result =
-            supplier.theCapturedProcessor().lastValueAndTimestampPerKey();
-
-        assertThat(result.size(), equalTo(3));
-        assertThat(
-            result.get(new Windowed<>("1", new SessionWindow(10, 15))),
-            equalTo(ValueAndTimestamp.make("0+0+1+2", 15L)));
-        assertThat(
-            result.get(new Windowed<>("2", new SessionWindow(599, 600))),
-            equalTo(ValueAndTimestamp.make("0+0+1+2", 600L)));
-        assertThat(
-            result.get(new Windowed<>("1", new SessionWindow(600, 600))),
-            equalTo(ValueAndTimestamp.make("0+3", 600L)));
+        final ArrayList<KeyValueTimestamp<Windowed<String>, String>> processed 
=
+                supplier.theCapturedProcessor().processed();
+
+        if (emitFinal) {
+            assertEquals(
+                Collections.singletonList(
+                    new KeyValueTimestamp<>(new Windowed<>("1", new 
SessionWindow(10L, 15L)), "0+0+1+2", 15L)
+                ),
+                processed
+            );
+        } else {
+            assertEquals(
+                asList(
+                    new KeyValueTimestamp<>(new Windowed<>("1", new 
SessionWindow(10L, 10L)), "0+1", 10L),
+                    new KeyValueTimestamp<>(new Windowed<>("1", new 
SessionWindow(10L, 10L)), null, 10L),
+                    new KeyValueTimestamp<>(new Windowed<>("1", new 
SessionWindow(10L, 15L)), "0+0+1+2", 15L),
+                    new KeyValueTimestamp<>(new Windowed<>("1", new 
SessionWindow(600L, 600L)), "0+3", 600L),
+                    new KeyValueTimestamp<>(new Windowed<>("2", new 
SessionWindow(600L, 600L)), "0+1", 600L),
+                    new KeyValueTimestamp<>(new Windowed<>("2", new 
SessionWindow(600L, 600L)), null, 600L),
+                    new KeyValueTimestamp<>(new Windowed<>("2", new 
SessionWindow(599L, 600L)), "0+0+1+2", 600L)
+                ),
+                processed
+            );
+        }
     }
 
     @Test
@@ -292,6 +359,26 @@ public class SessionWindowedKStreamImplTest {
         assertThrows(NullPointerException.class, () -> 
stream.count((Materialized<String, Long, SessionStore<Bytes, byte[]>>) null));
     }
 
+    @Test
+    public void shouldNotEnableCachingWithEmitFinal() {
+        if (!emitFinal)
+            return;
+
+        stream.aggregate(
+                MockInitializer.STRING_INIT,
+                MockAggregator.TOSTRING_ADDER,
+                sessionMerger,
+                Materialized.<String, String, SessionStore<Bytes, 
byte[]>>as("aggregated").withValueSerde(Serdes.String()));
+
+        try (final TopologyTestDriver driver = new 
TopologyTestDriver(builder.build(), props)) {
+            final SessionStore<String, String> store = 
driver.getSessionStore("aggregated");
+            final WrappedStateStore changeLogging = (WrappedStateStore) 
((WrappedStateStore) store).wrapped();
+            assertThat(store, instanceOf(MeteredSessionStore.class));
+            assertThat(changeLogging, 
instanceOf(ChangeLoggingSessionBytesStore.class));
+            assertThat(changeLogging.wrapped(), 
instanceOf(RocksDBTimeOrderedSessionStore.class));
+        }
+    }
+
     private void processData(final TopologyTestDriver driver) {
         final TestInputTopic<String, String> inputTopic =
                 driver.createInputTopic(TOPIC, new StringSerializer(), new 
StringSerializer());
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImplTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImplTest.java
index 82017317c8..5ac43ac808 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImplTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImplTest.java
@@ -85,7 +85,7 @@ public class TimeWindowedKStreamImplTest {
     private boolean emitFinal;
 
     @Parameterized.Parameters(name = "{0}_cache:{1}")
-    public static Collection<Object[]> getKeySchema() {
+    public static Collection<Object[]> data() {
         return asList(new Object[][] {
             {StrategyType.ON_WINDOW_UPDATE, true},
             {StrategyType.ON_WINDOW_UPDATE, false},
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/graph/GraphGraceSearchUtilTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/graph/GraphGraceSearchUtilTest.java
index f8a7073dab..1de78a8b85 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/graph/GraphGraceSearchUtilTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/graph/GraphGraceSearchUtilTest.java
@@ -17,6 +17,7 @@
 package org.apache.kafka.streams.kstream.internals.graph;
 
 import org.apache.kafka.streams.errors.TopologyException;
+import org.apache.kafka.streams.kstream.EmitStrategy;
 import org.apache.kafka.streams.kstream.SessionWindows;
 import org.apache.kafka.streams.kstream.TimeWindows;
 import org.apache.kafka.streams.kstream.internals.KStreamSessionWindowAggregate;
@@ -86,6 +87,7 @@ public class GraphGraceSearchUtilTest {
                 new KStreamWindowAggregate<String, Long, Integer, TimeWindow>(
                     windows,
                     "asdf",
+                    EmitStrategy.onWindowUpdate(),
                     null,
                     null
                 ),
@@ -108,6 +110,7 @@ public class GraphGraceSearchUtilTest {
                 new KStreamSessionWindowAggregate<String, Long, Integer>(
                     windows,
                     "asdf",
+                    EmitStrategy.onWindowUpdate(),
                     null,
                     null,
                     null
@@ -127,7 +130,7 @@ public class GraphGraceSearchUtilTest {
         final StatefulProcessorNode<String, Long> graceGrandparent = new StatefulProcessorNode<>(
             "asdf",
             new ProcessorParameters<>(new KStreamSessionWindowAggregate<String, Long, Integer>(
-                windows, "asdf", null, null, null
+                windows, "asdf", EmitStrategy.onWindowUpdate(), null, null, null
             ), "asdf"),
             (StoreBuilder<?>) null
         );
@@ -167,6 +170,7 @@ public class GraphGraceSearchUtilTest {
                 new KStreamSessionWindowAggregate<String, Long, Integer>(
                     windows,
                     "asdf",
+                    EmitStrategy.onWindowUpdate(),
                     null,
                     null,
                     null
@@ -194,6 +198,7 @@ public class GraphGraceSearchUtilTest {
                 new KStreamSessionWindowAggregate<String, Long, Integer>(
                     SessionWindows.ofInactivityGapAndGrace(ofMillis(10L), ofMillis(1234L)),
                     "asdf",
+                    EmitStrategy.onWindowUpdate(),
                     null,
                     null,
                     null
@@ -209,6 +214,7 @@ public class GraphGraceSearchUtilTest {
                 new KStreamWindowAggregate<String, Long, Integer, TimeWindow>(
                     TimeWindows.ofSizeAndGrace(ofMillis(10L), ofMillis(4321L)),
                     "asdf",
+                    EmitStrategy.onWindowUpdate(),
                     null,
                     null
                 ),

Reply via email to