[ https://issues.apache.org/jira/browse/KAFKA-7223?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16626419#comment-16626419 ]

ASF GitHub Bot commented on KAFKA-7223:
---------------------------------------

guozhangwang closed pull request #5567: KAFKA-7223: Suppress API with only immediate emit
URL: https://github.com/apache/kafka/pull/5567

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
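
Before the diff itself, a quick sketch of the API it adds. Calling suppress()
on a KTable attaches a suppression processor; the only mode wired up in this
PR is the zero-time-limit ("immediate emit") path exercised by the integration
test in the diff. The topic names and serdes below are illustrative, not part
of the change:

    import java.time.Duration;

    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.kstream.KTable;

    import static org.apache.kafka.streams.kstream.Suppressed.BufferConfig.unbounded;
    import static org.apache.kafka.streams.kstream.Suppressed.untilTimeLimit;

    public class SuppressSketch {
        public static void main(final String[] args) {
            final StreamsBuilder builder = new StreamsBuilder();
            // Hypothetical source table; any changelog-producing KTable works.
            final KTable<String, String> table = builder.table("input");

            // Zero wait time plus an unbounded buffer: every update is
            // forwarded immediately, the one path this PR implements.
            table.suppress(untilTimeLimit(Duration.ZERO, unbounded()))
                 .toStream()
                 .to("output");
        }
    }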

diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
index bdd6dc3b37a..293bc6b7a86 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
@@ -389,6 +389,16 @@
      */
     <KR> KStream<KR, V> toStream(final KeyValueMapper<? super K, ? super V, ? extends KR> mapper);
 
+    /**
+     * Suppress some updates from this changelog stream, determined by the supplied {@link Suppressed} configuration.
+     *
+     * This controls what updates downstream table and stream operations will receive.
+     *
+     * @param suppressed Configuration object determining what, if any, updates to suppress
+     * @return A new KTable with the desired suppression characteristics.
+     */
+    KTable<K, V> suppress(final Suppressed<K> suppressed);
+
     /**
      * Create a new {@code KTable} by transforming the value of each record in this {@code KTable} into a new value
      * (with possibly a new type), with default serializers, deserializers, and state store.
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/Suppressed.java b/streams/src/main/java/org/apache/kafka/streams/kstream/Suppressed.java
new file mode 100644
index 00000000000..7488ef6ff37
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/Suppressed.java
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream;
+
+import org.apache.kafka.streams.kstream.internals.suppress.EagerBufferConfigImpl;
+import org.apache.kafka.streams.kstream.internals.suppress.FinalResultsSuppressionBuilder;
+import org.apache.kafka.streams.kstream.internals.suppress.StrictBufferConfigImpl;
+import org.apache.kafka.streams.kstream.internals.suppress.SuppressedImpl;
+
+import java.time.Duration;
+
+public interface Suppressed<K> {
+
+    /**
+     * Marker interface for a buffer configuration that is "strict" in the sense that it will strictly
+     * enforce the time bound and never emit early.
+     */
+    interface StrictBufferConfig extends BufferConfig<StrictBufferConfig> {
+
+    }
+
+    interface BufferConfig<BC extends BufferConfig<BC>> {
+        /**
+         * Create a size-constrained buffer in terms of the maximum number of keys it will store.
+         */
+        static BufferConfig<?> maxRecords(final long recordLimit) {
+            return new EagerBufferConfigImpl(recordLimit, Long.MAX_VALUE);
+        }
+
+        /**
+         * Set a size constraint on the buffer in terms of the maximum number of keys it will store.
+         */
+        BC withMaxRecords(final long recordLimit);
+
+        /**
+         * Create a size-constrained buffer in terms of the maximum number of bytes it will use.
+         */
+        static BufferConfig<?> maxBytes(final long byteLimit) {
+            return new EagerBufferConfigImpl(Long.MAX_VALUE, byteLimit);
+        }
+
+        /**
+         * Set a size constraint on the buffer in terms of the maximum number of bytes it will use.
+         */
+        BC withMaxBytes(final long byteLimit);
+
+        /**
+         * Create a buffer unconstrained by size (either keys or bytes).
+         *
+         * As a result, the buffer will consume as much memory as it needs, dictated by the time bound.
+         *
+         * If there isn't enough heap available to meet the demand, the application will encounter an
+         * {@link OutOfMemoryError} and shut down (not guaranteed to be a graceful exit). Also, note that
+         * JVM processes under extreme memory pressure may exhibit poor GC behavior.
+         *
+         * This is a convenient option if you doubt that your buffer will be that large, but also don't
+         * wish to pick particular constraints, such as in testing.
+         *
+         * This buffer is "strict" in the sense that it will enforce the time bound or crash.
+         * It will never emit early.
+         */
+        static StrictBufferConfig unbounded() {
+            return new StrictBufferConfigImpl();
+        }
+
+        /**
+         * Set the buffer to be unconstrained by size (either keys or bytes).
+         *
+         * As a result, the buffer will consume as much memory as it needs, dictated by the time bound.
+         *
+         * If there isn't enough heap available to meet the demand, the application will encounter an
+         * {@link OutOfMemoryError} and shut down (not guaranteed to be a graceful exit). Also, note that
+         * JVM processes under extreme memory pressure may exhibit poor GC behavior.
+         *
+         * This is a convenient option if you doubt that your buffer will be that large, but also don't
+         * wish to pick particular constraints, such as in testing.
+         *
+         * This buffer is "strict" in the sense that it will enforce the time bound or crash.
+         * It will never emit early.
+         */
+        StrictBufferConfig withNoBound();
+
+        /**
+         * Set the buffer to gracefully shut down the application when any of its constraints are violated.
+         *
+         * This buffer is "strict" in the sense that it will enforce the time bound or shut down.
+         * It will never emit early.
+         */
+        StrictBufferConfig shutDownWhenFull();
+
+        /**
+         * Set the buffer to use on-disk storage if it requires more memory than the constraints allow.
+         *
+         * This buffer is "strict" in the sense that it will never emit early.
+         */
+        StrictBufferConfig spillToDiskWhenFull();
+
+        /**
+         * Set the buffer to just emit the oldest records when any of its constraints are violated.
+         *
+         * This buffer is "not strict" in the sense that it may emit early, so it is suitable for reducing
+         * duplicate results downstream, but does not promise to eliminate them.
+         */
+        BufferConfig emitEarlyWhenFull();
+    }
+
+    /**
+     * Configure the suppression to emit only the "final results" from the window.
+     *
+     * By default, all Streams operators emit results whenever new results are available.
+     * This includes windowed operations.
+     *
+     * This configuration will instead emit just one result per key for each window, guaranteeing
+     * to deliver only the final result. This option is suitable for use cases in which the business logic
+     * requires a hard guarantee that only the final result is propagated. For example, sending alerts.
+     *
+     * To accomplish this, the operator will buffer events from the window until the window close (that is,
+     * until the end-time passes, and additionally until the grace period expires). Since windowed operators
+     * are required to reject late events for a window whose grace period has expired, there is an additional
+     * guarantee that the final results emitted from this suppression will match any queryable state upstream.
+     *
+     * @param bufferConfig A configuration specifying how much space to use for buffering intermediate results.
+     *                     This is required to be a "strict" config, since it would violate the "final results"
+     *                     property to emit early and then issue an update later.
+     * @param <K> The key type for the KTable to apply this suppression to. "Final results" mode is only available
+     *           on Windowed KTables (this is enforced by the type parameter).
+     * @return a "final results" mode suppression configuration
+     */
+    static <K extends Windowed> Suppressed<K> untilWindowCloses(final StrictBufferConfig bufferConfig) {
+        return new FinalResultsSuppressionBuilder<>(bufferConfig);
+    }
+
+    /**
+     * Configure the suppression to wait {@code timeToWaitForMoreEvents} amount of time after receiving a record
+     * before emitting it further downstream. If another record for the same key arrives in the meantime, it replaces
+     * the first record in the buffer but does <em>not</em> restart the timer.
+     *
+     * @param timeToWaitForMoreEvents The amount of time to wait, per record, for new events.
+     * @param bufferConfig A configuration specifying how much space to use for buffering intermediate results.
+     * @param <K> The key type for the KTable to apply this suppression to.
+     * @return a suppression configuration
+     */
+    static <K> Suppressed<K> untilTimeLimit(final Duration timeToWaitForMoreEvents, final BufferConfig bufferConfig) {
+        return new SuppressedImpl<>(timeToWaitForMoreEvents, bufferConfig, null);
+    }
+}
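
The BufferConfig combinators above are meant to chain: start from a static
factory (maxRecords, maxBytes, or unbounded) and refine with the with*/strategy
methods. A hedged illustration (a fragment to place inside any method; the
numeric caps are arbitrary):

    import org.apache.kafka.streams.kstream.Suppressed;

    import static org.apache.kafka.streams.kstream.Suppressed.BufferConfig.maxRecords;

    // Strict: cap at 1000 records and 64 MB, and shut the application
    // down rather than ever emit early.
    final Suppressed.StrictBufferConfig strict =
        maxRecords(1000L).withMaxBytes(64L * 1024 * 1024).shutDownWhenFull();

    // Eager: same caps, but emit the oldest records early when full,
    // trading strictness for availability.
    final Suppressed.BufferConfig eager =
        maxRecords(1000L).withMaxBytes(64L * 1024 * 1024).emitEarlyWhenFull();
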
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregate.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregate.java
index 5a3c897f781..b89399bf7b7 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregate.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregate.java
@@ -38,7 +38,7 @@
 import java.util.ArrayList;
 import java.util.List;
 
-class KStreamSessionWindowAggregate<K, V, Agg> implements KStreamAggProcessorSupplier<K, Windowed<K>, V, Agg> {
+public class KStreamSessionWindowAggregate<K, V, Agg> implements KStreamAggProcessorSupplier<K, Windowed<K>, V, Agg> {
     private static final Logger LOG = LoggerFactory.getLogger(KStreamSessionWindowAggregate.class);
 
     private final String storeName;
@@ -49,11 +49,11 @@
 
     private boolean sendOldValues = false;
 
-    KStreamSessionWindowAggregate(final SessionWindows windows,
-                                  final String storeName,
-                                  final Initializer<Agg> initializer,
-                                  final Aggregator<? super K, ? super V, Agg> aggregator,
-                                  final Merger<? super K, Agg> sessionMerger) {
+    public KStreamSessionWindowAggregate(final SessionWindows windows,
+                                         final String storeName,
+                                         final Initializer<Agg> initializer,
+                                         final Aggregator<? super K, ? super V, Agg> aggregator,
+                                         final Merger<? super K, Agg> sessionMerger) {
         this.windows = windows;
         this.storeName = storeName;
         this.initializer = initializer;
@@ -66,6 +66,10 @@
         return new KStreamSessionWindowAggregateProcessor();
     }
 
+    public SessionWindows windows() {
+        return windows;
+    }
+
     @Override
     public void enableSendingOldValues() {
         sendOldValues = true;
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java
index 57542847b96..f29251573e2 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java
@@ -44,10 +44,10 @@
 
     private boolean sendOldValues = false;
 
-    KStreamWindowAggregate(final Windows<W> windows,
-                           final String storeName,
-                           final Initializer<Agg> initializer,
-                           final Aggregator<? super K, ? super V, Agg> aggregator) {
+    public KStreamWindowAggregate(final Windows<W> windows,
+                                  final String storeName,
+                                  final Initializer<Agg> initializer,
+                                  final Aggregator<? super K, ? super V, Agg> aggregator) {
         this.windows = windows;
         this.storeName = storeName;
         this.initializer = initializer;
@@ -59,6 +59,10 @@
         return new KStreamWindowAggregateProcessor();
     }
 
+    public Windows<W> windows() {
+        return windows;
+    }
+
     @Override
     public void enableSendingOldValues() {
         sendOldValues = true;
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
index 352e42d3918..2330fad1b16 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
@@ -26,21 +26,30 @@
 import org.apache.kafka.streams.kstream.Materialized;
 import org.apache.kafka.streams.kstream.Predicate;
 import org.apache.kafka.streams.kstream.Serialized;
+import org.apache.kafka.streams.kstream.Suppressed;
 import org.apache.kafka.streams.kstream.ValueJoiner;
 import org.apache.kafka.streams.kstream.ValueMapper;
 import org.apache.kafka.streams.kstream.ValueMapperWithKey;
 import org.apache.kafka.streams.kstream.ValueTransformerWithKeySupplier;
+import org.apache.kafka.streams.kstream.Windowed;
 import org.apache.kafka.streams.kstream.internals.graph.KTableKTableJoinNode;
 import org.apache.kafka.streams.kstream.internals.graph.ProcessorGraphNode;
 import org.apache.kafka.streams.kstream.internals.graph.ProcessorParameters;
 import org.apache.kafka.streams.kstream.internals.graph.StreamsGraphNode;
 import org.apache.kafka.streams.kstream.internals.graph.TableProcessorNode;
+import org.apache.kafka.streams.kstream.internals.suppress.FinalResultsSuppressionBuilder;
+import org.apache.kafka.streams.kstream.internals.suppress.KTableSuppressProcessor;
+import org.apache.kafka.streams.kstream.internals.suppress.SuppressedImpl;
 import org.apache.kafka.streams.processor.ProcessorSupplier;
 import org.apache.kafka.streams.state.KeyValueStore;
 
+import java.time.Duration;
+import java.util.Collections;
 import java.util.Objects;
 import java.util.Set;
 
+import static org.apache.kafka.streams.kstream.internals.graph.GraphGraceSearchUtil.findAndVerifyWindowGrace;
+
 /**
  * The implementation class of {@link KTable}.
  *
@@ -66,6 +75,8 @@
 
     private static final String SELECT_NAME = "KTABLE-SELECT-";
 
+    private static final String SUPPRESS_NAME = "KTABLE-SUPPRESS-";
+
     private static final String TOSTREAM_NAME = "KTABLE-TOSTREAM-";
 
     private static final String TRANSFORMVALUES_NAME = "KTABLE-TRANSFORMVALUES-";
@@ -349,6 +360,53 @@ public String queryableStoreName() {
         return toStream().selectKey(mapper);
     }
 
+    @Override
+    public KTable<K, V> suppress(final Suppressed<K> suppressed) {
+        final String name = builder.newProcessorName(SUPPRESS_NAME);
+
+        final ProcessorSupplier<K, Change<V>> suppressionSupplier =
+            () -> new KTableSuppressProcessor<>(buildSuppress(suppressed));
+
+        final ProcessorParameters<K, Change<V>> processorParameters = new ProcessorParameters<>(
+            suppressionSupplier,
+            name
+        );
+
+        final ProcessorGraphNode<K, Change<V>> node = new ProcessorGraphNode<>(name, processorParameters, false);
+
+        builder.addGraphNode(streamsGraphNode, node);
+
+        return new KTableImpl<K, S, V>(
+            builder,
+            name,
+            suppressionSupplier,
+            keySerde,
+            valSerde,
+            Collections.singleton(this.name),
+            null,
+            false,
+            node
+        );
+    }
+
+    @SuppressWarnings("unchecked")
+    private SuppressedImpl<K> buildSuppress(final Suppressed<K> suppress) {
+        if (suppress instanceof FinalResultsSuppressionBuilder) {
+            final long grace = findAndVerifyWindowGrace(streamsGraphNode);
+
+            final FinalResultsSuppressionBuilder<?> builder = (FinalResultsSuppressionBuilder) suppress;
+
+            final SuppressedImpl<? extends Windowed> finalResultsSuppression =
+                builder.buildFinalResultsSuppression(Duration.ofMillis(grace));
+
+            return (SuppressedImpl<K>) finalResultsSuppression;
+        } else if (suppress instanceof SuppressedImpl) {
+            return (SuppressedImpl<K>) suppress;
+        } else {
+            throw new IllegalArgumentException("Custom subclasses of Suppressed are not allowed.");
+        }
+    }
+
     @Override
     public <V1, R> KTable<K, R> join(final KTable<K, V1> other,
                                      final ValueJoiner<? super V, ? super V1, ? extends R> joiner) {
@@ -492,12 +550,12 @@ public String queryableStoreName() {
         final ProcessorParameters joinMergeProcessorParameters = new ProcessorParameters(joinMerge, joinMergeName);
 
         kTableJoinNodeBuilder.withJoinMergeProcessorParameters(joinMergeProcessorParameters)
-            .withJoinOtherProcessorParameters(joinOtherProcessorParameters)
-            .withJoinThisProcessorParameters(joinThisProcessorParameters)
-            .withJoinThisStoreNames(valueGetterSupplier().storeNames())
-            .withJoinOtherStoreNames(((KTableImpl) other).valueGetterSupplier().storeNames())
-            .withOtherJoinSideNodeName(((KTableImpl) other).name)
-            .withThisJoinSideNodeName(name);
+                             .withJoinOtherProcessorParameters(joinOtherProcessorParameters)
+                             .withJoinThisProcessorParameters(joinThisProcessorParameters)
+                             .withJoinThisStoreNames(valueGetterSupplier().storeNames())
+                             .withJoinOtherStoreNames(((KTableImpl) other).valueGetterSupplier().storeNames())
+                             .withOtherJoinSideNodeName(((KTableImpl) other).name)
+                             .withThisJoinSideNodeName(name);
 
         final KTableKTableJoinNode kTableKTableJoinNode = kTableJoinNodeBuilder.build();
         builder.addGraphNode(this.streamsGraphNode, kTableKTableJoinNode);
@@ -526,10 +584,10 @@ public String queryableStoreName() {
         final String selectName = builder.newProcessorName(SELECT_NAME);
 
         final KTableProcessorSupplier<K, V, KeyValue<K1, V1>> selectSupplier = new KTableRepartitionMap<>(this, selector);
-        final ProcessorParameters processorParameters = new ProcessorParameters<>(selectSupplier, selectName);
+        final ProcessorParameters<K, Change<V>> processorParameters = new ProcessorParameters<>(selectSupplier, selectName);
 
         // select the aggregate key and values (old and new), it would require parent to send old values
-        final ProcessorGraphNode<K1, V1> groupByMapNode = new ProcessorGraphNode<>(
+        final ProcessorGraphNode<K, Change<V>> groupByMapNode = new ProcessorGraphNode<>(
             selectName,
             processorParameters,
             false
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/GraphGraceSearchUtil.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/GraphGraceSearchUtil.java
new file mode 100644
index 00000000000..306ddf5cf5e
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/GraphGraceSearchUtil.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream.internals.graph;
+
+import org.apache.kafka.streams.errors.TopologyException;
+import org.apache.kafka.streams.kstream.SessionWindows;
+import org.apache.kafka.streams.kstream.Windows;
+import org.apache.kafka.streams.kstream.internals.KStreamSessionWindowAggregate;
+import org.apache.kafka.streams.kstream.internals.KStreamWindowAggregate;
+import org.apache.kafka.streams.processor.ProcessorSupplier;
+
+public final class GraphGraceSearchUtil {
+    private GraphGraceSearchUtil() {}
+
+    public static long findAndVerifyWindowGrace(final StreamsGraphNode streamsGraphNode) {
+        return findAndVerifyWindowGrace(streamsGraphNode, "");
+    }
+
+    private static long findAndVerifyWindowGrace(final StreamsGraphNode streamsGraphNode, final String chain) {
+        // error base case: we traversed off the end of the graph without finding a window definition
+        if (streamsGraphNode == null) {
+            throw new TopologyException(
+                "Window close time is only defined for windowed computations. Got [" + chain + "]."
+            );
+        }
+        // base case: return if this node defines a grace period.
+        {
+            final Long gracePeriod = extractGracePeriod(streamsGraphNode);
+            if (gracePeriod != null) {
+                return gracePeriod;
+            }
+        }
+
+        final String newChain = chain.equals("") ? streamsGraphNode.nodeName() : streamsGraphNode.nodeName() + "->" + chain;
+
+        if (streamsGraphNode.parentNodes().isEmpty()) {
+            // error base case: we traversed to the end of the graph without finding a window definition
+            throw new TopologyException(
+                "Window close time is only defined for windowed computations. Got [" + newChain + "]."
+            );
+        }
+
+        // recursive case: all parents must define a grace period, and we use the max of our parents' graces.
+        long inheritedGrace = -1;
+        for (final StreamsGraphNode parentNode : streamsGraphNode.parentNodes()) {
+            final long parentGrace = findAndVerifyWindowGrace(parentNode, newChain);
+            inheritedGrace = Math.max(inheritedGrace, parentGrace);
+        }
+
+        if (inheritedGrace == -1) {
+            throw new IllegalStateException(); // shouldn't happen, and it's not a legal grace period
+        }
+
+        return inheritedGrace;
+    }
+
+    private static Long extractGracePeriod(final StreamsGraphNode node) {
+        if (node instanceof StatefulProcessorNode) {
+            final ProcessorSupplier processorSupplier = ((StatefulProcessorNode) node).processorParameters().processorSupplier();
+            if (processorSupplier instanceof KStreamWindowAggregate) {
+                final KStreamWindowAggregate kStreamWindowAggregate = (KStreamWindowAggregate) processorSupplier;
+                final Windows windows = kStreamWindowAggregate.windows();
+                return windows.gracePeriodMs();
+            } else if (processorSupplier instanceof KStreamSessionWindowAggregate) {
+                final KStreamSessionWindowAggregate kStreamSessionWindowAggregate = (KStreamSessionWindowAggregate) processorSupplier;
+                final SessionWindows windows = kStreamSessionWindowAggregate.windows();
+                return windows.gracePeriodMs();
+            } else {
+                return null;
+            }
+        } else {
+            return null;
+        }
+    }
+}
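
This utility is what lets untilWindowCloses infer the window close time: the
suppression node searches back through the graph for a windowed aggregation
and takes its grace period. A hedged sketch of the intended end-to-end wiring,
assuming the Duration-based TimeWindows.of/grace setters from KIP-328/KIP-358
(which may differ slightly at this commit); note that in this PR the buffering
such a topology needs still throws NotImplementedException, so it only becomes
functional in follow-up work:

    final StreamsBuilder builder = new StreamsBuilder();
    builder
        .stream("events", Consumed.with(Serdes.String(), Serdes.String()))
        .groupByKey()
        .windowedBy(TimeWindows.of(Duration.ofMinutes(2)).grace(Duration.ofMinutes(1)))
        .count()
        // findAndVerifyWindowGrace walks up from the suppression node and
        // finds the one-minute grace on the aggregation, so each window's
        // result is held until window end + grace has passed.
        .suppress(untilWindowCloses(unbounded()))
        .toStream((windowedKey, count) -> windowedKey.toString())
        .to("final-counts", Produced.with(Serdes.String(), Serdes.Long()));
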
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/BufferConfigImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/BufferConfigImpl.java
new file mode 100644
index 00000000000..e731dc6f5e1
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/BufferConfigImpl.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream.internals.suppress;
+
+import org.apache.kafka.streams.kstream.Suppressed;
+
+import static org.apache.kafka.streams.kstream.internals.suppress.BufferFullStrategy.SHUT_DOWN;
+
+abstract class BufferConfigImpl<BC extends Suppressed.BufferConfig<BC>> implements Suppressed.BufferConfig<BC> {
+    public abstract long maxKeys();
+
+    public abstract long maxBytes();
+
+    @SuppressWarnings("unused")
+    public abstract BufferFullStrategy bufferFullStrategy();
+
+    @Override
+    public Suppressed.StrictBufferConfig withNoBound() {
+        return new StrictBufferConfigImpl(
+            Long.MAX_VALUE,
+            Long.MAX_VALUE,
+            SHUT_DOWN // doesn't matter, given the bounds
+        );
+    }
+
+    @Override
+    public Suppressed.StrictBufferConfig shutDownWhenFull() {
+        return new StrictBufferConfigImpl(maxKeys(), maxBytes(), SHUT_DOWN);
+    }
+
+    @Override
+    public Suppressed.BufferConfig emitEarlyWhenFull() {
+        return new EagerBufferConfigImpl(maxKeys(), maxBytes());
+    }
+
+    @Override
+    public Suppressed.StrictBufferConfig spillToDiskWhenFull() {
+        throw new UnsupportedOperationException("not implemented");
+    }
+}
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/BufferFullStrategy.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/BufferFullStrategy.java
new file mode 100644
index 00000000000..2da7c141825
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/BufferFullStrategy.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream.internals.suppress;
+
+public enum BufferFullStrategy {
+    EMIT,
+    SPILL_TO_DISK,
+    SHUT_DOWN
+}
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/EagerBufferConfigImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/EagerBufferConfigImpl.java
new file mode 100644
index 00000000000..0c2c883e18a
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/EagerBufferConfigImpl.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream.internals.suppress;
+
+import org.apache.kafka.streams.kstream.Suppressed;
+
+import java.util.Objects;
+
+public class EagerBufferConfigImpl extends BufferConfigImpl {
+
+    private final long maxKeys;
+    private final long maxBytes;
+
+    public EagerBufferConfigImpl(final long maxKeys, final long maxBytes) {
+        this.maxKeys = maxKeys;
+        this.maxBytes = maxBytes;
+    }
+
+    @Override
+    public Suppressed.BufferConfig withMaxRecords(final long recordLimit) {
+        return new EagerBufferConfigImpl(recordLimit, maxBytes);
+    }
+
+    @Override
+    public Suppressed.BufferConfig withMaxBytes(final long byteLimit) {
+        return new EagerBufferConfigImpl(maxKeys, byteLimit);
+    }
+
+    @Override
+    public long maxKeys() {
+        return maxKeys;
+    }
+
+    @Override
+    public long maxBytes() {
+        return maxBytes;
+    }
+
+    @Override
+    public BufferFullStrategy bufferFullStrategy() {
+        return BufferFullStrategy.EMIT;
+    }
+
+    @Override
+    public boolean equals(final Object o) {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+        final EagerBufferConfigImpl that = (EagerBufferConfigImpl) o;
+        return maxKeys == that.maxKeys &&
+            maxBytes == that.maxBytes;
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(maxKeys, maxBytes);
+    }
+
+    @Override
+    public String toString() {
+        return "EagerBufferConfigImpl{maxKeys=" + maxKeys + ", maxBytes=" + 
maxBytes + '}';
+    }
+}
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/FinalResultsSuppressionBuilder.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/FinalResultsSuppressionBuilder.java
new file mode 100644
index 00000000000..548f5991dbb
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/FinalResultsSuppressionBuilder.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream.internals.suppress;
+
+import org.apache.kafka.streams.kstream.Suppressed;
+import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.processor.ProcessorContext;
+
+import java.time.Duration;
+import java.util.Objects;
+
+public class FinalResultsSuppressionBuilder<K extends Windowed> implements Suppressed<K> {
+    private final StrictBufferConfig bufferConfig;
+
+    public FinalResultsSuppressionBuilder(final Suppressed.StrictBufferConfig bufferConfig) {
+        this.bufferConfig = bufferConfig;
+    }
+
+    public SuppressedImpl<K> buildFinalResultsSuppression(final Duration gracePeriod) {
+        return new SuppressedImpl<>(
+            gracePeriod,
+            bufferConfig,
+            (ProcessorContext context, K key) -> key.window().end()
+        );
+    }
+
+    @Override
+    public boolean equals(final Object o) {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+        final FinalResultsSuppressionBuilder<?> that = (FinalResultsSuppressionBuilder<?>) o;
+        return Objects.equals(bufferConfig, that.bufferConfig);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(bufferConfig);
+    }
+
+    @Override
+    public String toString() {
+        return "FinalResultsSuppressionBuilder{bufferConfig=" + bufferConfig + 
'}';
+    }
+}
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessor.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessor.java
new file mode 100644
index 00000000000..f65f2b4af20
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessor.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream.internals.suppress;
+
+import org.apache.kafka.streams.kstream.internals.Change;
+import org.apache.kafka.streams.processor.Processor;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
+
+import java.time.Duration;
+
+public class KTableSuppressProcessor<K, V> implements Processor<K, Change<V>> {
+    private final SuppressedImpl<K> suppress;
+    private InternalProcessorContext internalProcessorContext;
+
+    public KTableSuppressProcessor(final SuppressedImpl<K> suppress) {
+        this.suppress = suppress;
+    }
+
+    @Override
+    public void init(final ProcessorContext context) {
+        internalProcessorContext = (InternalProcessorContext) context;
+    }
+
+    @Override
+    public void process(final K key, final Change<V> value) {
+        if (suppress.getTimeToWaitForMoreEvents() == Duration.ZERO && definedRecordTime(key) <= internalProcessorContext.streamTime()) {
+            internalProcessorContext.forward(key, value);
+        } else {
+            throw new NotImplementedException();
+        }
+    }
+
+    private long definedRecordTime(final K key) {
+        return suppress.getTimeDefinition().time(internalProcessorContext, key);
+    }
+
+    @Override
+    public void close() {
+    }
+
+    @Override
+    public String toString() {
+        return "KTableSuppressProcessor{suppress=" + suppress + '}';
+    }
+
+    static class NotImplementedException extends RuntimeException {
+        NotImplementedException() {
+            super();
+        }
+    }
+}
\ No newline at end of file
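
As process() shows, only the zero-wait path is implemented: with a time limit
of Duration.ZERO, a record whose defined time is at or before stream time is
forwarded unchanged, and every configuration that actually requires buffering
throws NotImplementedException until the follow-up PRs land. Illustratively
(the table variable stands in for any KTable):

    // Implemented: immediate pass-through.
    table.suppress(untilTimeLimit(Duration.ZERO, unbounded()));

    // Not yet implemented: any non-zero wait needs real buffering and
    // currently throws KTableSuppressProcessor.NotImplementedException.
    table.suppress(untilTimeLimit(Duration.ofMinutes(5), maxRecords(1_000L)));
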
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/StrictBufferConfigImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/StrictBufferConfigImpl.java
new file mode 100644
index 00000000000..0634a748a5b
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/StrictBufferConfigImpl.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream.internals.suppress;
+
+import org.apache.kafka.streams.kstream.Suppressed;
+
+import java.util.Objects;
+
+import static org.apache.kafka.streams.kstream.internals.suppress.BufferFullStrategy.SHUT_DOWN;
+
+public class StrictBufferConfigImpl extends BufferConfigImpl<Suppressed.StrictBufferConfig> implements Suppressed.StrictBufferConfig {
+
+    private final long maxKeys;
+    private final long maxBytes;
+    private final BufferFullStrategy bufferFullStrategy;
+
+    public StrictBufferConfigImpl(final long maxKeys,
+                                  final long maxBytes,
+                                  final BufferFullStrategy bufferFullStrategy) {
+        this.maxKeys = maxKeys;
+        this.maxBytes = maxBytes;
+        this.bufferFullStrategy = bufferFullStrategy;
+    }
+
+    public StrictBufferConfigImpl() {
+        this.maxKeys = Long.MAX_VALUE;
+        this.maxBytes = Long.MAX_VALUE;
+        this.bufferFullStrategy = SHUT_DOWN;
+    }
+
+    @Override
+    public Suppressed.StrictBufferConfig withMaxRecords(final long recordLimit) {
+        return new StrictBufferConfigImpl(recordLimit, maxBytes, bufferFullStrategy);
+    }
+
+    @Override
+    public Suppressed.StrictBufferConfig withMaxBytes(final long byteLimit) {
+        return new StrictBufferConfigImpl(maxKeys, byteLimit, bufferFullStrategy);
+    }
+    }
+
+    @Override
+    public long maxKeys() {
+        return maxKeys;
+    }
+
+    @Override
+    public long maxBytes() {
+        return maxBytes;
+    }
+
+    @Override
+    public BufferFullStrategy bufferFullStrategy() {
+        return bufferFullStrategy;
+    }
+
+    @Override
+    public boolean equals(final Object o) {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+        final StrictBufferConfigImpl that = (StrictBufferConfigImpl) o;
+        return maxKeys == that.maxKeys &&
+            maxBytes == that.maxBytes &&
+            bufferFullStrategy == that.bufferFullStrategy;
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(maxKeys, maxBytes, bufferFullStrategy);
+    }
+
+    @Override
+    public String toString() {
+        return "StrictBufferConfigImpl{maxKeys=" + maxKeys +
+            ", maxBytes=" + maxBytes +
+            ", bufferFullStrategy=" + bufferFullStrategy + '}';
+    }
+}
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/SuppressedImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/SuppressedImpl.java
new file mode 100644
index 00000000000..cffc42b66d5
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/SuppressedImpl.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream.internals.suppress;
+
+import org.apache.kafka.streams.kstream.Suppressed;
+import org.apache.kafka.streams.processor.ProcessorContext;
+
+import java.time.Duration;
+import java.util.Objects;
+
+public class SuppressedImpl<K> implements Suppressed<K> {
+    private static final Duration DEFAULT_SUPPRESSION_TIME = Duration.ofMillis(Long.MAX_VALUE);
+    private static final StrictBufferConfig DEFAULT_BUFFER_CONFIG = BufferConfig.unbounded();
+
+    private final BufferConfig bufferConfig;
+    private final Duration timeToWaitForMoreEvents;
+    private final TimeDefinition<K> timeDefinition;
+
+    public SuppressedImpl(final Duration suppressionTime,
+                          final BufferConfig bufferConfig,
+                          final TimeDefinition<K> timeDefinition) {
+        this.timeToWaitForMoreEvents = suppressionTime == null ? DEFAULT_SUPPRESSION_TIME : suppressionTime;
+        this.timeDefinition = timeDefinition == null ? (context, anyKey) -> context.timestamp() : timeDefinition;
+        this.bufferConfig = bufferConfig == null ? DEFAULT_BUFFER_CONFIG : bufferConfig;
+    }
+
+    interface TimeDefinition<K> {
+        long time(final ProcessorContext context, final K key);
+    }
+
+    TimeDefinition<K> getTimeDefinition() {
+        return timeDefinition;
+    }
+
+    Duration getTimeToWaitForMoreEvents() {
+        return timeToWaitForMoreEvents == null ? Duration.ZERO : timeToWaitForMoreEvents;
+    }
+
+    @Override
+    public boolean equals(final Object o) {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+        final SuppressedImpl<?> that = (SuppressedImpl<?>) o;
+        return Objects.equals(bufferConfig, that.bufferConfig) &&
+            Objects.equals(getTimeToWaitForMoreEvents(), that.getTimeToWaitForMoreEvents()) &&
+            Objects.equals(getTimeDefinition(), that.getTimeDefinition());
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(bufferConfig, getTimeToWaitForMoreEvents(), getTimeDefinition());
+    }
+
+    @Override
+    public String toString() {
+        return "SuppressedImpl{" +
+            ", bufferConfig=" + bufferConfig +
+            ", timeToWaitForMoreEvents=" + timeToWaitForMoreEvents +
+            ", timeDefinition=" + timeDefinition +
+            '}';
+    }
+}
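
A note on the defaults encoded by the null-tolerant constructor above: a
missing suppression time falls back to effectively forever (Long.MAX_VALUE
milliseconds), a missing buffer config to the unbounded strict buffer, and a
missing time definition to the record's own timestamp. For instance (direct
construction shown only for illustration; user code goes through the
Suppressed factory methods):

    // Equivalent to: wait "forever", use an unbounded strict buffer, and
    // take context.timestamp() as each record's time.
    final SuppressedImpl<String> defaults = new SuppressedImpl<>(null, null, null);
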
diff --git a/streams/src/test/java/org/apache/kafka/streams/KeyValueTimestamp.java b/streams/src/test/java/org/apache/kafka/streams/KeyValueTimestamp.java
new file mode 100644
index 00000000000..421311257b4
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/KeyValueTimestamp.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams;
+
+public class KeyValueTimestamp<K, V> {
+    private final K key;
+    private final V value;
+    private final long timestamp;
+
+    public KeyValueTimestamp(final K key, final V value, final long timestamp) {
+        this.key = key;
+        this.value = value;
+        this.timestamp = timestamp;
+    }
+
+    public K key() {
+        return key;
+    }
+
+    public V value() {
+        return value;
+    }
+
+    public long timestamp() {
+        return timestamp;
+    }
+
+    @Override
+    public String toString() {
+        return "KeyValueTimestamp{key=" + key + ", value=" + value + ", 
timestamp=" + timestamp + '}';
+    }
+}
\ No newline at end of file
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/SuppressionIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/SuppressionIntegrationTest.java
new file mode 100644
index 00000000000..a0e78580d45
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/SuppressionIntegrationTest.java
@@ -0,0 +1,280 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.integration;
+
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.LongDeserializer;
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.KeyValueTimestamp;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
+import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
+import org.apache.kafka.streams.kstream.Consumed;
+import org.apache.kafka.streams.kstream.KTable;
+import org.apache.kafka.streams.kstream.Materialized;
+import org.apache.kafka.streams.kstream.Produced;
+import org.apache.kafka.streams.kstream.Serialized;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.test.IntegrationTest;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import java.time.Duration;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Locale;
+import java.util.Objects;
+import java.util.Properties;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+
+import static java.util.Arrays.asList;
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.common.utils.Utils.mkMap;
+import static org.apache.kafka.common.utils.Utils.mkProperties;
+import static org.apache.kafka.streams.kstream.Suppressed.BufferConfig.unbounded;
+import static org.apache.kafka.streams.kstream.Suppressed.untilTimeLimit;
+
+@Category({IntegrationTest.class})
+public class SuppressionIntegrationTest {
+    @ClassRule
+    public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(1);
+    private static final StringDeserializer STRING_DESERIALIZER = new StringDeserializer();
+    private static final StringSerializer STRING_SERIALIZER = new StringSerializer();
+    private static final Serde<String> STRING_SERDE = Serdes.String();
+    private static final LongDeserializer LONG_DESERIALIZER = new LongDeserializer();
+    private static final int COMMIT_INTERVAL = 100;
+    private static final int SCALE_FACTOR = COMMIT_INTERVAL * 2;
+
+    @Test
+    public void shouldNotSuppressIntermediateEventsWithZeroEmitAfter() throws InterruptedException {
+        final String testId = "-shouldNotSuppressIntermediateEventsWithZeroEmitAfter";
+        final String appId = getClass().getSimpleName().toLowerCase(Locale.getDefault()) + testId;
+        final String input = "input" + testId;
+        final String outputSuppressed = "output-suppressed" + testId;
+        final String outputRaw = "output-raw" + testId;
+
+        cleanStateBeforeTest(input, outputSuppressed, outputRaw);
+
+        final StreamsBuilder builder = new StreamsBuilder();
+        final KTable<String, Long> valueCounts = builder
+            .table(
+                input,
+                Consumed.with(STRING_SERDE, STRING_SERDE),
+                Materialized.<String, String, KeyValueStore<Bytes, byte[]>>with(STRING_SERDE, STRING_SERDE)
+                    .withCachingDisabled()
+                    .withLoggingDisabled()
+            )
+            .groupBy((k, v) -> new KeyValue<>(v, k), Serialized.with(STRING_SERDE, STRING_SERDE))
+            .count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("counts").withCachingDisabled());
+
+        valueCounts
+            .suppress(untilTimeLimit(Duration.ZERO, unbounded()))
+            .toStream()
+            .to(outputSuppressed, Produced.with(STRING_SERDE, Serdes.Long()));
+
+        valueCounts
+            .toStream()
+            .to(outputRaw, Produced.with(STRING_SERDE, Serdes.Long()));
+
+        final KafkaStreams driver = getCleanStartedStreams(appId, builder);
+
+        try {
+            produceSynchronously(
+                input,
+                asList(
+                    new KeyValueTimestamp<>("k1", "v1", scaledTime(0L)),
+                    new KeyValueTimestamp<>("k1", "v2", scaledTime(1L)),
+                    new KeyValueTimestamp<>("k2", "v1", scaledTime(2L)),
+                    new KeyValueTimestamp<>("x", "x", scaledTime(4L))
+                )
+            );
+            verifyOutput(
+                outputRaw,
+                asList(
+                    new KeyValueTimestamp<>("v1", 1L, scaledTime(0L)),
+                    new KeyValueTimestamp<>("v1", 0L, scaledTime(1L)),
+                    new KeyValueTimestamp<>("v2", 1L, scaledTime(1L)),
+                    new KeyValueTimestamp<>("v1", 1L, scaledTime(2L)),
+                    new KeyValueTimestamp<>("x", 1L, scaledTime(4L))
+                )
+            );
+            verifyOutput(
+                outputSuppressed,
+                asList(
+                    new KeyValueTimestamp<>("v1", 1L, scaledTime(0L)),
+                    new KeyValueTimestamp<>("v1", 0L, scaledTime(1L)),
+                    new KeyValueTimestamp<>("v2", 1L, scaledTime(1L)),
+                    new KeyValueTimestamp<>("v1", 1L, scaledTime(2L)),
+                    new KeyValueTimestamp<>("x", 1L, scaledTime(4L))
+                )
+            );
+        } finally {
+            driver.close();
+            cleanStateAfterTest(driver);
+        }
+    }
+
+    private void cleanStateBeforeTest(final String... topic) throws InterruptedException {
+        CLUSTER.deleteAllTopicsAndWait(30_000L);
+        for (final String s : topic) {
+            CLUSTER.createTopic(s, 1, 1);
+        }
+    }
+
+    private KafkaStreams getCleanStartedStreams(final String appId, final StreamsBuilder builder) {
+        final Properties streamsConfig = mkProperties(mkMap(
+            mkEntry(StreamsConfig.APPLICATION_ID_CONFIG, appId),
+            mkEntry(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()),
+            mkEntry(StreamsConfig.POLL_MS_CONFIG, Objects.toString(COMMIT_INTERVAL)),
+            mkEntry(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, Objects.toString(COMMIT_INTERVAL))
+        ));
+        final KafkaStreams driver = new KafkaStreams(builder.build(), streamsConfig);
+        driver.cleanUp();
+        driver.start();
+        return driver;
+    }
+
+    private void cleanStateAfterTest(final KafkaStreams driver) throws InterruptedException {
+        driver.cleanUp();
+        CLUSTER.deleteAllTopicsAndWait(30_000L);
+    }
+
+    private long scaledTime(final long unscaledTime) {
+        return SCALE_FACTOR * unscaledTime;
+    }
+
+    private void produceSynchronously(final String topic, final List<KeyValueTimestamp<String, String>> toProduce) {
+        final Properties producerConfig = mkProperties(mkMap(
+            mkEntry(ProducerConfig.CLIENT_ID_CONFIG, "anything"),
+            mkEntry(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, STRING_SERIALIZER.getClass().getName()),
+            mkEntry(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, STRING_SERIALIZER.getClass().getName()),
+            mkEntry(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers())
+        ));
+        try (final Producer<String, String> producer = new KafkaProducer<>(producerConfig)) {
+            // TODO: test EOS
+            //noinspection ConstantConditions
+            if (false) {
+                producer.initTransactions();
+                producer.beginTransaction();
+            }
+            final LinkedList<Future<RecordMetadata>> futures = new LinkedList<>();
+            for (final KeyValueTimestamp<String, String> record : toProduce) {
+                final Future<RecordMetadata> f = producer.send(
+                    new ProducerRecord<>(topic, null, record.timestamp(), record.key(), record.value(), null)
+                );
+                futures.add(f);
+            }
+            for (final Future<RecordMetadata> future : futures) {
+                try {
+                    future.get();
+                } catch (final InterruptedException | ExecutionException e) {
+                    throw new RuntimeException(e);
+                }
+            }
+            // TODO: test EOS
+            //noinspection ConstantConditions
+            if (false) {
+                producer.commitTransaction();
+            } else {
+                producer.flush();
+            }
+        }
+    }
+
+    private void verifyOutput(final String topic, final List<KeyValueTimestamp<String, Long>> expected) {
+        final List<ConsumerRecord<String, Long>> results;
+        try {
+            final Properties properties = mkProperties(
+                mkMap(
+                    mkEntry(ConsumerConfig.GROUP_ID_CONFIG, "test-group"),
+                    mkEntry(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()),
+                    mkEntry(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ((Deserializer<String>) STRING_DESERIALIZER).getClass().getName()),
+                    mkEntry(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ((Deserializer<Long>) LONG_DESERIALIZER).getClass().getName())
+                )
+            );
+            results = IntegrationTestUtils.waitUntilMinRecordsReceived(properties, topic, expected.size());
+        } catch (final InterruptedException e) {
+            throw new RuntimeException(e);
+        }
+
+        if (results.size() != expected.size()) {
+            throw new AssertionError(printRecords(results) + " != " + expected);
+        }
+        final Iterator<KeyValueTimestamp<String, Long>> expectedIterator = expected.iterator();
+        for (final ConsumerRecord<String, Long> result : results) {
+            final KeyValueTimestamp<String, Long> expected1 = expectedIterator.next();
+            try {
+                compareKeyValueTimestamp(result, expected1.key(), expected1.value(), expected1.timestamp());
+            } catch (final AssertionError e) {
+                throw new AssertionError(printRecords(results) + " != " + expected, e);
+            }
+        }
+    }
+
+    private <K, V> void compareKeyValueTimestamp(final ConsumerRecord<K, V> record, final K expectedKey, final V expectedValue, final long expectedTimestamp) {
+        Objects.requireNonNull(record);
+        final K recordKey = record.key();
+        final V recordValue = record.value();
+        final long recordTimestamp = record.timestamp();
+        final AssertionError error = new AssertionError("Expected <" + expectedKey + ", " + expectedValue + "> with timestamp=" + expectedTimestamp +
+                                                            " but was <" + recordKey + ", " + recordValue + "> with timestamp=" + recordTimestamp);
+        if (recordKey != null) {
+            if (!recordKey.equals(expectedKey)) {
+                throw error;
+            }
+        } else if (expectedKey != null) {
+            throw error;
+        }
+        if (recordValue != null) {
+            if (!recordValue.equals(expectedValue)) {
+                throw error;
+            }
+        } else if (expectedValue != null) {
+            throw error;
+        }
+        if (recordTimestamp != expectedTimestamp) {
+            throw error;
+        }
+    }
+
+    private <K, V> String printRecords(final List<ConsumerRecord<K, V>> result) {
+        final StringBuilder resultStr = new StringBuilder();
+        resultStr.append("[\n");
+        for (final ConsumerRecord<?, ?> record : result) {
+            resultStr.append("  ").append(record.toString()).append("\n");
+        }
+        resultStr.append("]");
+        return resultStr.toString();
+    }
+}
\ No newline at end of file
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/SuppressedTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/SuppressedTest.java
new file mode 100644
index 00000000000..53f24b58aac
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/SuppressedTest.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream;
+
+import org.apache.kafka.streams.kstream.internals.suppress.EagerBufferConfigImpl;
+import org.apache.kafka.streams.kstream.internals.suppress.FinalResultsSuppressionBuilder;
+import org.apache.kafka.streams.kstream.internals.suppress.StrictBufferConfigImpl;
+import org.apache.kafka.streams.kstream.internals.suppress.SuppressedImpl;
+import org.junit.Test;
+
+import static java.lang.Long.MAX_VALUE;
+import static java.time.Duration.ofMillis;
+import static org.apache.kafka.streams.kstream.Suppressed.BufferConfig.maxBytes;
+import static org.apache.kafka.streams.kstream.Suppressed.BufferConfig.maxRecords;
+import static org.apache.kafka.streams.kstream.Suppressed.BufferConfig.unbounded;
+import static org.apache.kafka.streams.kstream.Suppressed.untilTimeLimit;
+import static org.apache.kafka.streams.kstream.Suppressed.untilWindowCloses;
+import static org.apache.kafka.streams.kstream.internals.suppress.BufferFullStrategy.SHUT_DOWN;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
+
+public class SuppressedTest {
+
+    @Test
+    public void bufferBuilderShouldBeConsistent() {
+        assertThat(
+            "noBound should remove bounds",
+            maxBytes(2L).withMaxRecords(4L).withNoBound(),
+            is(unbounded())
+        );
+
+        assertThat(
+            "keys alone should be set",
+            maxRecords(2L),
+            is(new EagerBufferConfigImpl(2L, MAX_VALUE))
+        );
+
+        assertThat(
+            "size alone should be set",
+            maxBytes(2L),
+            is(new EagerBufferConfigImpl(MAX_VALUE, 2L))
+        );
+    }
+
+    @Test
+    public void intermediateEventsShouldAcceptAnyBufferAndSetBounds() {
+        assertThat(
+            "time alone should be set",
+            untilTimeLimit(ofMillis(2), unbounded()),
+            is(new SuppressedImpl<>(ofMillis(2), unbounded(), null))
+        );
+
+        assertThat(
+            "time and unbounded buffer should be set",
+            untilTimeLimit(ofMillis(2), unbounded()),
+            is(new SuppressedImpl<>(ofMillis(2), unbounded(), null))
+        );
+
+        assertThat(
+            "time and keys buffer should be set",
+            untilTimeLimit(ofMillis(2), maxRecords(2)),
+            is(new SuppressedImpl<>(ofMillis(2), maxRecords(2), null))
+        );
+
+        assertThat(
+            "time and size buffer should be set",
+            untilTimeLimit(ofMillis(2), maxBytes(2)),
+            is(new SuppressedImpl<>(ofMillis(2), maxBytes(2), null))
+        );
+
+        assertThat(
+            "all constraints should be set",
+            untilTimeLimit(ofMillis(2L), maxRecords(3L).withMaxBytes(2L)),
+            is(new SuppressedImpl<>(ofMillis(2), new EagerBufferConfigImpl(3L, 2L), null))
+        );
+    }
+
+    @Test
+    public void finalEventsShouldAcceptStrictBuffersAndSetBounds() {
+
+        assertThat(
+            untilWindowCloses(unbounded()),
+            is(new FinalResultsSuppressionBuilder<>(unbounded()))
+        );
+
+        assertThat(
+            untilWindowCloses(maxRecords(2L).shutDownWhenFull()),
+            is(new FinalResultsSuppressionBuilder<>(new StrictBufferConfigImpl(2L, MAX_VALUE, SHUT_DOWN))
+            )
+        );
+
+        assertThat(
+            untilWindowCloses(maxBytes(2L).shutDownWhenFull()),
+            is(new FinalResultsSuppressionBuilder<>(new StrictBufferConfigImpl(MAX_VALUE, 2L, SHUT_DOWN))
+            )
+        );
+    }
+}
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SuppressScenarioTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SuppressScenarioTest.java
new file mode 100644
index 00000000000..fead6788eb4
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SuppressScenarioTest.java
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.LongDeserializer;
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.KeyValueTimestamp;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.Topology;
+import org.apache.kafka.streams.TopologyTestDriver;
+import org.apache.kafka.streams.kstream.Consumed;
+import org.apache.kafka.streams.kstream.KTable;
+import org.apache.kafka.streams.kstream.Materialized;
+import org.apache.kafka.streams.kstream.Produced;
+import org.apache.kafka.streams.kstream.Serialized;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.test.ConsumerRecordFactory;
+import org.apache.kafka.streams.test.OutputVerifier;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Locale;
+import java.util.Properties;
+
+import static java.time.Duration.ZERO;
+import static java.util.Arrays.asList;
+import static java.util.Collections.singletonList;
+import static org.apache.kafka.streams.kstream.Suppressed.BufferConfig.unbounded;
+import static org.apache.kafka.streams.kstream.Suppressed.untilTimeLimit;
+
+public class SuppressScenarioTest {
+    private static final StringDeserializer STRING_DESERIALIZER = new StringDeserializer();
+    private static final StringSerializer STRING_SERIALIZER = new StringSerializer();
+    private static final Serde<String> STRING_SERDE = Serdes.String();
+    private static final LongDeserializer LONG_DESERIALIZER = new LongDeserializer();
+
+    @Test
+    public void shouldImmediatelyEmitEventsWithZeroEmitAfter() {
+        final StreamsBuilder builder = new StreamsBuilder();
+
+        final KTable<String, Long> valueCounts = builder
+            .table(
+                "input",
+                Consumed.with(STRING_SERDE, STRING_SERDE),
+                Materialized.<String, String, KeyValueStore<Bytes, byte[]>>with(STRING_SERDE, STRING_SERDE)
+                    .withCachingDisabled()
+                    .withLoggingDisabled()
+            )
+            .groupBy((k, v) -> new KeyValue<>(v, k), Serialized.with(STRING_SERDE, STRING_SERDE))
+            .count();
+
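+        // With a zero time limit, suppression is expected to be a pure pass-through:
+        // the suppressed and raw outputs verified below should match exactly.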
+        valueCounts
+            .suppress(untilTimeLimit(ZERO, unbounded()))
+            .toStream()
+            .to("output-suppressed", Produced.with(STRING_SERDE, 
Serdes.Long()));
+
+        valueCounts
+            .toStream()
+            .to("output-raw", Produced.with(STRING_SERDE, Serdes.Long()));
+
+        final Topology topology = builder.build();
+
+        final Properties config = Utils.mkProperties(Utils.mkMap(
+            Utils.mkEntry(StreamsConfig.APPLICATION_ID_CONFIG, getClass().getSimpleName().toLowerCase(Locale.getDefault())),
+            Utils.mkEntry(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "bogus")
+        ));
+
+        final ConsumerRecordFactory<String, String> recordFactory = new ConsumerRecordFactory<>(STRING_SERIALIZER, STRING_SERIALIZER);
+
+        try (final TopologyTestDriver driver = new TopologyTestDriver(topology, config)) {
+            driver.pipeInput(recordFactory.create("input", "k1", "v1", 0L));
+            driver.pipeInput(recordFactory.create("input", "k1", "v2", 1L));
+            driver.pipeInput(recordFactory.create("input", "k2", "v1", 2L));
+            verify(
+                drainProducerRecords(driver, "output-raw", 
STRING_DESERIALIZER, LONG_DESERIALIZER),
+                asList(
+                    new KeyValueTimestamp<>("v1", 1L, 0L),
+                    new KeyValueTimestamp<>("v1", 0L, 1L),
+                    new KeyValueTimestamp<>("v2", 1L, 1L),
+                    new KeyValueTimestamp<>("v1", 1L, 2L)
+                )
+            );
+            verify(
+                drainProducerRecords(driver, "output-suppressed", 
STRING_DESERIALIZER, LONG_DESERIALIZER),
+                asList(
+                    new KeyValueTimestamp<>("v1", 1L, 0L),
+                    new KeyValueTimestamp<>("v1", 0L, 1L),
+                    new KeyValueTimestamp<>("v2", 1L, 1L),
+                    new KeyValueTimestamp<>("v1", 1L, 2L)
+                )
+            );
+            driver.pipeInput(recordFactory.create("input", "x", "x", 3L));
+            verify(
+                drainProducerRecords(driver, "output-raw", 
STRING_DESERIALIZER, LONG_DESERIALIZER),
+                singletonList(
+                    new KeyValueTimestamp<>("x", 1L, 3L)
+                )
+            );
+            verify(
+                drainProducerRecords(driver, "output-suppressed", 
STRING_DESERIALIZER, LONG_DESERIALIZER),
+                singletonList(
+                    new KeyValueTimestamp<>("x", 1L, 3L)
+                )
+            );
+            driver.pipeInput(recordFactory.create("input", "x", "x", 4L));
+            verify(
+                drainProducerRecords(driver, "output-raw", 
STRING_DESERIALIZER, LONG_DESERIALIZER),
+                asList(
+                    new KeyValueTimestamp<>("x", 0L, 4L),
+                    new KeyValueTimestamp<>("x", 1L, 4L)
+                )
+            );
+            verify(
+                drainProducerRecords(driver, "output-suppressed", 
STRING_DESERIALIZER, LONG_DESERIALIZER),
+                asList(
+                    new KeyValueTimestamp<>("x", 0L, 4L),
+                    new KeyValueTimestamp<>("x", 1L, 4L)
+                )
+            );
+        }
+    }
+
+    private <K, V> void verify(final List<ProducerRecord<K, V>> results, final List<KeyValueTimestamp<K, V>> expectedResults) {
+        if (results.size() != expectedResults.size()) {
+            throw new AssertionError(printRecords(results) + " != " + expectedResults);
+        }
+        final Iterator<KeyValueTimestamp<K, V>> expectedIterator = expectedResults.iterator();
+        for (final ProducerRecord<K, V> result : results) {
+            final KeyValueTimestamp<K, V> expected = expectedIterator.next();
+            try {
+                OutputVerifier.compareKeyValueTimestamp(result, expected.key(), expected.value(), expected.timestamp());
+            } catch (final AssertionError e) {
+                throw new AssertionError(printRecords(results) + " != " + expectedResults, e);
+            }
+        }
+    }
+
+    private <K, V> List<ProducerRecord<K, V>> drainProducerRecords(final TopologyTestDriver driver, final String topic, final Deserializer<K> keyDeserializer, final Deserializer<V> valueDeserializer) {
+        final List<ProducerRecord<K, V>> result = new LinkedList<>();
+        for (ProducerRecord<K, V> next = driver.readOutput(topic, keyDeserializer, valueDeserializer);
+             next != null;
+             next = driver.readOutput(topic, keyDeserializer, valueDeserializer)) {
+            result.add(next);
+        }
+        return new ArrayList<>(result);
+    }
+
+    private <K, V> String printRecords(final List<ProducerRecord<K, V>> result) {
+        final StringBuilder resultStr = new StringBuilder();
+        resultStr.append("[\n");
+        for (final ProducerRecord<?, ?> record : result) {
+            resultStr.append("  ").append(record.toString()).append("\n");
+        }
+        resultStr.append("]");
+        return resultStr.toString();
+    }
+}
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/graph/GraphGraceSearchUtilTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/graph/GraphGraceSearchUtilTest.java
new file mode 100644
index 00000000000..2b054230839
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/graph/GraphGraceSearchUtilTest.java
@@ -0,0 +1,241 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream.internals.graph;
+
+import org.apache.kafka.streams.errors.TopologyException;
+import org.apache.kafka.streams.kstream.SessionWindows;
+import org.apache.kafka.streams.kstream.TimeWindows;
+import org.apache.kafka.streams.kstream.internals.KStreamSessionWindowAggregate;
+import org.apache.kafka.streams.kstream.internals.KStreamWindowAggregate;
+import org.apache.kafka.streams.kstream.internals.TimeWindow;
+import org.apache.kafka.streams.processor.Processor;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.junit.Test;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.fail;
+
+public class GraphGraceSearchUtilTest {
+    @Test
+    public void shouldThrowOnNull() {
+        try {
+            GraphGraceSearchUtil.findAndVerifyWindowGrace(null);
+            fail("Should have thrown.");
+        } catch (final TopologyException e) {
+            assertThat(e.getMessage(), is("Invalid topology: Window close time is only defined for windowed computations. Got []."));
+        }
+    }
+
+    @Test
+    public void shouldFailIfThereIsNoGraceAncestor() {
+        // doesn't matter if this ancestor is stateless or stateful. The important thing is that there is
+        // no grace period defined on any ancestor of the node
+        final StatefulProcessorNode<String, Long> gracelessAncestor = new StatefulProcessorNode<>(
+            "stateful",
+            new ProcessorParameters<>(
+                () -> new Processor<Object, Object>() {
+                    @Override
+                    public void init(final ProcessorContext context) {}
+
+                    @Override
+                    public void process(final Object key, final Object value) {}
+
+                    @Override
+                    public void close() {}
+                },
+                "dummy"
+            ),
+            null,
+            null,
+            false
+        );
+
+        final ProcessorGraphNode<String, Long> node = new ProcessorGraphNode<>("stateless", null);
+        gracelessAncestor.addChild(node);
+
+        try {
+            GraphGraceSearchUtil.findAndVerifyWindowGrace(node);
+            fail("should have thrown.");
+        } catch (final TopologyException e) {
+            assertThat(e.getMessage(), is("Invalid topology: Window close time is only defined for windowed computations. Got [stateful->stateless]."));
+        }
+    }
+
+    @Test
+    public void shouldExtractGraceFromKStreamWindowAggregateNode() {
+        final TimeWindows windows = TimeWindows.of(10L).grace(1234L);
+        final StatefulProcessorNode<String, Long> node = new StatefulProcessorNode<>(
+            "asdf",
+            new ProcessorParameters<>(
+                new KStreamWindowAggregate<String, Long, Integer, TimeWindow>(
+                    windows,
+                    "asdf",
+                    null,
+                    null
+                ),
+                "asdf"
+            ),
+            null,
+            null,
+            false
+        );
+
+        final long extracted = GraphGraceSearchUtil.findAndVerifyWindowGrace(node);
+        assertThat(extracted, is(windows.gracePeriodMs()));
+    }
+
+    @Test
+    public void shouldExtractGraceFromKStreamSessionWindowAggregateNode() {
+        final SessionWindows windows = SessionWindows.with(10L).grace(1234L);
+
+        final StatefulProcessorNode<String, Long> node = new StatefulProcessorNode<>(
+            "asdf",
+            new ProcessorParameters<>(
+                new KStreamSessionWindowAggregate<String, Long, Integer>(
+                    windows,
+                    "asdf",
+                    null,
+                    null,
+                    null
+                ),
+                "asdf"
+            ),
+            null,
+            null,
+            false
+        );
+
+        final long extracted = GraphGraceSearchUtil.findAndVerifyWindowGrace(node);
+        assertThat(extracted, is(windows.gracePeriodMs()));
+    }
+
+    @Test
+    public void shouldExtractGraceFromAncestorThroughStatefulParent() {
+        final SessionWindows windows = SessionWindows.with(10L).grace(1234L);
+        final StatefulProcessorNode<String, Long> graceGrandparent = new StatefulProcessorNode<>(
+            "asdf",
+            new ProcessorParameters<>(new KStreamSessionWindowAggregate<String, Long, Integer>(
+                windows, "asdf", null, null, null
+            ), "asdf"),
+            null,
+            null,
+            false
+        );
+
+        final StatefulProcessorNode<String, Long> statefulParent = new StatefulProcessorNode<>(
+            "stateful",
+            new ProcessorParameters<>(
+                () -> new Processor<Object, Object>() {
+                    @Override
+                    public void init(final ProcessorContext context) {}
+
+                    @Override
+                    public void process(final Object key, final Object value) {}
+
+                    @Override
+                    public void close() {}
+                },
+                "dummy"
+            ),
+            null,
+            null,
+            false
+        );
+        graceGrandparent.addChild(statefulParent);
+
+        final ProcessorGraphNode<String, Long> node = new ProcessorGraphNode<>("stateless", null);
+        statefulParent.addChild(node);
+
+        final long extracted = GraphGraceSearchUtil.findAndVerifyWindowGrace(node);
+        assertThat(extracted, is(windows.gracePeriodMs()));
+    }
+
+    @Test
+    public void shouldExtractGraceFromAncestorThroughStatelessParent() {
+        final SessionWindows windows = SessionWindows.with(10L).grace(1234L);
+        final StatefulProcessorNode<String, Long> graceGrandparent = new StatefulProcessorNode<>(
+            "asdf",
+            new ProcessorParameters<>(
+                new KStreamSessionWindowAggregate<String, Long, Integer>(
+                    windows,
+                    "asdf",
+                    null,
+                    null,
+                    null
+                ),
+                "asdf"
+            ),
+            null,
+            null,
+            false
+        );
+
+        final ProcessorGraphNode<String, Long> statelessParent = new ProcessorGraphNode<>("stateless", null);
+        graceGrandparent.addChild(statelessParent);
+
+        final ProcessorGraphNode<String, Long> node = new ProcessorGraphNode<>("stateless", null);
+        statelessParent.addChild(node);
+
+        final long extracted = GraphGraceSearchUtil.findAndVerifyWindowGrace(node);
+        assertThat(extracted, is(windows.gracePeriodMs()));
+    }
+
+    @Test
+    public void shouldUseMaxIfMultiParentsDoNotAgreeOnGrace() {
+        final StatefulProcessorNode<String, Long> leftParent = new StatefulProcessorNode<>(
+            "asdf",
+            new ProcessorParameters<>(
+                new KStreamSessionWindowAggregate<String, Long, Integer>(
+                    SessionWindows.with(10L).grace(1234L),
+                    "asdf",
+                    null,
+                    null,
+                    null
+                ),
+                "asdf"
+            ),
+            null,
+            null,
+            false
+        );
+
+        final StatefulProcessorNode<String, Long> rightParent = new StatefulProcessorNode<>(
+            "asdf",
+            new ProcessorParameters<>(
+                new KStreamWindowAggregate<String, Long, Integer, TimeWindow>(
+                    TimeWindows.of(10L).grace(4321L),
+                    "asdf",
+                    null,
+                    null
+                ),
+                "asdf"
+            ),
+            null,
+            null,
+            false
+        );
+
+        final ProcessorGraphNode<String, Long> node = new ProcessorGraphNode<>("stateless", null);
+        leftParent.addChild(node);
+        rightParent.addChild(node);
+
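+        // The parents disagree (grace 1234L vs. 4321L); the larger value should win.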
+        final long extracted = GraphGraceSearchUtil.findAndVerifyWindowGrace(node);
+        assertThat(extracted, is(4321L));
+    }
+
+}
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessorTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessorTest.java
new file mode 100644
index 00000000000..466033316c7
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessorTest.java
@@ -0,0 +1,204 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream.internals.suppress;
+
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.kstream.Suppressed;
+import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.kstream.internals.Change;
+import org.apache.kafka.streams.kstream.internals.TimeWindow;
+import org.apache.kafka.streams.processor.MockProcessorContext;
+import org.apache.kafka.test.MockInternalProcessorContext;
+import org.hamcrest.BaseMatcher;
+import org.hamcrest.Description;
+import org.hamcrest.Matcher;
+import org.junit.Test;
+
+import java.time.Duration;
+import java.util.Collection;
+
+import static java.time.Duration.ZERO;
+import static java.time.Duration.ofMillis;
+import static org.apache.kafka.streams.kstream.Suppressed.BufferConfig.unbounded;
+import static org.apache.kafka.streams.kstream.Suppressed.untilTimeLimit;
+import static org.apache.kafka.streams.kstream.Suppressed.untilWindowCloses;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.Assert.fail;
+
+@SuppressWarnings("PointlessArithmeticExpression")
+public class KTableSuppressProcessorTest {
+    /**
+     * Use this value to indicate that the test correctness does not depend on any particular number
+     */
+    private static final long ARBITRARY_LONG = 5L;
+
+    /**
+     * Use this value to indicate that the test correctness does not depend on any particular window
+     */
+    private static final TimeWindow ARBITRARY_WINDOW = new TimeWindow(0L, 100L);
+
+    @Test
+    public void zeroTimeLimitShouldImmediatelyEmit() {
+        final KTableSuppressProcessor<String, Long> processor =
+            new KTableSuppressProcessor<>(getImpl(untilTimeLimit(ZERO, unbounded())));
+
+        final MockInternalProcessorContext context = new MockInternalProcessorContext();
+        processor.init(context);
+
+        final long timestamp = ARBITRARY_LONG;
+        context.setTimestamp(timestamp);
+        context.setStreamTime(timestamp);
+        final String key = "hey";
+        final Change<Long> value = new Change<>(ARBITRARY_LONG, ARBITRARY_LONG);
+        processor.process(key, value);
+
+        assertThat(context.forwarded(), hasSize(1));
+        final MockProcessorContext.CapturedForward capturedForward = context.forwarded().get(0);
+        assertThat(capturedForward.keyValue(), is(new KeyValue<>(key, value)));
+        assertThat(capturedForward.timestamp(), is(timestamp));
+    }
+
+    @Test
+    public void windowedZeroTimeLimitShouldImmediatelyEmit() {
+        final KTableSuppressProcessor<Windowed<String>, Long> processor =
+            new KTableSuppressProcessor<>(getImpl(untilTimeLimit(ZERO, unbounded())));
+
+        final MockInternalProcessorContext context = new MockInternalProcessorContext();
+        processor.init(context);
+
+        final long timestamp = ARBITRARY_LONG;
+        context.setTimestamp(timestamp);
+        context.setStreamTime(timestamp);
+        final Windowed<String> key = new Windowed<>("hey", ARBITRARY_WINDOW);
+        final Change<Long> value = new Change<>(ARBITRARY_LONG, ARBITRARY_LONG);
+        processor.process(key, value);
+
+        assertThat(context.forwarded(), hasSize(1));
+        final MockProcessorContext.CapturedForward capturedForward = context.forwarded().get(0);
+        assertThat(capturedForward.keyValue(), is(new KeyValue<>(key, value)));
+        assertThat(capturedForward.timestamp(), is(timestamp));
+    }
+
+    @Test
+    public void intermediateSuppressionShouldThrow() {
+        final KTableSuppressProcessor<String, Long> processor =
+            new KTableSuppressProcessor<>(getImpl(untilTimeLimit(Duration.ofMillis(1), unbounded())));
+
+        final MockInternalProcessorContext context = new MockInternalProcessorContext();
+        processor.init(context);
+
+        try {
+            processor.process("hey", new Change<>(null, 1L));
+            fail("expected an exception for now");
+        } catch (final KTableSuppressProcessor.NotImplementedException e) {
+            // expected
+        }
+        assertThat(context.forwarded(), hasSize(0));
+    }
+
+
+    @SuppressWarnings("unchecked")
+    private <K extends Windowed> SuppressedImpl<K> finalResults(final Duration grace) {
+        return ((FinalResultsSuppressionBuilder) untilWindowCloses(unbounded())).buildFinalResultsSuppression(grace);
+    }
+
+
+    @Test
+    public void finalResultsSuppressionShouldThrow() {
+        final KTableSuppressProcessor<Windowed<String>, Long> processor =
+            new KTableSuppressProcessor<>(finalResults(ofMillis(1)));
+
+        final MockInternalProcessorContext context = new MockInternalProcessorContext();
+        processor.init(context);
+
+        context.setTimestamp(ARBITRARY_LONG);
+        try {
+            processor.process(new Windowed<>("hey", ARBITRARY_WINDOW), new Change<>(ARBITRARY_LONG, ARBITRARY_LONG));
+            fail("expected an exception for now");
+        } catch (final KTableSuppressProcessor.NotImplementedException e) {
+            // expected
+        }
+        assertThat(context.forwarded(), hasSize(0));
+    }
+
+    @Test
+    public void finalResultsWith0GraceBeforeWindowEndShouldThrow() {
+        final KTableSuppressProcessor<Windowed<String>, Long> processor =
+            new KTableSuppressProcessor<>(finalResults(ofMillis(0)));
+
+        final MockInternalProcessorContext context = new MockInternalProcessorContext();
+        processor.init(context);
+
+        final long timestamp = 5L;
+        context.setTimestamp(timestamp);
+        final Windowed<String> key = new Windowed<>("hey", new TimeWindow(0, 100L));
+        final Change<Long> value = new Change<>(ARBITRARY_LONG, ARBITRARY_LONG);
+        try {
+            processor.process(key, value);
+            fail("expected an exception");
+        } catch (final KTableSuppressProcessor.NotImplementedException e) {
+            // expected
+        }
+        assertThat(context.forwarded(), hasSize(0));
+    }
+
+    @Test
+    public void finalResultsWith0GraceAtWindowEndShouldImmediatelyEmit() {
+        final KTableSuppressProcessor<Windowed<String>, Long> processor =
+            new KTableSuppressProcessor<>(finalResults(ofMillis(0)));
+
+        final MockInternalProcessorContext context = new MockInternalProcessorContext();
+        processor.init(context);
+
+        final long timestamp = 100L;
+        context.setTimestamp(timestamp);
+        context.setStreamTime(timestamp);
+        final Windowed<String> key = new Windowed<>("hey", new TimeWindow(0, 100L));
+        final Change<Long> value = new Change<>(ARBITRARY_LONG, ARBITRARY_LONG);
+        processor.process(key, value);
+
+        assertThat(context.forwarded(), hasSize(1));
+        final MockProcessorContext.CapturedForward capturedForward = context.forwarded().get(0);
+        assertThat(capturedForward.keyValue(), is(new KeyValue<>(key, value)));
+        assertThat(capturedForward.timestamp(), is(timestamp));
+    }
+
+    private static <E> Matcher<Collection<E>> hasSize(final int i) {
+        return new BaseMatcher<Collection<E>>() {
+            @Override
+            public void describeTo(final Description description) {
+                description.appendText("a collection of size " + i);
+            }
+
+            @SuppressWarnings("unchecked")
+            @Override
+            public boolean matches(final Object item) {
+                if (item == null) {
+                    return false;
+                } else {
+                    return ((Collection<E>) item).size() == i;
+                }
+            }
+
+        };
+    }
+
+    private static <K> SuppressedImpl<K> getImpl(final Suppressed<K> suppressed) {
+        return (SuppressedImpl<K>) suppressed;
+    }
+}
\ No newline at end of file
diff --git a/streams/src/test/java/org/apache/kafka/test/MockInternalProcessorContext.java b/streams/src/test/java/org/apache/kafka/test/MockInternalProcessorContext.java
new file mode 100644
index 00000000000..14f8561030f
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/test/MockInternalProcessorContext.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.test;
+
+import org.apache.kafka.streams.processor.MockProcessorContext;
+import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
+import org.apache.kafka.streams.processor.internals.ProcessorNode;
+import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
+import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
+import org.apache.kafka.streams.state.internals.ThreadCache;
+
+public class MockInternalProcessorContext extends MockProcessorContext implements InternalProcessorContext {
+    private ProcessorNode currentNode;
+    private long streamTime;
+
+    @Override
+    public StreamsMetricsImpl metrics() {
+        return (StreamsMetricsImpl) super.metrics();
+    }
+
+    @Override
+    public ProcessorRecordContext recordContext() {
+        return new ProcessorRecordContext(timestamp(), offset(), partition(), topic(), headers());
+    }
+
+    @Override
+    public void setRecordContext(final ProcessorRecordContext recordContext) {
+        setRecordMetadata(
+            recordContext.topic(),
+            recordContext.partition(),
+            recordContext.offset(),
+            recordContext.headers(),
+            recordContext.timestamp()
+        );
+    }
+
+    @Override
+    public void setCurrentNode(final ProcessorNode currentNode) {
+        this.currentNode = currentNode;
+    }
+
+    @Override
+    public ProcessorNode currentNode() {
+        return currentNode;
+    }
+
+    @Override
+    public ThreadCache getCache() {
+        return null;
+    }
+
+    @Override
+    public void initialize() {
+
+    }
+
+    @Override
+    public void uninitialize() {
+
+    }
+
+    @Override
+    public long streamTime() {
+        return streamTime;
+    }
+
+    public void setStreamTime(final long streamTime) {
+        this.streamTime = streamTime;
+    }
+}
\ No newline at end of file
diff --git a/streams/test-utils/src/main/java/org/apache/kafka/streams/processor/MockProcessorContext.java b/streams/test-utils/src/main/java/org/apache/kafka/streams/processor/MockProcessorContext.java
index cba02573b59..dc854b0a5f5 100644
--- a/streams/test-utils/src/main/java/org/apache/kafka/streams/processor/MockProcessorContext.java
+++ b/streams/test-utils/src/main/java/org/apache/kafka/streams/processor/MockProcessorContext.java
@@ -405,13 +405,18 @@ public void cancel() {
     @SuppressWarnings("unchecked")
     @Override
     public <K, V> void forward(final K key, final V value) {
-        capturedForwards.add(new CapturedForward(To.all(), new KeyValue(key, value)));
+        forward(key, value, To.all());
     }
 
     @SuppressWarnings("unchecked")
     @Override
     public <K, V> void forward(final K key, final V value, final To to) {
-        capturedForwards.add(new CapturedForward(to, new KeyValue(key, value)));
+        capturedForwards.add(
+            new CapturedForward(
+                to.timestamp == -1 ? to.withTimestamp(timestamp == null ? -1 : timestamp) : to,
+                new KeyValue(key, value)
+            )
+        );
     }
 
     @SuppressWarnings("deprecation")


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> KIP-328: Add in-memory Suppression
> ----------------------------------
>
>                 Key: KAFKA-7223
>                 URL: https://issues.apache.org/jira/browse/KAFKA-7223
>             Project: Kafka
>          Issue Type: Improvement
>          Components: streams
>            Reporter: John Roesler
>            Assignee: John Roesler
>            Priority: Major
>
> As described in 
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-328%3A+Ability+to+suppress+updates+for+KTables.]
>  
> This ticket is to implement Suppress, but only for in-memory buffers.
> (depends on KAFKA-7222)
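
For anyone landing on this ticket without reading the full diff: the end-to-end shape of the new API, abbreviated from SuppressScenarioTest in the merged PR above (imports and the Materialized options are omitted; topic names are the test's own), looks roughly like this:

    final StreamsBuilder builder = new StreamsBuilder();
    builder
        .table("input", Consumed.with(Serdes.String(), Serdes.String()))
        .groupBy((k, v) -> new KeyValue<>(v, k), Serialized.with(Serdes.String(), Serdes.String()))
        .count()
        .suppress(untilTimeLimit(ZERO, unbounded()))  // only ZERO forwards records in this first cut
        .toStream()
        .to("output-suppressed", Produced.with(Serdes.String(), Serdes.Long()));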



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
