[ https://issues.apache.org/jira/browse/KAFKA-7223?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16637193#comment-16637193 ]

ASF GitHub Bot commented on KAFKA-7223:
---------------------------------------

guozhangwang closed pull request #5724: KAFKA-7223: Make suppression buffer durable
URL: https://github.com/apache/kafka/pull/5724

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

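Summary of the change, for readers of the diff below: suppression now keeps its buffer in a StateStore (the new InMemoryTimeOrderedKeyValueBuffer in state/internals) that logs each dirty key to a changelog topic on flush() and replays that topic in restoreBatch(). The changelog value is the record's buffer time as an 8-byte prefix followed by the serialized value, and a null value is a tombstone for an evicted key. A minimal sketch of that encoding follows; the class and method names here are illustrative only, not part of the patch:

    import java.nio.ByteBuffer;

    final class ChangelogValueSketch {
        // Encode: [8-byte buffer time][serialized value], mirroring flush().
        static byte[] encode(final long time, final byte[] value) {
            return ByteBuffer.allocate(8 + value.length)
                             .putLong(time)
                             .put(value)
                             .array();
        }

        // Decode the time prefix, mirroring restoreBatch().
        static long decodeTime(final byte[] timeAndValue) {
            return ByteBuffer.wrap(timeAndValue).getLong();
        }

        // Decode the remaining bytes after the 8-byte time prefix.
        static byte[] decodeValue(final byte[] timeAndValue) {
            final ByteBuffer buffer = ByteBuffer.wrap(timeAndValue);
            buffer.getLong(); // skip the 8-byte time prefix
            final byte[] value = new byte[timeAndValue.length - 8];
            buffer.get(value);
            return value;
        }
    }
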
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
index 3ce962b1926..a12a97a2395 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
@@ -36,6 +36,7 @@
 import org.apache.kafka.streams.kstream.internals.graph.KTableKTableJoinNode;
 import org.apache.kafka.streams.kstream.internals.graph.ProcessorGraphNode;
 import org.apache.kafka.streams.kstream.internals.graph.ProcessorParameters;
+import org.apache.kafka.streams.kstream.internals.graph.StatefulProcessorNode;
 import org.apache.kafka.streams.kstream.internals.graph.StreamsGraphNode;
 import org.apache.kafka.streams.kstream.internals.graph.TableProcessorNode;
 import org.apache.kafka.streams.kstream.internals.suppress.FinalResultsSuppressionBuilder;
@@ -43,6 +44,7 @@
 import org.apache.kafka.streams.kstream.internals.suppress.SuppressedInternal;
 import org.apache.kafka.streams.processor.ProcessorSupplier;
 import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.internals.InMemoryTimeOrderedKeyValueBuffer;
 
 import java.time.Duration;
 import java.util.Collections;
@@ -356,20 +358,24 @@ public String queryableStoreName() {
     @Override
     public KTable<K, V> suppress(final Suppressed<K> suppressed) {
         final String name = builder.newProcessorName(SUPPRESS_NAME);
+        final String storeName = builder.newStoreName(SUPPRESS_NAME);
 
         final ProcessorSupplier<K, Change<V>> suppressionSupplier =
             () -> new KTableSuppressProcessor<>(
                 buildSuppress(suppressed),
+                storeName,
                 keySerde,
                 valSerde == null ? null : new FullChangeSerde<>(valSerde)
             );
 
-        final ProcessorParameters<K, Change<V>> processorParameters = new ProcessorParameters<>(
-            suppressionSupplier,
-            name
-        );
 
-        final ProcessorGraphNode<K, Change<V>> node = new ProcessorGraphNode<>(name, processorParameters, false);
+        final ProcessorGraphNode<K, Change<V>> node = new StatefulProcessorNode<>(
+            name,
+            new ProcessorParameters<>(suppressionSupplier, name),
+            null,
+            new InMemoryTimeOrderedKeyValueBuffer.Builder(storeName),
+            false
+        );
 
         builder.addGraphNode(streamsGraphNode, node);
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/InMemoryTimeOrderedKeyValueBuffer.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/InMemoryTimeOrderedKeyValueBuffer.java
deleted file mode 100644
index 677a662f79d..00000000000
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/InMemoryTimeOrderedKeyValueBuffer.java
+++ /dev/null
@@ -1,116 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.streams.kstream.internals.suppress;
-
-import org.apache.kafka.common.utils.Bytes;
-import org.apache.kafka.streams.KeyValue;
-import org.apache.kafka.streams.state.internals.ContextualRecord;
-
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.TreeMap;
-import java.util.function.Consumer;
-import java.util.function.Supplier;
-
-class InMemoryTimeOrderedKeyValueBuffer implements TimeOrderedKeyValueBuffer {
-    private final Map<Bytes, TimeKey> index = new HashMap<>();
-    private final TreeMap<TimeKey, ContextualRecord> sortedMap = new TreeMap<>();
-    private long memBufferSize = 0L;
-    private long minTimestamp = Long.MAX_VALUE;
-
-    @Override
-    public void evictWhile(final Supplier<Boolean> predicate,
-                           final Consumer<KeyValue<Bytes, ContextualRecord>> callback) {
-        final Iterator<Map.Entry<TimeKey, ContextualRecord>> delegate = sortedMap.entrySet().iterator();
-
-        if (predicate.get()) {
-            Map.Entry<TimeKey, ContextualRecord> next = null;
-            if (delegate.hasNext()) {
-                next = delegate.next();
-            }
-
-            // predicate being true means we read one record, call the callback, and then remove it
-            while (next != null && predicate.get()) {
-                callback.accept(new KeyValue<>(next.getKey().key(), next.getValue()));
-
-                delegate.remove();
-                index.remove(next.getKey().key());
-
-                memBufferSize = memBufferSize - computeRecordSize(next.getKey().key(), next.getValue());
-
-                // peek at the next record so we can update the minTimestamp
-                if (delegate.hasNext()) {
-                    next = delegate.next();
-                    minTimestamp = next == null ? Long.MAX_VALUE : next.getKey().time();
-                } else {
-                    next = null;
-                    minTimestamp = Long.MAX_VALUE;
-                }
-            }
-        }
-    }
-
-    @Override
-    public void put(final long time,
-                    final Bytes key,
-                    final ContextualRecord value) {
-        // non-resetting semantics:
-        // if there was a previous version of the same record,
-        // then insert the new record in the same place in the priority queue
-
-        final TimeKey previousKey = index.get(key);
-        if (previousKey == null) {
-            final TimeKey nextKey = new TimeKey(time, key);
-            index.put(key, nextKey);
-            sortedMap.put(nextKey, value);
-            minTimestamp = Math.min(minTimestamp, time);
-            memBufferSize = memBufferSize + computeRecordSize(key, value);
-        } else {
-            final ContextualRecord removedValue = sortedMap.put(previousKey, value);
-            memBufferSize =
-                memBufferSize
-                    + computeRecordSize(key, value)
-                    - (removedValue == null ? 0 : computeRecordSize(key, removedValue));
-        }
-    }
-
-    @Override
-    public int numRecords() {
-        return index.size();
-    }
-
-    @Override
-    public long bufferSize() {
-        return memBufferSize;
-    }
-
-    @Override
-    public long minTimestamp() {
-        return minTimestamp;
-    }
-
-    private long computeRecordSize(final Bytes key, final ContextualRecord value) {
-        long size = 0L;
-        size += 8; // buffer time
-        size += key.get().length;
-        if (value != null) {
-            size += value.sizeBytes();
-        }
-        return size;
-    }
-}
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessor.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessor.java
index 57e5066d09e..62c034d4893 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessor.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessor.java
@@ -28,6 +28,9 @@
 import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
 import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
 import org.apache.kafka.streams.state.internals.ContextualRecord;
+import org.apache.kafka.streams.state.internals.TimeOrderedKeyValueBuffer;
+
+import java.util.Objects;
 
 import static java.util.Objects.requireNonNull;
 
@@ -35,25 +38,27 @@
     private final long maxRecords;
     private final long maxBytes;
     private final long suppressDurationMillis;
-    private final TimeOrderedKeyValueBuffer buffer;
     private final TimeDefinition<K> bufferTimeDefinition;
     private final BufferFullStrategy bufferFullStrategy;
     private final boolean shouldSuppressTombstones;
+    private final String storeName;
+    private TimeOrderedKeyValueBuffer buffer;
     private InternalProcessorContext internalProcessorContext;
 
     private Serde<K> keySerde;
-    private Serde<Change<V>> valueSerde;
+    private FullChangeSerde<V> valueSerde;
 
     public KTableSuppressProcessor(final SuppressedInternal<K> suppress,
+                                   final String storeName,
                                    final Serde<K> keySerde,
                                    final FullChangeSerde<V> valueSerde) {
+        this.storeName = storeName;
         requireNonNull(suppress);
         this.keySerde = keySerde;
         this.valueSerde = valueSerde;
         maxRecords = suppress.getBufferConfig().maxRecords();
         maxBytes = suppress.getBufferConfig().maxBytes();
         suppressDurationMillis = suppress.getTimeToWaitForMoreEvents().toMillis();
-        buffer = new InMemoryTimeOrderedKeyValueBuffer();
         bufferTimeDefinition = suppress.getTimeDefinition();
         bufferFullStrategy = suppress.getBufferConfig().bufferFullStrategy();
         shouldSuppressTombstones = suppress.shouldSuppressTombstones();
@@ -63,8 +68,9 @@ public KTableSuppressProcessor(final SuppressedInternal<K> suppress,
     @Override
     public void init(final ProcessorContext context) {
         internalProcessorContext = (InternalProcessorContext) context;
-        this.keySerde = keySerde == null ? (Serde<K>) context.keySerde() : keySerde;
-        this.valueSerde = valueSerde == null ? FullChangeSerde.castOrWrap(context.valueSerde()) : valueSerde;
+        keySerde = keySerde == null ? (Serde<K>) context.keySerde() : keySerde;
+        valueSerde = valueSerde == null ? FullChangeSerde.castOrWrap(context.valueSerde()) : valueSerde;
+        buffer = Objects.requireNonNull((TimeOrderedKeyValueBuffer) context.getStateStore(storeName));
     }
 
     @Override
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/TimeKey.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/TimeKey.java
deleted file mode 100644
index d3ad350686a..00000000000
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/TimeKey.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.streams.kstream.internals.suppress;
-
-import org.apache.kafka.common.utils.Bytes;
-
-import java.util.Objects;
-
-class TimeKey implements Comparable<TimeKey> {
-    private final long time;
-    private final Bytes key;
-
-    TimeKey(final long time, final Bytes key) {
-        this.time = time;
-        this.key = key;
-    }
-
-    Bytes key() {
-        return key;
-    }
-
-    long time() {
-        return time;
-    }
-
-    @Override
-    public boolean equals(final Object o) {
-        if (this == o) return true;
-        if (o == null || getClass() != o.getClass()) return false;
-        final TimeKey timeKey = (TimeKey) o;
-        return time == timeKey.time &&
-            Objects.equals(key, timeKey.key);
-    }
-
-    @Override
-    public int hashCode() {
-        return Objects.hash(time, key);
-    }
-
-    @Override
-    public int compareTo(final TimeKey o) {
-        // ordering of keys within a time uses hashCode.
-        final int timeComparison = Long.compare(time, o.time);
-        return timeComparison == 0 ? key.compareTo(o.key) : timeComparison;
-    }
-}
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryTimeOrderedKeyValueBuffer.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryTimeOrderedKeyValueBuffer.java
new file mode 100644
index 00000000000..3ac6fc87073
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryTimeOrderedKeyValueBuffer.java
@@ -0,0 +1,347 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.serialization.ByteArraySerializer;
+import org.apache.kafka.common.serialization.BytesSerializer;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
+import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
+import org.apache.kafka.streams.processor.internals.RecordBatchingStateRestoreCallback;
+import org.apache.kafka.streams.processor.internals.RecordCollector;
+import org.apache.kafka.streams.state.StoreBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.ByteBuffer;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.function.Consumer;
+import java.util.function.Supplier;
+
+public class InMemoryTimeOrderedKeyValueBuffer implements TimeOrderedKeyValueBuffer {
+    private static final Logger LOG = LoggerFactory.getLogger(InMemoryTimeOrderedKeyValueBuffer.class);
+    private static final BytesSerializer KEY_SERIALIZER = new BytesSerializer();
+    private static final ByteArraySerializer VALUE_SERIALIZER = new ByteArraySerializer();
+
+    private final Map<Bytes, BufferKey> index = new HashMap<>();
+    private final TreeMap<BufferKey, ContextualRecord> sortedMap = new TreeMap<>();
+
+    private final Set<Bytes> dirtyKeys = new HashSet<>();
+    private final String storeName;
+    private final boolean loggingEnabled;
+
+    private long memBufferSize = 0L;
+    private long minTimestamp = Long.MAX_VALUE;
+    private RecordCollector collector;
+    private String changelogTopic;
+
+    private volatile boolean open;
+
+    public static class Builder implements StoreBuilder<StateStore> {
+
+        private final String storeName;
+        private boolean loggingEnabled = true;
+
+        public Builder(final String storeName) {
+            this.storeName = storeName;
+        }
+
+        @Override
+        public StoreBuilder<StateStore> withCachingEnabled() {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public StoreBuilder<StateStore> withCachingDisabled() {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public StoreBuilder<StateStore> withLoggingEnabled(final Map<String, String> config) {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public StoreBuilder<StateStore> withLoggingDisabled() {
+            loggingEnabled = false;
+            return this;
+        }
+
+        @Override
+        public StateStore build() {
+            return new InMemoryTimeOrderedKeyValueBuffer(storeName, loggingEnabled);
+        }
+
+        @Override
+        public Map<String, String> logConfig() {
+            return Collections.emptyMap();
+        }
+
+        @Override
+        public boolean loggingEnabled() {
+            return loggingEnabled;
+        }
+
+        @Override
+        public String name() {
+            return storeName;
+        }
+    }
+
+    private static class BufferKey implements Comparable<BufferKey> {
+        private final long time;
+        private final Bytes key;
+
+        private BufferKey(final long time, final Bytes key) {
+            this.time = time;
+            this.key = key;
+        }
+
+        @Override
+        public boolean equals(final Object o) {
+            if (this == o) return true;
+            if (o == null || getClass() != o.getClass()) return false;
+            final BufferKey bufferKey = (BufferKey) o;
+            return time == bufferKey.time &&
+                Objects.equals(key, bufferKey.key);
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(time, key);
+        }
+
+        @Override
+        public int compareTo(final BufferKey o) {
+            // ordering of keys within a time uses hashCode.
+            final int timeComparison = Long.compare(time, o.time);
+            return timeComparison == 0 ? key.compareTo(o.key) : timeComparison;
+        }
+    }
+
+    private InMemoryTimeOrderedKeyValueBuffer(final String storeName, final boolean loggingEnabled) {
+        this.storeName = storeName;
+        this.loggingEnabled = loggingEnabled;
+    }
+
+    @Override
+    public String name() {
+        return storeName;
+    }
+
+
+    @Override
+    public boolean persistent() {
+        return false;
+    }
+
+    @Override
+    public void init(final ProcessorContext context, final StateStore root) {
+        context.register(root, (RecordBatchingStateRestoreCallback) this::restoreBatch);
+        if (loggingEnabled) {
+            collector = ((RecordCollector.Supplier) context).recordCollector();
+            changelogTopic = ProcessorStateManager.storeChangelogTopic(context.applicationId(), storeName);
+        }
+        open = true;
+    }
+
+    @Override
+    public boolean isOpen() {
+        return open;
+    }
+
+    @Override
+    public void close() {
+        index.clear();
+        sortedMap.clear();
+        dirtyKeys.clear();
+        memBufferSize = 0;
+        minTimestamp = Long.MAX_VALUE;
+        open = false;
+    }
+
+    @Override
+    public void flush() {
+        if (loggingEnabled) {
+            // counting on this getting called before the record collector's flush
+            for (final Bytes key : dirtyKeys) {
+
+                final BufferKey bufferKey = index.get(key);
+
+                if (bufferKey == null) {
+                    // The record was evicted from the buffer. Send a tombstone.
+                    collector.send(changelogTopic, key, null, null, null, null, KEY_SERIALIZER, VALUE_SERIALIZER);
+                } else {
+                    final ContextualRecord value = sortedMap.get(bufferKey);
+
+                    final byte[] innerValue = value.value();
+                    final byte[] timeAndValue = ByteBuffer.wrap(new byte[8 + innerValue.length])
+                                                          .putLong(bufferKey.time)
+                                                          .put(innerValue)
+                                                          .array();
+
+                    final ProcessorRecordContext recordContext = value.recordContext();
+                    collector.send(
+                        changelogTopic,
+                        key,
+                        timeAndValue,
+                        recordContext.headers(),
+                        recordContext.partition(),
+                        recordContext.timestamp(),
+                        KEY_SERIALIZER,
+                        VALUE_SERIALIZER
+                    );
+                }
+            }
+            dirtyKeys.clear();
+        }
+    }
+
+    private void restoreBatch(final Collection<ConsumerRecord<byte[], byte[]>> batch) {
+        for (final ConsumerRecord<byte[], byte[]> record : batch) {
+            final Bytes key = Bytes.wrap(record.key());
+            if (record.value() == null) {
+                // This was a tombstone. Delete the record.
+                final BufferKey bufferKey = index.remove(key);
+                if (bufferKey != null) {
+                    sortedMap.remove(bufferKey);
+                }
+            } else {
+                final ByteBuffer timeAndValue = ByteBuffer.wrap(record.value());
+                final long time = timeAndValue.getLong();
+                final byte[] value = new byte[record.value().length - 8];
+                timeAndValue.get(value);
+
+                cleanPut(
+                    time,
+                    key,
+                    new ContextualRecord(
+                        value,
+                        new ProcessorRecordContext(
+                            record.timestamp(),
+                            record.offset(),
+                            record.partition(),
+                            record.topic(),
+                            record.headers()
+                        )
+                    )
+                );
+            }
+        }
+    }
+
+
+    @Override
+    public void evictWhile(final Supplier<Boolean> predicate,
+                           final Consumer<KeyValue<Bytes, ContextualRecord>> callback) {
+        final Iterator<Map.Entry<BufferKey, ContextualRecord>> delegate = sortedMap.entrySet().iterator();
+
+        if (predicate.get()) {
+            Map.Entry<BufferKey, ContextualRecord> next = null;
+            if (delegate.hasNext()) {
+                next = delegate.next();
+            }
+
+            // predicate being true means we read one record, call the callback, and then remove it
+            while (next != null && predicate.get()) {
+                callback.accept(new KeyValue<>(next.getKey().key, next.getValue()));
+
+                delegate.remove();
+                index.remove(next.getKey().key);
+
+                dirtyKeys.add(next.getKey().key);
+
+                memBufferSize = memBufferSize - computeRecordSize(next.getKey().key, next.getValue());
+
+                // peek at the next record so we can update the minTimestamp
+                if (delegate.hasNext()) {
+                    next = delegate.next();
+                    minTimestamp = next == null ? Long.MAX_VALUE : next.getKey().time;
+                } else {
+                    next = null;
+                    minTimestamp = Long.MAX_VALUE;
+                }
+            }
+        }
+    }
+
+    @Override
+    public void put(final long time,
+                    final Bytes key,
+                    final ContextualRecord value) {
+        cleanPut(time, key, value);
+        dirtyKeys.add(key);
+    }
+
+    private void cleanPut(final long time, final Bytes key, final ContextualRecord value) {
+        // non-resetting semantics:
+        // if there was a previous version of the same record,
+        // then insert the new record in the same place in the priority queue
+
+        final BufferKey previousKey = index.get(key);
+        if (previousKey == null) {
+            final BufferKey nextKey = new BufferKey(time, key);
+            index.put(key, nextKey);
+            sortedMap.put(nextKey, value);
+            minTimestamp = Math.min(minTimestamp, time);
+            memBufferSize = memBufferSize + computeRecordSize(key, value);
+        } else {
+            final ContextualRecord removedValue = sortedMap.put(previousKey, value);
+            memBufferSize =
+                memBufferSize
+                    + computeRecordSize(key, value)
+                    - (removedValue == null ? 0 : computeRecordSize(key, removedValue));
+        }
+    }
+
+    @Override
+    public int numRecords() {
+        return index.size();
+    }
+
+    @Override
+    public long bufferSize() {
+        return memBufferSize;
+    }
+
+    @Override
+    public long minTimestamp() {
+        return minTimestamp;
+    }
+
+    private long computeRecordSize(final Bytes key, final ContextualRecord value) {
+        long size = 0L;
+        size += 8; // buffer time
+        size += key.get().length;
+        if (value != null) {
+            size += value.sizeBytes();
+        }
+        return size;
+    }
+}
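
A note on how a caller drives eviction with the store above: evictWhile() keeps handing out the oldest buffered record for as long as the supplied predicate returns true, removing each emitted record and marking its key dirty for the next changelog flush. A minimal illustrative caller under assumed names (evictExpired, forward, currentStreamTime are stand-ins, not taken from KTableSuppressProcessor):

    import java.util.function.Consumer;

    import org.apache.kafka.common.utils.Bytes;
    import org.apache.kafka.streams.KeyValue;
    import org.apache.kafka.streams.state.internals.ContextualRecord;
    import org.apache.kafka.streams.state.internals.TimeOrderedKeyValueBuffer;

    final class EvictionSketch {
        // Evict every record that has waited at least suppressDurationMillis,
        // oldest first, forwarding each evicted record via the callback.
        static void evictExpired(final TimeOrderedKeyValueBuffer buffer,
                                 final long currentStreamTime,
                                 final long suppressDurationMillis,
                                 final Consumer<KeyValue<Bytes, ContextualRecord>> forward) {
            final long expiryTime = currentStreamTime - suppressDurationMillis;
            buffer.evictWhile(
                // re-checked before every record: stop as soon as the oldest
                // remaining record is still young enough to keep buffering
                () -> buffer.numRecords() > 0 && buffer.minTimestamp() <= expiryTime,
                forward
            );
        }
    }
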
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/TimeOrderedKeyValueBuffer.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/TimeOrderedKeyValueBuffer.java
similarity index 87%
rename from streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/TimeOrderedKeyValueBuffer.java
rename to streams/src/main/java/org/apache/kafka/streams/state/internals/TimeOrderedKeyValueBuffer.java
index 98a4f63c83f..86a8c1e651d 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/TimeOrderedKeyValueBuffer.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/TimeOrderedKeyValueBuffer.java
@@ -14,16 +14,16 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.kafka.streams.kstream.internals.suppress;
+package org.apache.kafka.streams.state.internals;
 
 import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.streams.KeyValue;
-import org.apache.kafka.streams.state.internals.ContextualRecord;
+import org.apache.kafka.streams.processor.StateStore;
 
 import java.util.function.Consumer;
 import java.util.function.Supplier;
 
-interface TimeOrderedKeyValueBuffer {
+public interface TimeOrderedKeyValueBuffer extends StateStore {
     void evictWhile(final Supplier<Boolean> predicate, final Consumer<KeyValue<Bytes, ContextualRecord>> callback);
 
     void put(final long time, final Bytes key, final ContextualRecord value);
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/SuppressionDurabilityIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/SuppressionDurabilityIntegrationTest.java
new file mode 100644
index 00000000000..93ecc53a7fc
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/SuppressionDurabilityIntegrationTest.java
@@ -0,0 +1,260 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.integration;
+
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.LongDeserializer;
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.KeyValueTimestamp;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
+import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
+import org.apache.kafka.streams.kstream.Consumed;
+import org.apache.kafka.streams.kstream.Grouped;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.KTable;
+import org.apache.kafka.streams.kstream.Materialized;
+import org.apache.kafka.streams.kstream.Produced;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.test.IntegrationTest;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import java.util.Locale;
+import java.util.Properties;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static java.lang.Long.MAX_VALUE;
+import static java.time.Duration.ofMillis;
+import static java.util.Arrays.asList;
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.common.utils.Utils.mkMap;
+import static org.apache.kafka.common.utils.Utils.mkProperties;
+import static org.apache.kafka.streams.StreamsConfig.AT_LEAST_ONCE;
+import static org.apache.kafka.streams.StreamsConfig.EXACTLY_ONCE;
+import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.cleanStateAfterTest;
+import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.cleanStateBeforeTest;
+import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.getStartedStreams;
+import static org.apache.kafka.streams.kstream.Suppressed.BufferConfig.maxRecords;
+import static org.apache.kafka.streams.kstream.Suppressed.untilTimeLimit;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
+
+@RunWith(Parameterized.class)
+@Category({IntegrationTest.class})
+public class SuppressionDurabilityIntegrationTest {
+    @ClassRule
+    public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(3);
+    private static final StringDeserializer STRING_DESERIALIZER = new StringDeserializer();
+    private static final StringSerializer STRING_SERIALIZER = new StringSerializer();
+    private static final Serde<String> STRING_SERDE = Serdes.String();
+    private static final LongDeserializer LONG_DESERIALIZER = new LongDeserializer();
+    private static final int COMMIT_INTERVAL = 100;
+    private final boolean eosEnabled;
+
+    public SuppressionDurabilityIntegrationTest(final boolean eosEnabled) {
+        this.eosEnabled = eosEnabled;
+    }
+
+    @Parameters(name = "{index}: eosEnabled={0}")
+    public static Collection<Object[]> parameters() {
+        return Arrays.asList(new Object[] {false}, new Object[] {true});
+    }
+
+    private KTable<String, Long> buildCountsTable(final String input, final StreamsBuilder builder) {
+        return builder
+            .table(
+                input,
+                Consumed.with(STRING_SERDE, STRING_SERDE),
+                Materialized.<String, String, KeyValueStore<Bytes, byte[]>>with(STRING_SERDE, STRING_SERDE)
+                    .withCachingDisabled()
+                    .withLoggingDisabled()
+            )
+            .groupBy((k, v) -> new KeyValue<>(v, k), Grouped.with(STRING_SERDE, STRING_SERDE))
+            .count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("counts").withCachingDisabled());
+    }
+
+    @Test
+    public void shouldRecoverBufferAfterShutdown() {
+        final String testId = "-shouldRecoverBufferAfterShutdown";
+        final String appId = getClass().getSimpleName().toLowerCase(Locale.getDefault()) + testId;
+        final String input = "input" + testId;
+        final String outputSuppressed = "output-suppressed" + testId;
+        final String outputRaw = "output-raw" + testId;
+
+        cleanStateBeforeTest(CLUSTER, input, outputRaw, outputSuppressed);
+
+        final StreamsBuilder builder = new StreamsBuilder();
+        final KTable<String, Long> valueCounts = buildCountsTable(input, builder);
+
+        final KStream<String, Long> suppressedCounts = valueCounts
+            .suppress(untilTimeLimit(ofMillis(MAX_VALUE), maxRecords(3L).emitEarlyWhenFull()))
+            .toStream();
+
+        final AtomicInteger eventCount = new AtomicInteger(0);
+        suppressedCounts.foreach((key, value) -> eventCount.incrementAndGet());
+
+        suppressedCounts
+            .to(outputSuppressed, Produced.with(STRING_SERDE, Serdes.Long()));
+
+        valueCounts
+            .toStream()
+            .to(outputRaw, Produced.with(STRING_SERDE, Serdes.Long()));
+
+        final Properties streamsConfig = mkProperties(mkMap(
+            mkEntry(StreamsConfig.APPLICATION_ID_CONFIG, appId),
+            mkEntry(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()),
+            mkEntry(StreamsConfig.POLL_MS_CONFIG, Integer.toString(COMMIT_INTERVAL)),
+            mkEntry(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, Integer.toString(COMMIT_INTERVAL)),
+            mkEntry(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, eosEnabled ? EXACTLY_ONCE : AT_LEAST_ONCE)
+        ));
+
+        KafkaStreams driver = getStartedStreams(streamsConfig, builder, true);
+        try {
+            // start by putting some stuff in the buffer
+            produceSynchronously(
+                input,
+                asList(
+                    new KeyValueTimestamp<>("k1", "v1", scaledTime(1L)),
+                    new KeyValueTimestamp<>("k2", "v2", scaledTime(2L)),
+                    new KeyValueTimestamp<>("k3", "v3", scaledTime(3L))
+                )
+            );
+            verifyOutput(
+                outputRaw,
+                asList(
+                    new KeyValueTimestamp<>("v1", 1L, scaledTime(1L)),
+                    new KeyValueTimestamp<>("v2", 1L, scaledTime(2L)),
+                    new KeyValueTimestamp<>("v3", 1L, scaledTime(3L))
+                )
+            );
+            assertThat(eventCount.get(), is(0));
+
+            // flush two of the first three events out.
+            produceSynchronously(
+                input,
+                asList(
+                    new KeyValueTimestamp<>("k4", "v4", scaledTime(4L)),
+                    new KeyValueTimestamp<>("k5", "v5", scaledTime(5L))
+                )
+            );
+            verifyOutput(
+                outputRaw,
+                asList(
+                    new KeyValueTimestamp<>("v4", 1L, scaledTime(4L)),
+                    new KeyValueTimestamp<>("v5", 1L, scaledTime(5L))
+                )
+            );
+            assertThat(eventCount.get(), is(2));
+            verifyOutput(
+                outputSuppressed,
+                asList(
+                    new KeyValueTimestamp<>("v1", 1L, scaledTime(1L)),
+                    new KeyValueTimestamp<>("v2", 1L, scaledTime(2L))
+                )
+            );
+
+            // bounce to ensure that the history, including retractions,
+            // get restored properly. (i.e., we shouldn't see those first events again)
+
+            // restart the driver
+            driver.close();
+            assertThat(driver.state(), is(KafkaStreams.State.NOT_RUNNING));
+            driver = getStartedStreams(streamsConfig, builder, false);
+
+
+            // flush those recovered buffered events out.
+            produceSynchronously(
+                input,
+                asList(
+                    new KeyValueTimestamp<>("k6", "v6", scaledTime(6L)),
+                    new KeyValueTimestamp<>("k7", "v7", scaledTime(7L)),
+                    new KeyValueTimestamp<>("k8", "v8", scaledTime(8L))
+                )
+            );
+            verifyOutput(
+                outputRaw,
+                asList(
+                    new KeyValueTimestamp<>("v6", 1L, scaledTime(6L)),
+                    new KeyValueTimestamp<>("v7", 1L, scaledTime(7L)),
+                    new KeyValueTimestamp<>("v8", 1L, scaledTime(8L))
+                )
+            );
+            assertThat(eventCount.get(), is(5));
+            verifyOutput(
+                outputSuppressed,
+                asList(
+                    new KeyValueTimestamp<>("v3", 1L, scaledTime(3L)),
+                    new KeyValueTimestamp<>("v4", 1L, scaledTime(4L)),
+                    new KeyValueTimestamp<>("v5", 1L, scaledTime(5L))
+                )
+            );
+
+        } finally {
+            driver.close();
+            cleanStateAfterTest(CLUSTER, driver);
+        }
+    }
+
+    private void verifyOutput(final String topic, final List<KeyValueTimestamp<String, Long>> keyValueTimestamps) {
+        final Properties properties = mkProperties(
+            mkMap(
+                mkEntry(ConsumerConfig.GROUP_ID_CONFIG, "test-group"),
+                mkEntry(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()),
+                mkEntry(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ((Deserializer<String>) STRING_DESERIALIZER).getClass().getName()),
+                mkEntry(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ((Deserializer<Long>) LONG_DESERIALIZER).getClass().getName())
+            )
+        );
+        IntegrationTestUtils.verifyKeyValueTimestamps(properties, topic, keyValueTimestamps);
+
+    }
+
+    /**
+     * scaling to ensure that there are commits in between the various test events,
+     * just to exercise that everything works properly in the presence of commits.
+     */
+    private long scaledTime(final long unscaledTime) {
+        return COMMIT_INTERVAL * 2 * unscaledTime;
+    }
+
+    private void produceSynchronously(final String topic, final List<KeyValueTimestamp<String, String>> toProduce) {
+        final Properties producerConfig = mkProperties(mkMap(
+            mkEntry(ProducerConfig.CLIENT_ID_CONFIG, "anything"),
+            mkEntry(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ((Serializer<String>) STRING_SERIALIZER).getClass().getName()),
+            mkEntry(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ((Serializer<String>) STRING_SERIALIZER).getClass().getName()),
+            mkEntry(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers())
+        ));
+        IntegrationTestUtils.produceSynchronously(producerConfig, false, topic, toProduce);
+    }
+}
\ No newline at end of file
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/SuppressionIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/SuppressionIntegrationTest.java
index a9920e3a6f1..208f1eb3c50 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/SuppressionIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/SuppressionIntegrationTest.java
@@ -17,16 +17,12 @@
 package org.apache.kafka.streams.integration;
 
 import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.clients.producer.KafkaProducer;
-import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.clients.producer.ProducerConfig;
-import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.kafka.clients.producer.RecordMetadata;
 import org.apache.kafka.common.serialization.Deserializer;
 import org.apache.kafka.common.serialization.LongDeserializer;
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.common.utils.Bytes;
@@ -38,10 +34,10 @@
 import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
 import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
 import org.apache.kafka.streams.kstream.Consumed;
+import org.apache.kafka.streams.kstream.Grouped;
 import org.apache.kafka.streams.kstream.KTable;
 import org.apache.kafka.streams.kstream.Materialized;
 import org.apache.kafka.streams.kstream.Produced;
-import org.apache.kafka.streams.kstream.Serialized;
 import org.apache.kafka.streams.kstream.TimeWindows;
 import org.apache.kafka.streams.kstream.Windowed;
 import org.apache.kafka.streams.kstream.internals.TimeWindow;
@@ -53,14 +49,9 @@
 import org.junit.experimental.categories.Category;
 
 import java.time.Duration;
-import java.util.Iterator;
-import java.util.LinkedList;
 import java.util.List;
 import java.util.Locale;
-import java.util.Objects;
 import java.util.Properties;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
 
 import static java.lang.Long.MAX_VALUE;
 import static java.time.Duration.ofMillis;
@@ -69,6 +60,9 @@
 import static org.apache.kafka.common.utils.Utils.mkEntry;
 import static org.apache.kafka.common.utils.Utils.mkMap;
 import static org.apache.kafka.common.utils.Utils.mkProperties;
+import static org.apache.kafka.streams.StreamsConfig.AT_LEAST_ONCE;
+import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.cleanStateAfterTest;
+import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.cleanStateBeforeTest;
 import static org.apache.kafka.streams.kstream.Suppressed.BufferConfig.maxBytes;
 import static org.apache.kafka.streams.kstream.Suppressed.BufferConfig.maxRecords;
 import static org.apache.kafka.streams.kstream.Suppressed.BufferConfig.unbounded;
@@ -87,18 +81,17 @@
     private static final Serde<String> STRING_SERDE = Serdes.String();
     private static final LongDeserializer LONG_DESERIALIZER = new LongDeserializer();
     private static final int COMMIT_INTERVAL = 100;
-    private static final int SCALE_FACTOR = COMMIT_INTERVAL * 2;
     private static final long TIMEOUT_MS = 30_000L;
 
     @Test
-    public void shouldSuppressIntermediateEventsWithEmitAfter() throws InterruptedException {
+    public void shouldSuppressIntermediateEventsWithEmitAfter() {
         final String testId = "-shouldSuppressIntermediateEventsWithEmitAfter";
         final String appId = getClass().getSimpleName().toLowerCase(Locale.getDefault()) + testId;
         final String input = "input" + testId;
         final String outputSuppressed = "output-suppressed" + testId;
         final String outputRaw = "output-raw" + testId;
 
-        cleanStateBeforeTest(input, outputSuppressed, outputRaw);
+        cleanStateBeforeTest(CLUSTER, input, outputSuppressed, outputRaw);
 
         final StreamsBuilder builder = new StreamsBuilder();
         final KTable<String, Long> valueCounts = buildCountsTable(input, builder);
@@ -112,7 +105,8 @@ public void shouldSuppressIntermediateEventsWithEmitAfter() throws InterruptedEx
             .toStream()
             .to(outputRaw, Produced.with(STRING_SERDE, Serdes.Long()));
 
-        final KafkaStreams driver = getCleanStartedStreams(appId, builder);
+        final Properties streamsConfig = getStreamsConfig(appId);
+        final KafkaStreams driver = IntegrationTestUtils.getStartedStreams(streamsConfig, builder, true);
 
         try {
             produceSynchronously(
@@ -144,7 +138,7 @@ public void shouldSuppressIntermediateEventsWithEmitAfter() throws InterruptedEx
             );
         } finally {
             driver.close();
-            cleanStateAfterTest(driver);
+            cleanStateAfterTest(CLUSTER, driver);
         }
     }
 
@@ -157,19 +151,19 @@ public void shouldSuppressIntermediateEventsWithEmitAfter() throws InterruptedEx
                     .withCachingDisabled()
                     .withLoggingDisabled()
             )
-            .groupBy((k, v) -> new KeyValue<>(v, k), Serialized.with(STRING_SERDE, STRING_SERDE))
+            .groupBy((k, v) -> new KeyValue<>(v, k), Grouped.with(STRING_SERDE, STRING_SERDE))
             .count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("counts").withCachingDisabled());
     }
 
     @Test
-    public void shouldNotSuppressIntermediateEventsWithZeroEmitAfter() throws InterruptedException {
+    public void shouldNotSuppressIntermediateEventsWithZeroEmitAfter() {
         final String testId = "-shouldNotSuppressIntermediateEventsWithZeroEmitAfter";
         final String appId = getClass().getSimpleName().toLowerCase(Locale.getDefault()) + testId;
         final String input = "input" + testId;
         final String outputSuppressed = "output-suppressed" + testId;
         final String outputRaw = "output-raw" + testId;
 
-        cleanStateBeforeTest(input, outputSuppressed, outputRaw);
+        cleanStateBeforeTest(CLUSTER, input, outputSuppressed, outputRaw);
 
         final StreamsBuilder builder = new StreamsBuilder();
         final KTable<String, Long> valueCounts = buildCountsTable(input, builder);
@@ -183,7 +177,8 @@ public void shouldNotSuppressIntermediateEventsWithZeroEmitAfter() throws Interr
             .toStream()
             .to(outputRaw, Produced.with(STRING_SERDE, Serdes.Long()));
 
-        final KafkaStreams driver = getCleanStartedStreams(appId, builder);
+        final Properties streamsConfig = getStreamsConfig(appId);
+        final KafkaStreams driver = IntegrationTestUtils.getStartedStreams(streamsConfig, builder, true);
 
         try {
             produceSynchronously(
@@ -217,19 +212,19 @@ public void shouldNotSuppressIntermediateEventsWithZeroEmitAfter() throws Interr
             );
         } finally {
             driver.close();
-            cleanStateAfterTest(driver);
+            cleanStateAfterTest(CLUSTER, driver);
         }
     }
 
     @Test
-    public void shouldSuppressIntermediateEventsWithRecordLimit() throws InterruptedException {
+    public void shouldSuppressIntermediateEventsWithRecordLimit() {
         final String testId = "-shouldSuppressIntermediateEventsWithRecordLimit";
         final String appId = getClass().getSimpleName().toLowerCase(Locale.getDefault()) + testId;
         final String input = "input" + testId;
         final String outputSuppressed = "output-suppressed" + testId;
         final String outputRaw = "output-raw" + testId;
 
-        cleanStateBeforeTest(input, outputRaw, outputSuppressed);
+        cleanStateBeforeTest(CLUSTER, input, outputRaw, outputSuppressed);
 
         final StreamsBuilder builder = new StreamsBuilder();
         final KTable<String, Long> valueCounts = buildCountsTable(input, builder);
@@ -243,7 +238,8 @@ public void shouldSuppressIntermediateEventsWithRecordLimit() throws Interrupted
             .toStream()
             .to(outputRaw, Produced.with(STRING_SERDE, Serdes.Long()));
 
-        final KafkaStreams driver = getCleanStartedStreams(appId, builder);
+        final Properties streamsConfig = getStreamsConfig(appId);
+        final KafkaStreams driver = IntegrationTestUtils.getStartedStreams(streamsConfig, builder, true);
         try {
             produceSynchronously(
                 input,
@@ -275,7 +271,7 @@ public void shouldSuppressIntermediateEventsWithRecordLimit() throws Interrupted
             );
         } finally {
             driver.close();
-            cleanStateAfterTest(driver);
+            cleanStateAfterTest(CLUSTER, driver);
         }
     }
 
@@ -287,7 +283,7 @@ public void shouldShutdownWhenRecordConstraintIsViolated() throws InterruptedExc
         final String outputSuppressed = "output-suppressed" + testId;
         final String outputRaw = "output-raw" + testId;
 
-        cleanStateBeforeTest(input, outputRaw, outputSuppressed);
+        cleanStateBeforeTest(CLUSTER, input, outputRaw, outputSuppressed);
 
         final StreamsBuilder builder = new StreamsBuilder();
         final KTable<String, Long> valueCounts = buildCountsTable(input, builder);
@@ -301,7 +297,8 @@ public void shouldShutdownWhenRecordConstraintIsViolated() throws InterruptedExc
             .toStream()
             .to(outputRaw, Produced.with(STRING_SERDE, Serdes.Long()));
 
-        final KafkaStreams driver = getCleanStartedStreams(appId, builder);
+        final Properties streamsConfig = getStreamsConfig(appId);
+        final KafkaStreams driver = IntegrationTestUtils.getStartedStreams(streamsConfig, builder, true);
         try {
             produceSynchronously(
                 input,
@@ -315,19 +312,19 @@ public void shouldShutdownWhenRecordConstraintIsViolated() throws InterruptedExc
             verifyErrorShutdown(driver);
         } finally {
             driver.close();
-            cleanStateAfterTest(driver);
+            cleanStateAfterTest(CLUSTER, driver);
         }
     }
 
     @Test
-    public void shouldSuppressIntermediateEventsWithBytesLimit() throws InterruptedException {
+    public void shouldSuppressIntermediateEventsWithBytesLimit() {
         final String testId = "-shouldSuppressIntermediateEventsWithBytesLimit";
         final String appId = getClass().getSimpleName().toLowerCase(Locale.getDefault()) + testId;
         final String input = "input" + testId;
         final String outputSuppressed = "output-suppressed" + testId;
         final String outputRaw = "output-raw" + testId;
 
-        cleanStateBeforeTest(input, outputRaw, outputSuppressed);
+        cleanStateBeforeTest(CLUSTER, input, outputRaw, outputSuppressed);
 
         final StreamsBuilder builder = new StreamsBuilder();
         final KTable<String, Long> valueCounts = buildCountsTable(input, builder);
@@ -342,7 +339,8 @@ public void shouldSuppressIntermediateEventsWithBytesLimit() throws InterruptedE
             .toStream()
             .to(outputRaw, Produced.with(STRING_SERDE, Serdes.Long()));
 
-        final KafkaStreams driver = getCleanStartedStreams(appId, builder);
+        final Properties streamsConfig = getStreamsConfig(appId);
+        final KafkaStreams driver = IntegrationTestUtils.getStartedStreams(streamsConfig, builder, true);
         try {
             produceSynchronously(
                 input,
@@ -374,7 +372,7 @@ public void shouldSuppressIntermediateEventsWithBytesLimit() throws InterruptedE
             );
         } finally {
             driver.close();
-            cleanStateAfterTest(driver);
+            cleanStateAfterTest(CLUSTER, driver);
         }
     }
 
@@ -386,7 +384,7 @@ public void shouldShutdownWhenBytesConstraintIsViolated() throws InterruptedExce
         final String outputSuppressed = "output-suppressed" + testId;
         final String outputRaw = "output-raw" + testId;
 
-        cleanStateBeforeTest(input, outputRaw, outputSuppressed);
+        cleanStateBeforeTest(CLUSTER, input, outputRaw, outputSuppressed);
 
         final StreamsBuilder builder = new StreamsBuilder();
         final KTable<String, Long> valueCounts = buildCountsTable(input, builder);
@@ -401,7 +399,8 @@ public void shouldShutdownWhenBytesConstraintIsViolated() throws InterruptedExce
             .toStream()
             .to(outputRaw, Produced.with(STRING_SERDE, Serdes.Long()));
 
-        final KafkaStreams driver = getCleanStartedStreams(appId, builder);
+        final Properties streamsConfig = getStreamsConfig(appId);
+        final KafkaStreams driver = IntegrationTestUtils.getStartedStreams(streamsConfig, builder, true);
         try {
             produceSynchronously(
                 input,
@@ -415,26 +414,26 @@ public void shouldShutdownWhenBytesConstraintIsViolated() throws InterruptedExce
             verifyErrorShutdown(driver);
         } finally {
             driver.close();
-            cleanStateAfterTest(driver);
+            cleanStateAfterTest(CLUSTER, driver);
         }
     }
 
     @Test
-    public void shouldSupportFinalResultsForTimeWindows() throws InterruptedException {
+    public void shouldSupportFinalResultsForTimeWindows() {
         final String testId = "-shouldSupportFinalResultsForTimeWindows";
         final String appId = getClass().getSimpleName().toLowerCase(Locale.getDefault()) + testId;
         final String input = "input" + testId;
         final String outputSuppressed = "output-suppressed" + testId;
         final String outputRaw = "output-raw" + testId;
 
-        cleanStateBeforeTest(input, outputRaw, outputSuppressed);
+        cleanStateBeforeTest(CLUSTER, input, outputRaw, outputSuppressed);
 
         final StreamsBuilder builder = new StreamsBuilder();
         final KTable<Windowed<String>, Long> valueCounts = builder
             .stream(input,
                     Consumed.with(STRING_SERDE, STRING_SERDE)
             )
-            .groupBy((String k1, String v1) -> k1, Serialized.with(STRING_SERDE, STRING_SERDE))
+            .groupBy((String k1, String v1) -> k1, Grouped.with(STRING_SERDE, STRING_SERDE))
             .windowedBy(TimeWindows.of(scaledTime(2L)).grace(scaledTime(1L)))
             .count(Materialized.<String, Long, WindowStore<Bytes, byte[]>>as("counts").withCachingDisabled().withLoggingDisabled());
 
@@ -449,7 +448,8 @@ public void shouldSupportFinalResultsForTimeWindows() throws InterruptedExceptio
             .map((final Windowed<String> k, final Long v) -> new KeyValue<>(k.toString(), v))
             .to(outputRaw, Produced.with(STRING_SERDE, Serdes.Long()));
 
-        final KafkaStreams driver = getCleanStartedStreams(appId, builder);
+        final Properties streamsConfig = getStreamsConfig(appId);
+        final KafkaStreams driver = IntegrationTestUtils.getStartedStreams(streamsConfig, builder, true);
         try {
             produceSynchronously(input, asList(
                 new KeyValueTimestamp<>("k1", "v1", scaledTime(0L)),
@@ -481,81 +481,40 @@ public void shouldSupportFinalResultsForTimeWindows() throws InterruptedExceptio
             );
         } finally {
             driver.close();
-            cleanStateAfterTest(driver);
+            cleanStateAfterTest(CLUSTER, driver);
         }
     }
 
-    private String scaledWindowKey(final String key, final long unscaledStart, final long unscaledEnd) {
-        return new Windowed<>(key, new TimeWindow(scaledTime(unscaledStart), scaledTime(unscaledEnd))).toString();
-    }
-
-    private void cleanStateBeforeTest(final String... topics) throws InterruptedException {
-        CLUSTER.deleteAllTopicsAndWait(TIMEOUT_MS);
-        for (final String topic : topics) {
-            CLUSTER.createTopic(topic, 1, 1);
-        }
-    }
-
-    private KafkaStreams getCleanStartedStreams(final String appId, final StreamsBuilder builder) {
-        final Properties streamsConfig = mkProperties(mkMap(
+    private Properties getStreamsConfig(final String appId) {
+        return mkProperties(mkMap(
             mkEntry(StreamsConfig.APPLICATION_ID_CONFIG, appId),
             mkEntry(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, 
CLUSTER.bootstrapServers()),
-            mkEntry(StreamsConfig.POLL_MS_CONFIG, 
Objects.toString(COMMIT_INTERVAL)),
-            mkEntry(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 
Objects.toString(COMMIT_INTERVAL))
+            mkEntry(StreamsConfig.POLL_MS_CONFIG, 
Integer.toString(COMMIT_INTERVAL)),
+            mkEntry(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 
Integer.toString(COMMIT_INTERVAL)),
+            mkEntry(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, AT_LEAST_ONCE)
         ));
-        final KafkaStreams driver = new KafkaStreams(builder.build(), 
streamsConfig);
-        driver.cleanUp();
-        driver.start();
-        return driver;
     }
 
-    private void cleanStateAfterTest(final KafkaStreams driver) throws 
InterruptedException {
-        driver.cleanUp();
-        CLUSTER.deleteAllTopicsAndWait(TIMEOUT_MS);
+    private String scaledWindowKey(final String key, final long unscaledStart, 
final long unscaledEnd) {
+        return new Windowed<>(key, new TimeWindow(scaledTime(unscaledStart), 
scaledTime(unscaledEnd))).toString();
     }
 
+    /**
+     * scaling to ensure that there are commits in between the various test 
events,
+     * just to exercise that everything works properly in the presence of 
commits.
+     */
     private long scaledTime(final long unscaledTime) {
-        return SCALE_FACTOR * unscaledTime;
+        return COMMIT_INTERVAL * 2 * unscaledTime;
     }
 
     private void produceSynchronously(final String topic, final 
List<KeyValueTimestamp<String, String>> toProduce) {
         final Properties producerConfig = mkProperties(mkMap(
             mkEntry(ProducerConfig.CLIENT_ID_CONFIG, "anything"),
-            mkEntry(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
STRING_SERIALIZER.getClass().getName()),
-            mkEntry(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
STRING_SERIALIZER.getClass().getName()),
+            mkEntry(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
((Serializer<String>) STRING_SERIALIZER).getClass().getName()),
+            mkEntry(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
((Serializer<String>) STRING_SERIALIZER).getClass().getName()),
             mkEntry(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, 
CLUSTER.bootstrapServers())
         ));
-        try (final Producer<String, String> producer = new 
KafkaProducer<>(producerConfig)) {
-            // TODO: test EOS
-            //noinspection ConstantConditions
-            if (false) {
-                producer.initTransactions();
-                producer.beginTransaction();
-            }
-            final LinkedList<Future<RecordMetadata>> futures = new 
LinkedList<>();
-            for (final KeyValueTimestamp<String, String> record : toProduce) {
-                final Future<RecordMetadata> f = producer.send(
-                    new ProducerRecord<>(topic, null, record.timestamp(), 
record.key(), record.value(), null)
-                );
-                futures.add(f);
-            }
-
-            // TODO: test EOS
-            //noinspection ConstantConditions
-            if (false) {
-                producer.commitTransaction();
-            } else {
-                producer.flush();
-            }
-
-            for (final Future<RecordMetadata> future : futures) {
-                try {
-                    future.get();
-                } catch (final InterruptedException | ExecutionException e) {
-                    throw new RuntimeException(e);
-                }
-            }
-        }
+        IntegrationTestUtils.produceSynchronously(producerConfig, false, 
topic, toProduce);
     }
 
     private void verifyErrorShutdown(final KafkaStreams driver) throws 
InterruptedException {
@@ -563,69 +522,16 @@ private void verifyErrorShutdown(final KafkaStreams 
driver) throws InterruptedEx
         assertThat(driver.state(), is(KafkaStreams.State.ERROR));
     }
 
-    private void verifyOutput(final String topic, final 
List<KeyValueTimestamp<String, Long>> expected) {
-        final List<ConsumerRecord<String, Long>> results;
-        try {
-            final Properties properties = mkProperties(
-                mkMap(
-                    mkEntry(ConsumerConfig.GROUP_ID_CONFIG, "test-group"),
-                    mkEntry(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, 
CLUSTER.bootstrapServers()),
-                    mkEntry(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, 
((Deserializer<String>) STRING_DESERIALIZER).getClass().getName()),
-                    mkEntry(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 
((Deserializer<Long>) LONG_DESERIALIZER).getClass().getName())
-                )
-            );
-            results = 
IntegrationTestUtils.waitUntilMinRecordsReceived(properties, topic, 
expected.size());
-        } catch (final InterruptedException e) {
-            throw new RuntimeException(e);
-        }
-
-        if (results.size() != expected.size()) {
-            throw new AssertionError(printRecords(results) + " != " + 
expected);
-        }
-        final Iterator<KeyValueTimestamp<String, Long>> expectedIterator = 
expected.iterator();
-        for (final ConsumerRecord<String, Long> result : results) {
-            final KeyValueTimestamp<String, Long> expected1 = 
expectedIterator.next();
-            try {
-                compareKeyValueTimestamp(result, expected1.key(), 
expected1.value(), expected1.timestamp());
-            } catch (final AssertionError e) {
-                throw new AssertionError(printRecords(results) + " != " + 
expected, e);
-            }
-        }
-    }
-
-    private <K, V> void compareKeyValueTimestamp(final ConsumerRecord<K, V> 
record, final K expectedKey, final V expectedValue, final long 
expectedTimestamp) {
-        Objects.requireNonNull(record);
-        final K recordKey = record.key();
-        final V recordValue = record.value();
-        final long recordTimestamp = record.timestamp();
-        final AssertionError error = new AssertionError("Expected <" + 
expectedKey + ", " + expectedValue + "> with timestamp=" + expectedTimestamp +
-                                                            " but was <" + 
recordKey + ", " + recordValue + "> with timestamp=" + recordTimestamp);
-        if (recordKey != null) {
-            if (!recordKey.equals(expectedKey)) {
-                throw error;
-            }
-        } else if (expectedKey != null) {
-            throw error;
-        }
-        if (recordValue != null) {
-            if (!recordValue.equals(expectedValue)) {
-                throw error;
-            }
-        } else if (expectedValue != null) {
-            throw error;
-        }
-        if (recordTimestamp != expectedTimestamp) {
-            throw error;
-        }
-    }
+    private void verifyOutput(final String topic, final 
List<KeyValueTimestamp<String, Long>> keyValueTimestamps) {
+        final Properties properties = mkProperties(
+            mkMap(
+                mkEntry(ConsumerConfig.GROUP_ID_CONFIG, "test-group"),
+                mkEntry(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, 
CLUSTER.bootstrapServers()),
+                mkEntry(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, 
((Deserializer<String>) STRING_DESERIALIZER).getClass().getName()),
+                mkEntry(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 
((Deserializer<Long>) LONG_DESERIALIZER).getClass().getName())
+            )
+        );
+        IntegrationTestUtils.verifyKeyValueTimestamps(properties, topic, 
keyValueTimestamps);
 
-    private <K, V> String printRecords(final List<ConsumerRecord<K, V>> 
result) {
-        final StringBuilder resultStr = new StringBuilder();
-        resultStr.append("[\n");
-        for (final ConsumerRecord<?, ?> record : result) {
-            resultStr.append("  ").append(record.toString()).append("\n");
-        }
-        resultStr.append("]");
-        return resultStr.toString();
     }
 }
\ No newline at end of file
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
 
b/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
index 985b57f4cd5..8bca79f8d83 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
@@ -16,6 +16,9 @@
  */
 package org.apache.kafka.streams.integration.utils;
 
+import kafka.api.Request;
+import kafka.server.KafkaServer;
+import kafka.server.MetadataCache;
 import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
@@ -33,11 +36,14 @@
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.KafkaStreams;
 import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.KeyValueTimestamp;
+import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.processor.internals.StreamThread;
 import 
org.apache.kafka.streams.processor.internals.ThreadStateTransitionValidator;
 import org.apache.kafka.test.TestCondition;
 import org.apache.kafka.test.TestUtils;
+import scala.Option;
 
 import java.io.File;
 import java.io.IOException;
@@ -47,24 +53,23 @@
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Properties;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
 import java.util.stream.Collectors;
 
-import kafka.api.Request;
-import kafka.server.KafkaServer;
-import kafka.server.MetadataCache;
-import scala.Option;
-
 /**
  * Utility functions to make integration testing more convenient.
  */
 public class IntegrationTestUtils {
 
     public static final long DEFAULT_TIMEOUT = 30 * 1000L;
+    private static final long DEFAULT_COMMIT_INTERVAL = 100L;
     public static final String INTERNAL_LEAVE_GROUP_ON_CLOSE = 
"internal.leave.group.on.close";
 
     /*
@@ -112,6 +117,26 @@ public static void purgeLocalStreamsState(final Properties 
streamsConfiguration)
         }
     }
 
+    public static void cleanStateBeforeTest(final EmbeddedKafkaCluster 
cluster, final String... topics) {
+        try {
+            cluster.deleteAllTopicsAndWait(DEFAULT_TIMEOUT);
+            for (final String topic : topics) {
+                cluster.createTopic(topic, 1, 1);
+            }
+        } catch (final InterruptedException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    public static void cleanStateAfterTest(final EmbeddedKafkaCluster cluster, 
final KafkaStreams driver) {
+        driver.cleanUp();
+        try {
+            cluster.deleteAllTopicsAndWait(DEFAULT_TIMEOUT);
+        } catch (final InterruptedException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
     /**
      * @param topic          Kafka topic to write the data records to
      * @param records        Data records to write to Kafka
@@ -171,15 +196,6 @@ public static void purgeLocalStreamsState(final Properties 
streamsConfiguration)
         IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(topic, 
records, producerConfig, timestamp, false);
     }
 
-    public static <K, V> void produceKeyValuesSynchronouslyWithTimestamp(final 
String topic,
-                                                                         final 
Collection<KeyValue<K, V>> records,
-                                                                         final 
Properties producerConfig,
-                                                                         final 
Headers headers,
-                                                                         final 
Long timestamp)
-        throws ExecutionException, InterruptedException {
-        IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(topic, 
records, producerConfig, headers, timestamp, false);
-    }
-
     public static <K, V> void produceKeyValuesSynchronouslyWithTimestamp(final 
String topic,
                                                                          final 
Collection<KeyValue<K, V>> records,
                                                                          final 
Properties producerConfig,
@@ -212,7 +228,42 @@ public static void purgeLocalStreamsState(final Properties 
streamsConfiguration)
             producer.flush();
         }
     }
-    
+
+    public static <V, K> void produceSynchronously(final Properties 
producerConfig,
+                                                    final boolean eos,
+                                                    final String topic,
+                                                    final 
List<KeyValueTimestamp<K, V>> toProduce) {
+        try (final Producer<K, V> producer = new 
KafkaProducer<>(producerConfig)) {
+            // TODO: test EOS
+            //noinspection ConstantConditions
+            if (false) {
+                producer.initTransactions();
+                producer.beginTransaction();
+            }
+            final LinkedList<Future<RecordMetadata>> futures = new 
LinkedList<>();
+            for (final KeyValueTimestamp<K, V> record : toProduce) {
+                final Future<RecordMetadata> f = producer.send(
+                    new ProducerRecord<>(topic, null, record.timestamp(), 
record.key(), record.value(), null)
+                );
+                futures.add(f);
+            }
+
+            if (eos) {
+                producer.commitTransaction();
+            } else {
+                producer.flush();
+            }
+
+            for (final Future<RecordMetadata> future : futures) {
+                try {
+                    future.get();
+                } catch (final InterruptedException | ExecutionException e) {
+                    throw new RuntimeException(e);
+                }
+            }
+        }
+    }
+
     public static <K, V> void 
produceAbortedKeyValuesSynchronouslyWithTimestamp(final String topic,
                                                                                
 final Collection<KeyValue<K, V>> records,
                                                                                
 final Properties producerConfig,
@@ -227,7 +278,7 @@ public static void purgeLocalStreamsState(final Properties 
streamsConfiguration)
                 f.get();
                 producer.abortTransaction();
             }
-        }    
+        }
     }
 
     public static <V> void produceValuesSynchronously(final String topic,
@@ -297,7 +348,7 @@ public static void waitForCompletion(final KafkaStreams 
streams,
                                                                                
   final int expectedNumRecords) throws InterruptedException {
         return waitUntilMinKeyValueRecordsReceived(consumerConfig, topic, 
expectedNumRecords, DEFAULT_TIMEOUT);
     }
-    
+
     /**
      * Wait until enough data (key-value records) has been consumed.
      *
@@ -483,6 +534,70 @@ public static void waitUntilMetadataIsPropagated(final 
List<KafkaServer> servers
 
     }
 
+    public static void verifyKeyValueTimestamps(final Properties 
consumerConfig,
+                                                final String topic,
+                                                final 
List<KeyValueTimestamp<String, Long>> expected) {
+
+        final List<ConsumerRecord<String, Long>> results;
+        try {
+            results = 
IntegrationTestUtils.waitUntilMinRecordsReceived(consumerConfig, topic, 
expected.size());
+        } catch (final InterruptedException e) {
+            throw new RuntimeException(e);
+        }
+
+        if (results.size() != expected.size()) {
+            throw new AssertionError(printRecords(results) + " != " + 
expected);
+        }
+        final Iterator<KeyValueTimestamp<String, Long>> expectedIterator = 
expected.iterator();
+        for (final ConsumerRecord<String, Long> result : results) {
+            final KeyValueTimestamp<String, Long> expected1 = 
expectedIterator.next();
+            try {
+                compareKeyValueTimestamp(result, expected1.key(), 
expected1.value(), expected1.timestamp());
+            } catch (final AssertionError e) {
+                throw new AssertionError(printRecords(results) + " != " + 
expected, e);
+            }
+        }
+    }
+
+    private static <K, V> void compareKeyValueTimestamp(final 
ConsumerRecord<K, V> record,
+                                                        final K expectedKey,
+                                                        final V expectedValue,
+                                                        final long 
expectedTimestamp) {
+        Objects.requireNonNull(record);
+        final K recordKey = record.key();
+        final V recordValue = record.value();
+        final long recordTimestamp = record.timestamp();
+        final AssertionError error = new AssertionError("Expected <" + 
expectedKey + ", " + expectedValue + "> with timestamp=" + expectedTimestamp +
+                                                            " but was <" + 
recordKey + ", " + recordValue + "> with timestamp=" + recordTimestamp);
+        if (recordKey != null) {
+            if (!recordKey.equals(expectedKey)) {
+                throw error;
+            }
+        } else if (expectedKey != null) {
+            throw error;
+        }
+        if (recordValue != null) {
+            if (!recordValue.equals(expectedValue)) {
+                throw error;
+            }
+        } else if (expectedValue != null) {
+            throw error;
+        }
+        if (recordTimestamp != expectedTimestamp) {
+            throw error;
+        }
+    }
+
+    private static <K, V> String printRecords(final List<ConsumerRecord<K, V>> 
result) {
+        final StringBuilder resultStr = new StringBuilder();
+        resultStr.append("[\n");
+        for (final ConsumerRecord<?, ?> record : result) {
+            resultStr.append("  ").append(record.toString()).append("\n");
+        }
+        resultStr.append("]");
+        return resultStr.toString();
+    }
+
     /**
      * Returns up to `maxMessages` message-values from the topic.
      *
@@ -520,6 +635,15 @@ public static void waitUntilMetadataIsPropagated(final 
List<KafkaServer> servers
         return consumedValues;
     }
 
+    public static KafkaStreams getStartedStreams(final Properties 
streamsConfig, final StreamsBuilder builder, final boolean clean) {
+        final KafkaStreams driver = new KafkaStreams(builder.build(), 
streamsConfig);
+        if (clean) {
+            driver.cleanUp();
+        }
+        driver.start();
+        return driver;
+    }
+
     /**
      * Returns up to `maxMessages` message-values from the topic.
      *
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessorTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessorTest.java
index bb7f49ce7a5..002ace2ed28 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessorTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessorTest.java
@@ -29,7 +29,9 @@
 import org.apache.kafka.streams.kstream.internals.SessionWindow;
 import org.apache.kafka.streams.kstream.internals.TimeWindow;
 import org.apache.kafka.streams.processor.MockProcessorContext;
+import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.internals.ProcessorNode;
+import 
org.apache.kafka.streams.state.internals.InMemoryTimeOrderedKeyValueBuffer;
 import org.apache.kafka.test.MockInternalProcessorContext;
 import org.hamcrest.BaseMatcher;
 import org.hamcrest.Description;
@@ -60,13 +62,38 @@
 
     private static final Change<Long> ARBITRARY_CHANGE = new Change<>(7L, 14L);
 
+    private static class Harness<K, V> {
+        private final KTableSuppressProcessor<K, V> processor;
+        private final MockInternalProcessorContext context;
+
+
+        Harness(final Suppressed<K> suppressed,
+                final Serde<K> keySerde,
+                final Serde<V> valueSerde) {
+
+            final String storeName = "test-store";
+
+            final StateStore buffer = new 
InMemoryTimeOrderedKeyValueBuffer.Builder(storeName)
+                .withLoggingDisabled()
+                .build();
+            final KTableSuppressProcessor<K, V> processor =
+                new KTableSuppressProcessor<>(getImpl(suppressed), storeName, 
keySerde, new FullChangeSerde<>(valueSerde));
+
+            final MockInternalProcessorContext context = new 
MockInternalProcessorContext();
+            buffer.init(context, buffer);
+            processor.init(context);
+
+            this.processor = processor;
+            this.context = context;
+        }
+    }
+
     @Test
     public void zeroTimeLimitShouldImmediatelyEmit() {
-        final KTableSuppressProcessor<String, Long> processor =
-            new KTableSuppressProcessor<>(getImpl(untilTimeLimit(ZERO, 
unbounded())), String(), new FullChangeSerde<>(Long()));
-
-        final MockInternalProcessorContext context = new 
MockInternalProcessorContext();
-        processor.init(context);
+        final Harness<String, Long> harness =
+            new Harness<>(untilTimeLimit(ZERO, unbounded()), String(), Long());
+        final MockInternalProcessorContext context = harness.context;
+        final KTableSuppressProcessor<String, Long> processor = 
harness.processor;
 
         final long timestamp = ARBITRARY_LONG;
         context.setRecordMetadata("", 0, 0L, null, timestamp);
@@ -83,15 +110,10 @@ public void zeroTimeLimitShouldImmediatelyEmit() {
 
     @Test
     public void windowedZeroTimeLimitShouldImmediatelyEmit() {
-        final KTableSuppressProcessor<Windowed<String>, Long> processor =
-            new KTableSuppressProcessor<>(
-                getImpl(untilTimeLimit(ZERO, unbounded())),
-                timeWindowedSerdeFrom(String.class, 100L),
-                new FullChangeSerde<>(Long())
-            );
-
-        final MockInternalProcessorContext context = new 
MockInternalProcessorContext();
-        processor.init(context);
+        final Harness<Windowed<String>, Long> harness =
+            new Harness<>(untilTimeLimit(ZERO, unbounded()), 
timeWindowedSerdeFrom(String.class, 100L), Long());
+        final MockInternalProcessorContext context = harness.context;
+        final KTableSuppressProcessor<Windowed<String>, Long> processor = 
harness.processor;
 
         final long timestamp = ARBITRARY_LONG;
         context.setRecordMetadata("", 0, 0L, null, timestamp);
@@ -108,15 +130,10 @@ public void windowedZeroTimeLimitShouldImmediatelyEmit() {
 
     @Test
     public void intermediateSuppressionShouldBufferAndEmitLater() {
-        final KTableSuppressProcessor<String, Long> processor =
-            new KTableSuppressProcessor<>(
-                getImpl(untilTimeLimit(ofMillis(1), unbounded())),
-                String(),
-                new FullChangeSerde<>(Long())
-            );
-
-        final MockInternalProcessorContext context = new 
MockInternalProcessorContext();
-        processor.init(context);
+        final Harness<String, Long> harness =
+            new Harness<>(untilTimeLimit(ofMillis(1), unbounded()), String(), 
Long());
+        final MockInternalProcessorContext context = harness.context;
+        final KTableSuppressProcessor<String, Long> processor = 
harness.processor;
 
         final long timestamp = 0L;
         context.setRecordMetadata("topic", 0, 0, null, timestamp);
@@ -138,14 +155,10 @@ public void 
intermediateSuppressionShouldBufferAndEmitLater() {
 
     @Test
     public void finalResultsSuppressionShouldBufferAndEmitAtGraceExpiration() {
-        final KTableSuppressProcessor<Windowed<String>, Long> processor = new 
KTableSuppressProcessor<>(
-            finalResults(ofMillis(1L)),
-            timeWindowedSerdeFrom(String.class, 1L),
-            new FullChangeSerde<>(Long())
-        );
-
-        final MockInternalProcessorContext context = new 
MockInternalProcessorContext();
-        processor.init(context);
+        final Harness<Windowed<String>, Long> harness =
+            new Harness<>(finalResults(ofMillis(1L)), 
timeWindowedSerdeFrom(String.class, 1L), Long());
+        final MockInternalProcessorContext context = harness.context;
+        final KTableSuppressProcessor<Windowed<String>, Long> processor = 
harness.processor;
 
         final long windowStart = 99L;
         final long recordTime = 99L;
@@ -184,18 +197,14 @@ public void 
finalResultsSuppressionShouldBufferAndEmitAtGraceExpiration() {
     /**
      * Testing a special case of final results: that even with a grace period 
of 0,
      * it will still buffer events and emit only after the end of the window.
-     * As opposed to emitting immediately the way regular suppresion would 
with a time limit of 0.
+     * As opposed to emitting immediately the way regular suppression would 
with a time limit of 0.
      */
     @Test
     public void finalResultsWithZeroGraceShouldStillBufferUntilTheWindowEnd() {
-        final KTableSuppressProcessor<Windowed<String>, Long> processor = new 
KTableSuppressProcessor<>(
-            finalResults(ofMillis(0)),
-            timeWindowedSerdeFrom(String.class, 100L),
-            new FullChangeSerde<>(Long())
-        );
-
-        final MockInternalProcessorContext context = new 
MockInternalProcessorContext();
-        processor.init(context);
+        final Harness<Windowed<String>, Long> harness =
+            new Harness<>(finalResults(ofMillis(0L)), 
timeWindowedSerdeFrom(String.class, 100L), Long());
+        final MockInternalProcessorContext context = harness.context;
+        final KTableSuppressProcessor<Windowed<String>, Long> processor = 
harness.processor;
 
         // note the record is in the past, but the window end is in the 
future, so we still have to buffer,
         // even though the grace period is 0.
@@ -221,14 +230,10 @@ public void 
finalResultsWithZeroGraceShouldStillBufferUntilTheWindowEnd() {
 
     @Test
     public void finalResultsWithZeroGraceAtWindowEndShouldImmediatelyEmit() {
-        final KTableSuppressProcessor<Windowed<String>, Long> processor = new 
KTableSuppressProcessor<>(
-            finalResults(ofMillis(0)),
-            timeWindowedSerdeFrom(String.class, 100L),
-            new FullChangeSerde<>(Long())
-        );
-
-        final MockInternalProcessorContext context = new 
MockInternalProcessorContext();
-        processor.init(context);
+        final Harness<Windowed<String>, Long> harness =
+            new Harness<>(finalResults(ofMillis(0L)), 
timeWindowedSerdeFrom(String.class, 100L), Long());
+        final MockInternalProcessorContext context = harness.context;
+        final KTableSuppressProcessor<Windowed<String>, Long> processor = 
harness.processor;
 
         final long timestamp = 100L;
         context.setRecordMetadata("", 0, 0L, null, timestamp);
@@ -245,14 +250,10 @@ public void 
finalResultsWithZeroGraceAtWindowEndShouldImmediatelyEmit() {
 
     @Test
     public void finalResultsShouldSuppressTombstonesForTimeWindows() {
-        final KTableSuppressProcessor<Windowed<String>, Long> processor = new 
KTableSuppressProcessor<>(
-            finalResults(ofMillis(0)),
-            timeWindowedSerdeFrom(String.class, 100L),
-            new FullChangeSerde<>(Long())
-        );
-
-        final MockInternalProcessorContext context = new 
MockInternalProcessorContext();
-        processor.init(context);
+        final Harness<Windowed<String>, Long> harness =
+            new Harness<>(finalResults(ofMillis(0L)), 
timeWindowedSerdeFrom(String.class, 100L), Long());
+        final MockInternalProcessorContext context = harness.context;
+        final KTableSuppressProcessor<Windowed<String>, Long> processor = 
harness.processor;
 
         final long timestamp = 100L;
         context.setRecordMetadata("", 0, 0L, null, timestamp);
@@ -266,14 +267,10 @@ public void 
finalResultsShouldSuppressTombstonesForTimeWindows() {
 
     @Test
     public void finalResultsShouldSuppressTombstonesForSessionWindows() {
-        final KTableSuppressProcessor<Windowed<String>, Long> processor = new 
KTableSuppressProcessor<>(
-            finalResults(ofMillis(0)),
-            sessionWindowedSerdeFrom(String.class),
-            new FullChangeSerde<>(Long())
-        );
-
-        final MockInternalProcessorContext context = new 
MockInternalProcessorContext();
-        processor.init(context);
+        final Harness<Windowed<String>, Long> harness =
+            new Harness<>(finalResults(ofMillis(0L)), 
sessionWindowedSerdeFrom(String.class), Long());
+        final MockInternalProcessorContext context = harness.context;
+        final KTableSuppressProcessor<Windowed<String>, Long> processor = 
harness.processor;
 
         final long timestamp = 100L;
         context.setRecordMetadata("", 0, 0L, null, timestamp);
@@ -287,14 +284,10 @@ public void 
finalResultsShouldSuppressTombstonesForSessionWindows() {
 
     @Test
     public void suppressShouldNotSuppressTombstonesForTimeWindows() {
-        final KTableSuppressProcessor<Windowed<String>, Long> processor = new 
KTableSuppressProcessor<>(
-            getImpl(untilTimeLimit(ofMillis(0), maxRecords(0))),
-            timeWindowedSerdeFrom(String.class, 100L),
-            new FullChangeSerde<>(Long())
-        );
-
-        final MockInternalProcessorContext context = new 
MockInternalProcessorContext();
-        processor.init(context);
+        final Harness<Windowed<String>, Long> harness =
+            new Harness<>(untilTimeLimit(ofMillis(0), maxRecords(0)), 
timeWindowedSerdeFrom(String.class, 100L), Long());
+        final MockInternalProcessorContext context = harness.context;
+        final KTableSuppressProcessor<Windowed<String>, Long> processor = 
harness.processor;
 
         final long timestamp = 100L;
         context.setRecordMetadata("", 0, 0L, null, timestamp);
@@ -311,14 +304,10 @@ public void 
suppressShouldNotSuppressTombstonesForTimeWindows() {
 
     @Test
     public void suppressShouldNotSuppressTombstonesForSessionWindows() {
-        final KTableSuppressProcessor<Windowed<String>, Long> processor = new 
KTableSuppressProcessor<>(
-            getImpl(untilTimeLimit(ofMillis(0), maxRecords(0))),
-            sessionWindowedSerdeFrom(String.class),
-            new FullChangeSerde<>(Long())
-        );
-
-        final MockInternalProcessorContext context = new 
MockInternalProcessorContext();
-        processor.init(context);
+        final Harness<Windowed<String>, Long> harness =
+            new Harness<>(untilTimeLimit(ofMillis(0), maxRecords(0)), 
sessionWindowedSerdeFrom(String.class), Long());
+        final MockInternalProcessorContext context = harness.context;
+        final KTableSuppressProcessor<Windowed<String>, Long> processor = 
harness.processor;
 
         final long timestamp = 100L;
         context.setRecordMetadata("", 0, 0L, null, timestamp);
@@ -335,14 +324,10 @@ public void 
suppressShouldNotSuppressTombstonesForSessionWindows() {
 
     @Test
     public void suppressShouldNotSuppressTombstonesForKTable() {
-        final KTableSuppressProcessor<String, Long> processor = new 
KTableSuppressProcessor<>(
-            getImpl(untilTimeLimit(ofMillis(0), maxRecords(0))),
-            Serdes.String(),
-            new FullChangeSerde<>(Long())
-        );
-
-        final MockInternalProcessorContext context = new 
MockInternalProcessorContext();
-        processor.init(context);
+        final Harness<String, Long> harness =
+            new Harness<>(untilTimeLimit(ofMillis(0), maxRecords(0)), 
String(), Long());
+        final MockInternalProcessorContext context = harness.context;
+        final KTableSuppressProcessor<String, Long> processor = 
harness.processor;
 
         final long timestamp = 100L;
         context.setRecordMetadata("", 0, 0L, null, timestamp);
@@ -359,14 +344,10 @@ public void 
suppressShouldNotSuppressTombstonesForKTable() {
 
     @Test
     public void suppressShouldEmitWhenOverRecordCapacity() {
-        final KTableSuppressProcessor<String, Long> processor = new 
KTableSuppressProcessor<>(
-            getImpl(untilTimeLimit(Duration.ofDays(100), maxRecords(1))),
-            Serdes.String(),
-            new FullChangeSerde<>(Long())
-        );
-
-        final MockInternalProcessorContext context = new 
MockInternalProcessorContext();
-        processor.init(context);
+        final Harness<String, Long> harness =
+            new Harness<>(untilTimeLimit(Duration.ofDays(100), maxRecords(1)), 
String(), Long());
+        final MockInternalProcessorContext context = harness.context;
+        final KTableSuppressProcessor<String, Long> processor = 
harness.processor;
 
         final long timestamp = 100L;
         context.setStreamTime(timestamp);
@@ -386,14 +367,10 @@ public void suppressShouldEmitWhenOverRecordCapacity() {
 
     @Test
     public void suppressShouldEmitWhenOverByteCapacity() {
-        final KTableSuppressProcessor<String, Long> processor = new 
KTableSuppressProcessor<>(
-            getImpl(untilTimeLimit(Duration.ofDays(100), maxBytes(60L))),
-            Serdes.String(),
-            new FullChangeSerde<>(Long())
-        );
-
-        final MockInternalProcessorContext context = new 
MockInternalProcessorContext();
-        processor.init(context);
+        final Harness<String, Long> harness =
+            new Harness<>(untilTimeLimit(Duration.ofDays(100), maxBytes(60L)), 
String(), Long());
+        final MockInternalProcessorContext context = harness.context;
+        final KTableSuppressProcessor<String, Long> processor = 
harness.processor;
 
         final long timestamp = 100L;
         context.setStreamTime(timestamp);
@@ -413,14 +390,10 @@ public void suppressShouldEmitWhenOverByteCapacity() {
 
     @Test
     public void suppressShouldShutDownWhenOverRecordCapacity() {
-        final KTableSuppressProcessor<String, Long> processor = new 
KTableSuppressProcessor<>(
-            getImpl(untilTimeLimit(Duration.ofDays(100), 
maxRecords(1).shutDownWhenFull())),
-            Serdes.String(),
-            new FullChangeSerde<>(Long())
-        );
-
-        final MockInternalProcessorContext context = new 
MockInternalProcessorContext();
-        processor.init(context);
+        final Harness<String, Long> harness =
+            new Harness<>(untilTimeLimit(Duration.ofDays(100), 
maxRecords(1).shutDownWhenFull()), String(), Long());
+        final MockInternalProcessorContext context = harness.context;
+        final KTableSuppressProcessor<String, Long> processor = 
harness.processor;
 
         final long timestamp = 100L;
         context.setStreamTime(timestamp);
@@ -441,14 +414,10 @@ public void 
suppressShouldShutDownWhenOverRecordCapacity() {
 
     @Test
     public void suppressShouldShutDownWhenOverByteCapacity() {
-        final KTableSuppressProcessor<String, Long> processor = new 
KTableSuppressProcessor<>(
-            getImpl(untilTimeLimit(Duration.ofDays(100), 
maxBytes(60L).shutDownWhenFull())),
-            Serdes.String(),
-            new FullChangeSerde<>(Long())
-        );
-
-        final MockInternalProcessorContext context = new 
MockInternalProcessorContext();
-        processor.init(context);
+        final Harness<String, Long> harness =
+            new Harness<>(untilTimeLimit(Duration.ofDays(100), 
maxBytes(60L).shutDownWhenFull()), String(), Long());
+        final MockInternalProcessorContext context = harness.context;
+        final KTableSuppressProcessor<String, Long> processor = 
harness.processor;
 
         final long timestamp = 100L;
         context.setStreamTime(timestamp);


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> KIP-328: Add in-memory Suppression
> ----------------------------------
>
>                 Key: KAFKA-7223
>                 URL: https://issues.apache.org/jira/browse/KAFKA-7223
>             Project: Kafka
>          Issue Type: Improvement
>          Components: streams
>            Reporter: John Roesler
>            Assignee: John Roesler
>            Priority: Major
>
> As described in 
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-328%3A+Ability+to+suppress+updates+for+KTables.]
>  
> This ticket is to implement Suppress, but only for in-memory buffers.
> (depends on KAFKA-7222)



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
