This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch 2.3
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.3 by this push:
     new 3c1d85e  KAFKA-8199: Implement ValueGetter for Suppress (#6781)
3c1d85e is described below

commit 3c1d85e0de3d815b7d7081075279fc09749a0aaa
Author: John Roesler <vvcep...@users.noreply.github.com>
AuthorDate: Thu May 30 17:42:04 2019 -0500

    KAFKA-8199: Implement ValueGetter for Suppress (#6781)
    
    See also #6684
    
    KTable processors must be supplied with a KTableProcessorSupplier, which in
    turn requires implementing a ValueGetter, for use with joins and groupings.
    
    For suppression, a correct view only includes the previously emitted values
    (not the currently buffered ones), so this change also involves pushing the
    Change value type into the suppression buffer's interface, so that it can
    get the prior value upon first buffering (which is also the previously
    emitted value).
    
    Reviewers: A. Sophie Blee-Goldman <sop...@confluent.io>, Guozhang Wang <guozh...@confluent.io>
---
 .../streams/kstream/internals/FullChangeSerde.java |  19 +-
 .../streams/kstream/internals/KTableImpl.java      |  17 +-
 .../internals/KTableSourceValueGetterSupplier.java |   2 +-
 .../suppress/KTableSuppressProcessor.java          | 133 ---------
 .../suppress/KTableSuppressProcessorSupplier.java  | 214 +++++++++++++++
 .../kafka/streams/state/internals/BufferKey.java   |  72 +++++
 .../kafka/streams/state/internals/BufferValue.java |  98 +++++++
 .../streams/state/internals/ContextualRecord.java  |   6 +-
 .../InMemoryTimeOrderedKeyValueBuffer.java         | 235 ++++++++--------
 .../kafka/streams/state/internals/Maybe.java       |  89 ++++++
 .../state/internals/TimeOrderedKeyValueBuffer.java |  13 +-
 .../kstream/internals/SuppressScenarioTest.java    | 198 ++++++++++++++
 .../KTableSuppressProcessorMetricsTest.java        |  25 +-
 .../suppress/KTableSuppressProcessorTest.java      |  71 +++--
 .../kstream/internals/suppress/SuppressSuite.java  |  50 ++++
 .../kafka/streams/state/internals/MaybeTest.java   |  79 ++++++
 .../internals/TimeOrderedKeyValueBufferTest.java   | 300 +++++++++++++++++----
 17 files changed, 1265 insertions(+), 356 deletions(-)
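
The suppressed view described in the commit message can be pictured with a small,
self-contained sketch. This is not the Kafka Streams API (the Map fields below are
simplified stand-ins for the real parent getter and suppression buffer), but it
shows the fallback rule the new ValueGetter implements: a buffered key reads as
its previously emitted (prior) value, and an un-buffered key falls through to the
parent table.

    import java.util.HashMap;
    import java.util.Map;

    final class SuppressedViewSketch {
        // prior values for keys currently held back in the suppression buffer;
        // a buffered tombstone maps to null, so containsKey (not get) must decide
        private final Map<String, String> priorValueForBuffered = new HashMap<>();
        // stand-in for the parent KTable's current view
        private final Map<String, String> parentView = new HashMap<>();

        String get(final String key) {
            if (priorValueForBuffered.containsKey(key)) {
                // buffered: joins and groupings must see the previously emitted value
                return priorValueForBuffered.get(key);
            }
            // not buffered: the suppressed view equals the parent view
            return parentView.get(key);
        }
    }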

diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/FullChangeSerde.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/FullChangeSerde.java
index f06a428..30d55be 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/FullChangeSerde.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/FullChangeSerde.java
@@ -43,6 +43,10 @@ public final class FullChangeSerde<T> implements Serde<Change<T>> {
         this.inner = requireNonNull(inner);
     }
 
+    public Serde<T> innerSerde() {
+        return inner;
+    }
+
     @Override
     public void configure(final Map<String, ?> configs, final boolean isKey) {
         inner.configure(configs, isKey);
@@ -110,11 +114,7 @@ public final class FullChangeSerde<T> implements Serde<Change<T>> {
                 }
                 final ByteBuffer buffer = ByteBuffer.wrap(data);
 
-                final int oldSize = buffer.getInt();
-                final byte[] oldBytes = oldSize == -1 ? null : new byte[oldSize];
-                if (oldBytes != null) {
-                    buffer.get(oldBytes);
-                }
+                final byte[] oldBytes = extractOldValuePart(buffer);
                 final T oldValue = oldBytes == null ? null : innerDeserializer.deserialize(topic, oldBytes);
 
                 final int newSize = buffer.getInt();
@@ -132,4 +132,13 @@ public final class FullChangeSerde<T> implements Serde<Change<T>> {
             }
         };
     }
+
+    public static byte[] extractOldValuePart(final ByteBuffer buffer) {
+        final int oldSize = buffer.getInt();
+        final byte[] oldBytes = oldSize == -1 ? null : new byte[oldSize];
+        if (oldBytes != null) {
+            buffer.get(oldBytes);
+        }
+        return oldBytes;
+    }
 }
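
FullChangeSerde lays a Change<T> out as [int oldSize][old bytes][int newSize][new
bytes], with -1 marking null, and the new static extractOldValuePart reads just
the leading "old value" section. A standalone sketch of that layout using plain
java.nio (no Kafka classes; the byte values are made up):

    import java.nio.ByteBuffer;

    public class ChangeLayoutSketch {
        public static void main(final String[] args) {
            final byte[] oldValue = {1, 2};
            final byte[] newValue = {3};
            // [oldSize][oldBytes][newSize][newBytes]
            final ByteBuffer buffer = ByteBuffer.allocate(2 * Integer.BYTES + oldValue.length + newValue.length);
            buffer.putInt(oldValue.length).put(oldValue).putInt(newValue.length).put(newValue);
            buffer.flip();

            // the equivalent of extractOldValuePart: consume only the old-value prefix
            final int oldSize = buffer.getInt();
            final byte[] oldBytes = oldSize == -1 ? null : new byte[oldSize];
            if (oldBytes != null) {
                buffer.get(oldBytes);
            }
            // the buffer's position now sits at newSize, ready for the new value
            assert oldBytes != null && oldBytes.length == 2 && buffer.getInt() == 1;
        }
    }
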
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
index 3a4994f..666a109 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
@@ -38,7 +38,7 @@ import org.apache.kafka.streams.kstream.internals.graph.StatefulProcessorNode;
 import org.apache.kafka.streams.kstream.internals.graph.StreamsGraphNode;
 import org.apache.kafka.streams.kstream.internals.graph.TableProcessorNode;
 import org.apache.kafka.streams.kstream.internals.suppress.FinalResultsSuppressionBuilder;
-import org.apache.kafka.streams.kstream.internals.suppress.KTableSuppressProcessor;
+import org.apache.kafka.streams.kstream.internals.suppress.KTableSuppressProcessorSupplier;
 import org.apache.kafka.streams.kstream.internals.suppress.NamedSuppressed;
 import org.apache.kafka.streams.kstream.internals.suppress.SuppressedInternal;
 import org.apache.kafka.streams.processor.ProcessorSupplier;
@@ -391,7 +391,7 @@ public class KTableImpl<K, S, V> extends AbstractStream<K, V> implements KTable<
     public KTable<K, V> suppress(final Suppressed<? super K> suppressed) {
         final String name;
         if (suppressed instanceof NamedSuppressed) {
-            final String givenName = ((NamedSuppressed) suppressed).name();
+            final String givenName = ((NamedSuppressed<?>) suppressed).name();
             name = givenName != null ? givenName : builder.newProcessorName(SUPPRESS_NAME);
         } else {
             throw new IllegalArgumentException("Custom subclasses of Suppressed are not supported.");
@@ -402,14 +402,17 @@ public class KTableImpl<K, S, V> extends AbstractStream<K, V> implements KTable<
         final String storeName =
             suppressedInternal.name() != null ? suppressedInternal.name() + "-store" : builder.newStoreName(SUPPRESS_NAME);
 
-        final ProcessorSupplier<K, Change<V>> suppressionSupplier =
-            () -> new KTableSuppressProcessor<>(suppressedInternal, storeName);
+        final ProcessorSupplier<K, Change<V>> suppressionSupplier = new KTableSuppressProcessorSupplier<>(
+            suppressedInternal,
+            storeName,
+            this
+        );
 
 
         final ProcessorGraphNode<K, Change<V>> node = new StatefulProcessorNode<>(
             name,
             new ProcessorParameters<>(suppressionSupplier, name),
-            new InMemoryTimeOrderedKeyValueBuffer.Builder<>(storeName, keySerde, FullChangeSerde.castOrWrap(valSerde))
+            new InMemoryTimeOrderedKeyValueBuffer.Builder<>(storeName, keySerde, valSerde)
         );
 
         builder.addGraphNode(streamsGraphNode, node);
@@ -624,7 +627,7 @@ public class KTableImpl<K, S, V> extends AbstractStream<K, V> implements KTable<
     }
 
     @SuppressWarnings("unchecked")
-    KTableValueGetterSupplier<K, V> valueGetterSupplier() {
+    public KTableValueGetterSupplier<K, V> valueGetterSupplier() {
         if (processorSupplier instanceof KTableSource) {
             final KTableSource<K, V> source = (KTableSource<K, V>) processorSupplier;
             // whenever a source ktable is required for getter, it should be materialized
@@ -638,7 +641,7 @@ public class KTableImpl<K, S, V> extends AbstractStream<K, V> implements KTable<
     }
 
     @SuppressWarnings("unchecked")
-    void enableSendingOldValues() {
+    public void enableSendingOldValues() {
         if (!sendOldValues) {
             if (processorSupplier instanceof KTableSource) {
                 final KTableSource<K, ?> source = (KTableSource<K, V>) processorSupplier;
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableSourceValueGetterSupplier.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableSourceValueGetterSupplier.java
index 21731e9..7083b88 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableSourceValueGetterSupplier.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableSourceValueGetterSupplier.java
@@ -37,7 +37,7 @@ public class KTableSourceValueGetterSupplier<K, V> implements KTableValueGetterS
     }
 
     private class KTableSourceValueGetter implements KTableValueGetter<K, V> {
-        TimestampedKeyValueStore<K, V> store = null;
+        private TimestampedKeyValueStore<K, V> store = null;
 
         @SuppressWarnings("unchecked")
         public void init(final ProcessorContext context) {
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessor.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessor.java
deleted file mode 100644
index 7184d7a..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessor.java
+++ /dev/null
@@ -1,133 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.streams.kstream.internals.suppress;
-
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.common.metrics.Sensor;
-import org.apache.kafka.common.serialization.Serde;
-import org.apache.kafka.streams.errors.StreamsException;
-import org.apache.kafka.streams.kstream.internals.Change;
-import org.apache.kafka.streams.kstream.internals.FullChangeSerde;
-import org.apache.kafka.streams.kstream.internals.metrics.Sensors;
-import org.apache.kafka.streams.kstream.internals.suppress.TimeDefinitions.TimeDefinition;
-import org.apache.kafka.streams.processor.Processor;
-import org.apache.kafka.streams.processor.ProcessorContext;
-import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
-import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
-import org.apache.kafka.streams.state.internals.TimeOrderedKeyValueBuffer;
-
-import static java.util.Objects.requireNonNull;
-
-public class KTableSuppressProcessor<K, V> implements Processor<K, Change<V>> {
-    private final long maxRecords;
-    private final long maxBytes;
-    private final long suppressDurationMillis;
-    private final TimeDefinition<K> bufferTimeDefinition;
-    private final BufferFullStrategy bufferFullStrategy;
-    private final boolean safeToDropTombstones;
-    private final String storeName;
-
-    private TimeOrderedKeyValueBuffer<K, Change<V>> buffer;
-    private InternalProcessorContext internalProcessorContext;
-    private Sensor suppressionEmitSensor;
-    private long observedStreamTime = ConsumerRecord.NO_TIMESTAMP;
-
-    public KTableSuppressProcessor(final SuppressedInternal<K> suppress, final String storeName) {
-        this.storeName = storeName;
-        requireNonNull(suppress);
-        maxRecords = suppress.bufferConfig().maxRecords();
-        maxBytes = suppress.bufferConfig().maxBytes();
-        suppressDurationMillis = suppress.timeToWaitForMoreEvents().toMillis();
-        bufferTimeDefinition = suppress.timeDefinition();
-        bufferFullStrategy = suppress.bufferConfig().bufferFullStrategy();
-        safeToDropTombstones = suppress.safeToDropTombstones();
-    }
-
-    @SuppressWarnings("unchecked")
-    @Override
-    public void init(final ProcessorContext context) {
-        internalProcessorContext = (InternalProcessorContext) context;
-        suppressionEmitSensor = Sensors.suppressionEmitSensor(internalProcessorContext);
-
-        buffer = requireNonNull((TimeOrderedKeyValueBuffer<K, Change<V>>) context.getStateStore(storeName));
-        buffer.setSerdesIfNull((Serde<K>) context.keySerde(), FullChangeSerde.castOrWrap((Serde<V>) context.valueSerde()));
-    }
-
-    @Override
-    public void process(final K key, final Change<V> value) {
-        observedStreamTime = Math.max(observedStreamTime, internalProcessorContext.timestamp());
-        buffer(key, value);
-        enforceConstraints();
-    }
-
-    private void buffer(final K key, final Change<V> value) {
-        final long bufferTime = bufferTimeDefinition.time(internalProcessorContext, key);
-        buffer.put(bufferTime, key, value, internalProcessorContext.recordContext());
-    }
-
-    private void enforceConstraints() {
-        final long streamTime = observedStreamTime;
-        final long expiryTime = streamTime - suppressDurationMillis;
-
-        buffer.evictWhile(() -> buffer.minTimestamp() <= expiryTime, this::emit);
-
-        if (overCapacity()) {
-            switch (bufferFullStrategy) {
-                case EMIT:
-                    buffer.evictWhile(this::overCapacity, this::emit);
-                    return;
-                case SHUT_DOWN:
-                    throw new StreamsException(String.format(
-                        "%s buffer exceeded its max capacity. Currently 
[%d/%d] records and [%d/%d] bytes.",
-                        internalProcessorContext.currentNode().name(),
-                        buffer.numRecords(), maxRecords,
-                        buffer.bufferSize(), maxBytes
-                    ));
-                default:
-                    throw new UnsupportedOperationException(
-                        "The bufferFullStrategy [" + bufferFullStrategy +
-                            "] is not implemented. This is a bug in Kafka 
Streams."
-                    );
-            }
-        }
-    }
-
-    private boolean overCapacity() {
-        return buffer.numRecords() > maxRecords || buffer.bufferSize() > maxBytes;
-    }
-
-    private void emit(final TimeOrderedKeyValueBuffer.Eviction<K, Change<V>> toEmit) {
-        if (shouldForward(toEmit.value())) {
-            final ProcessorRecordContext prevRecordContext = internalProcessorContext.recordContext();
-            internalProcessorContext.setRecordContext(toEmit.recordContext());
-            try {
-                internalProcessorContext.forward(toEmit.key(), toEmit.value());
-                suppressionEmitSensor.record();
-            } finally {
-                internalProcessorContext.setRecordContext(prevRecordContext);
-            }
-        }
-    }
-
-    private boolean shouldForward(final Change<V> value) {
-        return value.newValue != null || !safeToDropTombstones;
-    }
-
-    @Override
-    public void close() {
-    }
-}
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessorSupplier.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessorSupplier.java
new file mode 100644
index 0000000..58e3831
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessorSupplier.java
@@ -0,0 +1,214 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream.internals.suppress;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.streams.errors.StreamsException;
+import org.apache.kafka.streams.kstream.internals.Change;
+import org.apache.kafka.streams.kstream.internals.KTableImpl;
+import org.apache.kafka.streams.kstream.internals.KTableProcessorSupplier;
+import org.apache.kafka.streams.kstream.internals.KTableValueGetter;
+import org.apache.kafka.streams.kstream.internals.KTableValueGetterSupplier;
+import org.apache.kafka.streams.kstream.internals.metrics.Sensors;
+import org.apache.kafka.streams.kstream.internals.suppress.TimeDefinitions.TimeDefinition;
+import org.apache.kafka.streams.processor.Processor;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
+import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
+import org.apache.kafka.streams.state.internals.Maybe;
+import org.apache.kafka.streams.state.internals.TimeOrderedKeyValueBuffer;
+
+import static java.util.Objects.requireNonNull;
+
+public class KTableSuppressProcessorSupplier<K, V> implements KTableProcessorSupplier<K, V, V> {
+    private final SuppressedInternal<K> suppress;
+    private final String storeName;
+    private final KTableImpl<K, ?, V> parentKTable;
+
+    public KTableSuppressProcessorSupplier(final SuppressedInternal<K> suppress,
+                                           final String storeName,
+                                           final KTableImpl<K, ?, V> parentKTable) {
+        this.suppress = suppress;
+        this.storeName = storeName;
+        this.parentKTable = parentKTable;
+        // The suppress buffer requires seeing the old values, to support the prior value view.
+        parentKTable.enableSendingOldValues();
+    }
+
+    @Override
+    public Processor<K, Change<V>> get() {
+        return new KTableSuppressProcessor<>(suppress, storeName);
+    }
+
+    @Override
+    public KTableValueGetterSupplier<K, V> view() {
+        final KTableValueGetterSupplier<K, V> parentValueGetterSupplier = parentKTable.valueGetterSupplier();
+        return new KTableValueGetterSupplier<K, V>() {
+
+            @Override
+            public KTableValueGetter<K, V> get() {
+                final KTableValueGetter<K, V> parentGetter = parentValueGetterSupplier.get();
+                return new KTableValueGetter<K, V>() {
+                    private TimeOrderedKeyValueBuffer<K, V> buffer;
+
+                    @SuppressWarnings("unchecked")
+                    @Override
+                    public void init(final ProcessorContext context) {
+                        parentGetter.init(context);
+                        // the main processor is responsible for the buffer's lifecycle
+                        buffer = requireNonNull((TimeOrderedKeyValueBuffer<K, V>) context.getStateStore(storeName));
+                    }
+
+                    @Override
+                    public ValueAndTimestamp<V> get(final K key) {
+                        final Maybe<ValueAndTimestamp<V>> maybeValue = buffer.priorValueForBuffered(key);
+                        if (maybeValue.isDefined()) {
+                            return maybeValue.getNullableValue();
+                        } else {
+                            // not buffered, so the suppressed view is equal to the parent view
+                            return parentGetter.get(key);
+                        }
+                    }
+
+                    @Override
+                    public void close() {
+                        parentGetter.close();
+                        // the main processor is responsible for the buffer's lifecycle
+                    }
+                };
+            }
+
+            @Override
+            public String[] storeNames() {
+                final String[] parentStores = parentValueGetterSupplier.storeNames();
+                final String[] stores = new String[1 + parentStores.length];
+                System.arraycopy(parentStores, 0, stores, 1, parentStores.length);
+                stores[0] = storeName;
+                return stores;
+            }
+        };
+    }
+
+    @Override
+    public void enableSendingOldValues() {
+        parentKTable.enableSendingOldValues();
+    }
+
+    private static final class KTableSuppressProcessor<K, V> implements Processor<K, Change<V>> {
+        private final long maxRecords;
+        private final long maxBytes;
+        private final long suppressDurationMillis;
+        private final TimeDefinition<K> bufferTimeDefinition;
+        private final BufferFullStrategy bufferFullStrategy;
+        private final boolean safeToDropTombstones;
+        private final String storeName;
+
+        private TimeOrderedKeyValueBuffer<K, V> buffer;
+        private InternalProcessorContext internalProcessorContext;
+        private Sensor suppressionEmitSensor;
+        private long observedStreamTime = ConsumerRecord.NO_TIMESTAMP;
+
+        private KTableSuppressProcessor(final SuppressedInternal<K> suppress, final String storeName) {
+            this.storeName = storeName;
+            requireNonNull(suppress);
+            maxRecords = suppress.bufferConfig().maxRecords();
+            maxBytes = suppress.bufferConfig().maxBytes();
+            suppressDurationMillis = suppress.timeToWaitForMoreEvents().toMillis();
+            bufferTimeDefinition = suppress.timeDefinition();
+            bufferFullStrategy = suppress.bufferConfig().bufferFullStrategy();
+            safeToDropTombstones = suppress.safeToDropTombstones();
+        }
+
+        @SuppressWarnings("unchecked")
+        @Override
+        public void init(final ProcessorContext context) {
+            internalProcessorContext = (InternalProcessorContext) context;
+            suppressionEmitSensor = Sensors.suppressionEmitSensor(internalProcessorContext);
+
+            buffer = requireNonNull((TimeOrderedKeyValueBuffer<K, V>) context.getStateStore(storeName));
+            buffer.setSerdesIfNull((Serde<K>) context.keySerde(), (Serde<V>) context.valueSerde());
+        }
+
+        @Override
+        public void process(final K key, final Change<V> value) {
+            observedStreamTime = Math.max(observedStreamTime, internalProcessorContext.timestamp());
+            buffer(key, value);
+            enforceConstraints();
+        }
+
+        private void buffer(final K key, final Change<V> value) {
+            final long bufferTime = bufferTimeDefinition.time(internalProcessorContext, key);
+
+            buffer.put(bufferTime, key, value, internalProcessorContext.recordContext());
+        }
+
+        private void enforceConstraints() {
+            final long streamTime = observedStreamTime;
+            final long expiryTime = streamTime - suppressDurationMillis;
+
+            buffer.evictWhile(() -> buffer.minTimestamp() <= expiryTime, this::emit);
+
+            if (overCapacity()) {
+                switch (bufferFullStrategy) {
+                    case EMIT:
+                        buffer.evictWhile(this::overCapacity, this::emit);
+                        return;
+                    case SHUT_DOWN:
+                        throw new StreamsException(String.format(
+                            "%s buffer exceeded its max capacity. Currently 
[%d/%d] records and [%d/%d] bytes.",
+                            internalProcessorContext.currentNode().name(),
+                            buffer.numRecords(), maxRecords,
+                            buffer.bufferSize(), maxBytes
+                        ));
+                    default:
+                        throw new UnsupportedOperationException(
+                            "The bufferFullStrategy [" + bufferFullStrategy +
+                                "] is not implemented. This is a bug in Kafka 
Streams."
+                        );
+                }
+            }
+        }
+
+        private boolean overCapacity() {
+            return buffer.numRecords() > maxRecords || buffer.bufferSize() > maxBytes;
+        }
+
+        private void emit(final TimeOrderedKeyValueBuffer.Eviction<K, V> toEmit) {
+            if (shouldForward(toEmit.value())) {
+                final ProcessorRecordContext prevRecordContext = internalProcessorContext.recordContext();
+                internalProcessorContext.setRecordContext(toEmit.recordContext());
+                try {
+                    internalProcessorContext.forward(toEmit.key(), toEmit.value());
+                    suppressionEmitSensor.record();
+                } finally {
+                    internalProcessorContext.setRecordContext(prevRecordContext);
+                }
+            }
+        }
+
+        private boolean shouldForward(final Change<V> value) {
+            return value.newValue != null || !safeToDropTombstones;
+        }
+
+        @Override
+        public void close() {
+        }
+    }
+}
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/BufferKey.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/BufferKey.java
new file mode 100644
index 0000000..9a13aa0
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/BufferKey.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.utils.Bytes;
+
+import java.util.Objects;
+
+public final class BufferKey implements Comparable<BufferKey> {
+    private final long time;
+    private final Bytes key;
+
+    BufferKey(final long time, final Bytes key) {
+        this.time = time;
+        this.key = key;
+    }
+
+    long time() {
+        return time;
+    }
+
+    Bytes key() {
+        return key;
+    }
+
+    @Override
+    public boolean equals(final Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        final BufferKey bufferKey = (BufferKey) o;
+        return time == bufferKey.time &&
+            Objects.equals(key, bufferKey.key);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(time, key);
+    }
+
+    @Override
+    public int compareTo(final BufferKey o) {
+        // ordering of keys within a time uses hashCode.
+        final int timeComparison = Long.compare(time, o.time);
+        return timeComparison == 0 ? key.compareTo(o.key) : timeComparison;
+    }
+
+    @Override
+    public String toString() {
+        return "BufferKey{" +
+            "key=" + key +
+            ", time=" + time +
+            '}';
+    }
+}
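
BufferKey's compareTo orders entries by buffer time first and breaks ties with
the byte-wise comparison of the key (Bytes.compareTo, despite what the inherited
comment says about hashCode), which is what keeps the buffer's TreeMap in
eviction order. A tiny standalone illustration of that time-major contract
(plain JDK types only; the data is made up):

    import java.util.Comparator;
    import java.util.TreeMap;

    public class BufferOrderSketch {
        public static void main(final String[] args) {
            // stand-in for BufferKey: element 0 is the time, element 1 mimics the key bytes
            final Comparator<long[]> timeThenKey = Comparator
                .<long[]>comparingLong(bufferKey -> bufferKey[0])
                .thenComparingLong(bufferKey -> bufferKey[1]);
            final TreeMap<long[], String> sortedMap = new TreeMap<>(timeThenKey);
            sortedMap.put(new long[] {2, 0}, "later time");
            sortedMap.put(new long[] {1, 9}, "min time, high key");
            sortedMap.put(new long[] {1, 1}, "min time, low key");
            // eviction always starts from the minimum buffer time
            assert sortedMap.firstEntry().getValue().equals("min time, low key");
        }
    }
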
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/BufferValue.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/BufferValue.java
new file mode 100644
index 0000000..816894e
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/BufferValue.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.Objects;
+
+public final class BufferValue {
+    private final ContextualRecord record;
+    private final byte[] priorValue;
+
+    BufferValue(final ContextualRecord record,
+                final byte[] priorValue) {
+        this.record = record;
+        this.priorValue = priorValue;
+    }
+
+    ContextualRecord record() {
+        return record;
+    }
+
+    byte[] priorValue() {
+        return priorValue;
+    }
+
+    static BufferValue deserialize(final ByteBuffer buffer) {
+        final ContextualRecord record = ContextualRecord.deserialize(buffer);
+
+        final int priorValueLength = buffer.getInt();
+        if (priorValueLength == -1) {
+            return new BufferValue(record, null);
+        } else {
+            final byte[] priorValue = new byte[priorValueLength];
+            buffer.get(priorValue);
+            return new BufferValue(record, priorValue);
+        }
+    }
+
+    ByteBuffer serialize(final int endPadding) {
+
+        final int sizeOfPriorValueLength = Integer.BYTES;
+        final int sizeOfPriorValue = priorValue == null ? 0 : priorValue.length;
+
+        final ByteBuffer buffer = record.serialize(sizeOfPriorValueLength + sizeOfPriorValue + endPadding);
+
+        if (priorValue == null) {
+            buffer.putInt(-1);
+        } else {
+            buffer.putInt(priorValue.length);
+            buffer.put(priorValue);
+        }
+
+        return buffer;
+    }
+
+    long sizeBytes() {
+        return (priorValue == null ? 0 : priorValue.length) + record.sizeBytes();
+    }
+
+    @Override
+    public boolean equals(final Object o) {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+        final BufferValue that = (BufferValue) o;
+        return Objects.equals(record, that.record) &&
+            Arrays.equals(priorValue, that.priorValue);
+    }
+
+    @Override
+    public int hashCode() {
+        int result = Objects.hash(record);
+        result = 31 * result + Arrays.hashCode(priorValue);
+        return result;
+    }
+
+    @Override
+    public String toString() {
+        return "BufferValue{" +
+            "record=" + record +
+            ", priorValue=" + Arrays.toString(priorValue) +
+            '}';
+    }
+}
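
BufferValue.serialize takes an "endPadding" hint: it asks ContextualRecord to
over-allocate the ByteBuffer by that many trailing bytes, so a caller can append
its own suffix without re-copying (logValue in the buffer uses exactly this to
append the 8-byte buffer time). A standalone sketch of the pattern with plain
java.nio (no Kafka classes; the payload is made up):

    import java.nio.ByteBuffer;

    public class EndPaddingSketch {
        // stand-in for BufferValue.serialize(endPadding): write a length-prefixed
        // payload, leaving endPadding free bytes at the tail for the caller
        static ByteBuffer serialize(final byte[] payload, final int endPadding) {
            final ByteBuffer buffer = ByteBuffer.allocate(Integer.BYTES + payload.length + endPadding);
            buffer.putInt(payload.length);
            buffer.put(payload);
            return buffer; // position is left at the start of the padding
        }

        public static void main(final String[] args) {
            final ByteBuffer buffer = serialize(new byte[] {42}, Long.BYTES);
            buffer.putLong(123L); // the caller appends its suffix, e.g. the buffer time
            assert !buffer.hasRemaining();
        }
    }
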
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/ContextualRecord.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/ContextualRecord.java
index 3893b35..3cd2c37 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/ContextualRecord.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/ContextualRecord.java
@@ -43,13 +43,13 @@ public class ContextualRecord {
         return (value == null ? 0 : value.length) + recordContext.sizeBytes();
     }
 
-    byte[] serialize() {
+    ByteBuffer serialize(final int endPadding) {
         final byte[] serializedContext = recordContext.serialize();
 
         final int sizeOfContext = serializedContext.length;
         final int sizeOfValueLength = Integer.BYTES;
         final int sizeOfValue = value == null ? 0 : value.length;
-        final ByteBuffer buffer = ByteBuffer.allocate(sizeOfContext + sizeOfValueLength + sizeOfValue);
+        final ByteBuffer buffer = ByteBuffer.allocate(sizeOfContext + sizeOfValueLength + sizeOfValue + endPadding);
 
         buffer.put(serializedContext);
         if (value == null) {
@@ -59,7 +59,7 @@ public class ContextualRecord {
             buffer.put(value);
         }
 
-        return buffer.array();
+        return buffer;
     }
 
     static ContextualRecord deserialize(final ByteBuffer buffer) {
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryTimeOrderedKeyValueBuffer.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryTimeOrderedKeyValueBuffer.java
index e11df7c..6c5022f 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryTimeOrderedKeyValueBuffer.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryTimeOrderedKeyValueBuffer.java
@@ -25,6 +25,8 @@ import org.apache.kafka.common.serialization.ByteArraySerializer;
 import org.apache.kafka.common.serialization.BytesSerializer;
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.kstream.internals.Change;
+import org.apache.kafka.streams.kstream.internals.FullChangeSerde;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
@@ -32,7 +34,9 @@ import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
 import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
 import org.apache.kafka.streams.processor.internals.RecordBatchingStateRestoreCallback;
 import org.apache.kafka.streams.processor.internals.RecordCollector;
+import org.apache.kafka.streams.processor.internals.RecordQueue;
 import org.apache.kafka.streams.state.StoreBuilder;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
 import org.apache.kafka.streams.state.internals.metrics.Sensors;
 
 import java.nio.ByteBuffer;
@@ -42,7 +46,7 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.Map;
-import java.util.Objects;
+import java.util.NoSuchElementException;
 import java.util.Set;
 import java.util.TreeMap;
 import java.util.function.Consumer;
@@ -55,16 +59,18 @@ public final class InMemoryTimeOrderedKeyValueBuffer<K, V> implements TimeOrdere
     private static final ByteArraySerializer VALUE_SERIALIZER = new ByteArraySerializer();
     private static final RecordHeaders V_1_CHANGELOG_HEADERS =
         new RecordHeaders(new Header[] {new RecordHeader("v", new byte[] {(byte) 1})});
+    private static final RecordHeaders V_2_CHANGELOG_HEADERS =
+        new RecordHeaders(new Header[] {new RecordHeader("v", new byte[] {(byte) 2})});
 
     private final Map<Bytes, BufferKey> index = new HashMap<>();
-    private final TreeMap<BufferKey, ContextualRecord> sortedMap = new TreeMap<>();
+    private final TreeMap<BufferKey, BufferValue> sortedMap = new TreeMap<>();
 
     private final Set<Bytes> dirtyKeys = new HashSet<>();
     private final String storeName;
     private final boolean loggingEnabled;
 
     private Serde<K> keySerde;
-    private Serde<V> valueSerde;
+    private FullChangeSerde<V> valueSerde;
 
     private long memBufferSize = 0L;
     private long minTimestamp = Long.MAX_VALUE;
@@ -77,7 +83,7 @@ public final class InMemoryTimeOrderedKeyValueBuffer<K, V> implements TimeOrdere
 
     private int partition;
 
-    public static class Builder<K, V> implements StoreBuilder<StateStore> {
+    public static class Builder<K, V> implements StoreBuilder<InMemoryTimeOrderedKeyValueBuffer<K, V>> {
 
         private final String storeName;
         private final Serde<K> keySerde;
@@ -94,11 +100,11 @@ public final class InMemoryTimeOrderedKeyValueBuffer<K, V> implements TimeOrdere
          * As of 2.1, there's no way for users to directly interact with the buffer,
          * so this method is implemented solely to be called by Streams (which
          * it will do based on the {@code cache.max.bytes.buffering} config.
-         *
+         * <p>
          * It's currently a no-op.
          */
         @Override
-        public StoreBuilder<StateStore> withCachingEnabled() {
+        public StoreBuilder<InMemoryTimeOrderedKeyValueBuffer<K, V>> withCachingEnabled() {
             return this;
         }
 
@@ -106,21 +112,21 @@ public final class InMemoryTimeOrderedKeyValueBuffer<K, V> implements TimeOrdere
          * As of 2.1, there's no way for users to directly interact with the buffer,
          * so this method is implemented solely to be called by Streams (which
          * it will do based on the {@code cache.max.bytes.buffering} config.
-         *
+         * <p>
          * It's currently a no-op.
          */
         @Override
-        public StoreBuilder<StateStore> withCachingDisabled() {
+        public StoreBuilder<InMemoryTimeOrderedKeyValueBuffer<K, V>> withCachingDisabled() {
             return this;
         }
 
         @Override
-        public StoreBuilder<StateStore> withLoggingEnabled(final Map<String, String> config) {
+        public StoreBuilder<InMemoryTimeOrderedKeyValueBuffer<K, V>> withLoggingEnabled(final Map<String, String> config) {
             throw new UnsupportedOperationException();
         }
 
         @Override
-        public StoreBuilder<StateStore> withLoggingDisabled() {
+        public StoreBuilder<InMemoryTimeOrderedKeyValueBuffer<K, V>> withLoggingDisabled() {
             loggingEnabled = false;
             return this;
         }
@@ -146,49 +152,6 @@ public final class InMemoryTimeOrderedKeyValueBuffer<K, V> implements TimeOrdere
         }
     }
 
-    private static final class BufferKey implements Comparable<BufferKey> {
-        private final long time;
-        private final Bytes key;
-
-        private BufferKey(final long time, final Bytes key) {
-            this.time = time;
-            this.key = key;
-        }
-
-        @Override
-        public boolean equals(final Object o) {
-            if (this == o) {
-                return true;
-            }
-            if (o == null || getClass() != o.getClass()) {
-                return false;
-            }
-            final BufferKey bufferKey = (BufferKey) o;
-            return time == bufferKey.time &&
-                Objects.equals(key, bufferKey.key);
-        }
-
-        @Override
-        public int hashCode() {
-            return Objects.hash(time, key);
-        }
-
-        @Override
-        public int compareTo(final BufferKey o) {
-            // ordering of keys within a time uses hashCode.
-            final int timeComparison = Long.compare(time, o.time);
-            return timeComparison == 0 ? key.compareTo(o.key) : timeComparison;
-        }
-
-        @Override
-        public String toString() {
-            return "BufferKey{" +
-                "key=" + key +
-                ", time=" + time +
-                '}';
-        }
-    }
-
     private InMemoryTimeOrderedKeyValueBuffer(final String storeName,
                                               final boolean loggingEnabled,
                                               final Serde<K> keySerde,
@@ -196,7 +159,7 @@ public final class InMemoryTimeOrderedKeyValueBuffer<K, V> implements TimeOrdere
         this.storeName = storeName;
         this.loggingEnabled = loggingEnabled;
         this.keySerde = keySerde;
-        this.valueSerde = valueSerde;
+        this.valueSerde = FullChangeSerde.castOrWrap(valueSerde);
     }
 
     @Override
@@ -213,7 +176,7 @@ public final class InMemoryTimeOrderedKeyValueBuffer<K, V> implements TimeOrdere
     @Override
     public void setSerdesIfNull(final Serde<K> keySerde, final Serde<V> valueSerde) {
         this.keySerde = this.keySerde == null ? keySerde : this.keySerde;
-        this.valueSerde = this.valueSerde == null ? valueSerde : this.valueSerde;
+        this.valueSerde = this.valueSerde == null ? FullChangeSerde.castOrWrap(valueSerde) : this.valueSerde;
     }
 
     @Override
@@ -261,7 +224,7 @@ public final class InMemoryTimeOrderedKeyValueBuffer<K, V> implements TimeOrdere
                     // The record was evicted from the buffer. Send a tombstone.
                     logTombstone(key);
                 } else {
-                    final ContextualRecord value = sortedMap.get(bufferKey);
+                    final BufferValue value = sortedMap.get(bufferKey);
 
                     logValue(key, bufferKey, value);
                 }
@@ -270,22 +233,17 @@ public final class InMemoryTimeOrderedKeyValueBuffer<K, V> implements TimeOrdere
         }
     }
 
-    private void logValue(final Bytes key, final BufferKey bufferKey, final ContextualRecord value) {
-        final byte[] serializedContextualRecord = value.serialize();
+    private void logValue(final Bytes key, final BufferKey bufferKey, final BufferValue value) {
 
         final int sizeOfBufferTime = Long.BYTES;
-        final int sizeOfContextualRecord = serializedContextualRecord.length;
-
-        final byte[] timeAndContextualRecord = ByteBuffer.wrap(new byte[sizeOfBufferTime + sizeOfContextualRecord])
-                                                         .putLong(bufferKey.time)
-                                                         .put(serializedContextualRecord)
-                                                         .array();
+        final ByteBuffer buffer = value.serialize(sizeOfBufferTime);
+        buffer.putLong(bufferKey.time());
 
         collector.send(
             changelogTopic,
             key,
-            timeAndContextualRecord,
-            V_1_CHANGELOG_HEADERS,
+            buffer.array(),
+            V_2_CHANGELOG_HEADERS,
             partition,
             null,
             KEY_SERIALIZER,
@@ -312,12 +270,12 @@ public final class InMemoryTimeOrderedKeyValueBuffer<K, V> implements TimeOrdere
                 // This was a tombstone. Delete the record.
                 final BufferKey bufferKey = index.remove(key);
                 if (bufferKey != null) {
-                    final ContextualRecord removed = sortedMap.remove(bufferKey);
+                    final BufferValue removed = sortedMap.remove(bufferKey);
                     if (removed != null) {
-                        memBufferSize -= computeRecordSize(bufferKey.key, removed);
+                        memBufferSize -= computeRecordSize(bufferKey.key(), removed);
                     }
-                    if (bufferKey.time == minTimestamp) {
-                        minTimestamp = sortedMap.isEmpty() ? Long.MAX_VALUE : sortedMap.firstKey().time;
+                    if (bufferKey.time() == minTimestamp) {
+                        minTimestamp = sortedMap.isEmpty() ? Long.MAX_VALUE : sortedMap.firstKey().time();
                     }
                 }
 
@@ -331,33 +289,46 @@ public final class InMemoryTimeOrderedKeyValueBuffer<K, V> implements TimeOrdere
                     );
                 }
             } else {
-                final ByteBuffer timeAndValue = ByteBuffer.wrap(record.value());
-                final long time = timeAndValue.getLong();
-                final byte[] value = new byte[record.value().length - 8];
-                timeAndValue.get(value);
                 if (record.headers().lastHeader("v") == null) {
+                    // in this case, the changelog value is just the serialized record value
+                    final ByteBuffer timeAndValue = ByteBuffer.wrap(record.value());
+                    final long time = timeAndValue.getLong();
+                    final byte[] changelogValue = new byte[record.value().length - 8];
+                    timeAndValue.get(changelogValue);
+
                     cleanPut(
                         time,
                         key,
-                        new ContextualRecord(
-                            value,
-                            new ProcessorRecordContext(
-                                record.timestamp(),
-                                record.offset(),
-                                record.partition(),
-                                record.topic(),
-                                record.headers()
-                            )
+                        new BufferValue(
+                            new ContextualRecord(
+                                changelogValue,
+                                new ProcessorRecordContext(
+                                    record.timestamp(),
+                                    record.offset(),
+                                    record.partition(),
+                                    record.topic(),
+                                    record.headers()
+                                )
+                            ),
+                            inferPriorValue(key, changelogValue)
                         )
                     );
                } else if (V_1_CHANGELOG_HEADERS.lastHeader("v").equals(record.headers().lastHeader("v"))) {
-                    final ContextualRecord contextualRecord = ContextualRecord.deserialize(ByteBuffer.wrap(value));
-
-                    cleanPut(
-                        time,
-                        key,
-                        contextualRecord
-                    );
+                    // in this case, the changelog value is a serialized ContextualRecord
+                    final ByteBuffer timeAndValue = ByteBuffer.wrap(record.value());
+                    final long time = timeAndValue.getLong();
+                    final byte[] changelogValue = new byte[record.value().length - 8];
+                    timeAndValue.get(changelogValue);
+
+                    final ContextualRecord contextualRecord = ContextualRecord.deserialize(ByteBuffer.wrap(changelogValue));
+                    cleanPut(time, key, new BufferValue(contextualRecord, inferPriorValue(key, contextualRecord.value())));
+                } else if (V_2_CHANGELOG_HEADERS.lastHeader("v").equals(record.headers().lastHeader("v"))) {
+                    // in this case, the changelog value is a serialized BufferValue
+
+                    final ByteBuffer valueAndTime = ByteBuffer.wrap(record.value());
+                    final BufferValue bufferValue = BufferValue.deserialize(valueAndTime);
+                    final long time = valueAndTime.getLong();
+                    cleanPut(time, key, bufferValue);
                 } else {
                     throw new IllegalArgumentException("Restoring apparently invalid changelog record: " + record);
                 }
@@ -375,42 +346,50 @@ public final class InMemoryTimeOrderedKeyValueBuffer<K, V> implements TimeOrdere
         updateBufferMetrics();
     }
 
+    private byte[] inferPriorValue(final Bytes key, final byte[] serializedChange) {
+        return index.containsKey(key)
+            ? internalPriorValueForBuffered(key)
+            : FullChangeSerde.extractOldValuePart(ByteBuffer.wrap(serializedChange));
+    }
+
 
     @Override
     public void evictWhile(final Supplier<Boolean> predicate,
                            final Consumer<Eviction<K, V>> callback) {
-        final Iterator<Map.Entry<BufferKey, ContextualRecord>> delegate = sortedMap.entrySet().iterator();
+        final Iterator<Map.Entry<BufferKey, BufferValue>> delegate = sortedMap.entrySet().iterator();
         int evictions = 0;
 
         if (predicate.get()) {
-            Map.Entry<BufferKey, ContextualRecord> next = null;
+            Map.Entry<BufferKey, BufferValue> next = null;
             if (delegate.hasNext()) {
                 next = delegate.next();
             }
 
             // predicate being true means we read one record, call the callback, and then remove it
             while (next != null && predicate.get()) {
-                if (next.getKey().time != minTimestamp) {
+                if (next.getKey().time() != minTimestamp) {
                     throw new IllegalStateException(
                         "minTimestamp [" + minTimestamp + "] did not match the 
actual min timestamp [" +
-                            next.getKey().time + "]"
+                            next.getKey().time() + "]"
                     );
                 }
-                final K key = keySerde.deserializer().deserialize(changelogTopic, next.getKey().key.get());
-                final V value = valueSerde.deserializer().deserialize(changelogTopic, next.getValue().value());
-                callback.accept(new Eviction<>(key, value, next.getValue().recordContext()));
+                final K key = keySerde.deserializer().deserialize(changelogTopic, next.getKey().key().get());
+                final BufferValue bufferValue = next.getValue();
+                final ContextualRecord record = bufferValue.record();
+                final Change<V> value = valueSerde.deserializer().deserialize(changelogTopic, record.value());
+                callback.accept(new Eviction<>(key, value, record.recordContext()));
 
                 delegate.remove();
-                index.remove(next.getKey().key);
+                index.remove(next.getKey().key());
 
-                dirtyKeys.add(next.getKey().key);
+                dirtyKeys.add(next.getKey().key());
 
-                memBufferSize -= computeRecordSize(next.getKey().key, next.getValue());
+                memBufferSize -= computeRecordSize(next.getKey().key(), bufferValue);
 
                 // peek at the next record so we can update the minTimestamp
                 if (delegate.hasNext()) {
                     next = delegate.next();
-                    minTimestamp = next == null ? Long.MAX_VALUE : next.getKey().time;
+                    minTimestamp = next == null ? Long.MAX_VALUE : next.getKey().time();
                 } else {
                     next = null;
                     minTimestamp = Long.MAX_VALUE;
@@ -425,9 +404,39 @@ public final class InMemoryTimeOrderedKeyValueBuffer<K, V> implements TimeOrdere
     }
 
     @Override
+    public Maybe<ValueAndTimestamp<V>> priorValueForBuffered(final K key) {
+        final Bytes serializedKey = Bytes.wrap(keySerde.serializer().serialize(changelogTopic, key));
+        if (index.containsKey(serializedKey)) {
+            final byte[] serializedValue = internalPriorValueForBuffered(serializedKey);
+
+            final V deserializedValue = valueSerde.innerSerde().deserializer().deserialize(
+                changelogTopic,
+                serializedValue
+            );
+
+            // it's unfortunately not possible to know this, unless we materialize the suppressed result, since our only
+            // knowledge of the prior value is what the upstream processor sends us as the "old value" when we first
+            // buffer something.
+            return Maybe.defined(ValueAndTimestamp.make(deserializedValue, RecordQueue.UNKNOWN));
+        } else {
+            return Maybe.undefined();
+        }
+    }
+
+    private byte[] internalPriorValueForBuffered(final Bytes key) {
+        final BufferKey bufferKey = index.get(key);
+        if (bufferKey == null) {
+            throw new NoSuchElementException("Key [" + key + "] is not in the buffer.");
+        } else {
+            final BufferValue bufferValue = sortedMap.get(bufferKey);
+            return bufferValue.priorValue();
+        }
+    }
+
+    @Override
     public void put(final long time,
                     final K key,
-                    final V value,
+                    final Change<V> value,
                     final ProcessorRecordContext recordContext) {
         requireNonNull(value, "value cannot be null");
         requireNonNull(recordContext, "recordContext cannot be null");
@@ -435,12 +444,26 @@ public final class InMemoryTimeOrderedKeyValueBuffer<K, V> implements TimeOrdere
         final Bytes serializedKey = Bytes.wrap(keySerde.serializer().serialize(changelogTopic, key));
         final byte[] serializedValue = valueSerde.serializer().serialize(changelogTopic, value);
 
-        cleanPut(time, serializedKey, new ContextualRecord(serializedValue, recordContext));
+        final BufferValue buffered = getBuffered(serializedKey);
+        final byte[] serializedPriorValue;
+        if (buffered == null) {
+            final V priorValue = value.oldValue;
+            serializedPriorValue = valueSerde.innerSerde().serializer().serialize(changelogTopic, priorValue);
+        } else {
+            serializedPriorValue = buffered.priorValue();
+        }
+
+        cleanPut(time, serializedKey, new BufferValue(new ContextualRecord(serializedValue, recordContext), serializedPriorValue));
         dirtyKeys.add(serializedKey);
         updateBufferMetrics();
     }
 
-    private void cleanPut(final long time, final Bytes key, final ContextualRecord value) {
+    private BufferValue getBuffered(final Bytes key) {
+        final BufferKey bufferKey = index.get(key);
+        return bufferKey == null ? null : sortedMap.get(bufferKey);
+    }
+
+    private void cleanPut(final long time, final Bytes key, final BufferValue value) {
         // non-resetting semantics:
         // if there was a previous version of the same record,
         // then insert the new record in the same place in the priority queue
@@ -453,7 +476,7 @@ public final class InMemoryTimeOrderedKeyValueBuffer<K, V> implements TimeOrdere
             minTimestamp = Math.min(minTimestamp, time);
             memBufferSize += computeRecordSize(key, value);
         } else {
-            final ContextualRecord removedValue = sortedMap.put(previousKey, value);
+            final BufferValue removedValue = sortedMap.put(previousKey, value);
             memBufferSize =
                 memBufferSize
                     + computeRecordSize(key, value)
@@ -476,7 +499,7 @@ public final class InMemoryTimeOrderedKeyValueBuffer<K, V> implements TimeOrdere
         return minTimestamp;
     }
 
-    private static long computeRecordSize(final Bytes key, final ContextualRecord value) {
+    private static long computeRecordSize(final Bytes key, final BufferValue value) {
         long size = 0L;
         size += 8; // buffer time
         size += key.get().length;
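
The restore path above now distinguishes three changelog formats by the "v"
record header: no header is the original [8-byte time][serialized change], v=1
is [8-byte time][serialized ContextualRecord], and the new v=2 written by
logValue is [serialized BufferValue][8-byte time], i.e. the time moves to the
tail so BufferValue.deserialize can consume the buffer from the front. A
standalone round trip of the v=2 ordering in plain java.nio (stub bytes stand
in for a real serialized BufferValue):

    import java.nio.ByteBuffer;

    public class V2ChangelogLayoutSketch {
        public static void main(final String[] args) {
            final byte[] serializedBufferValue = {7, 7, 7}; // stand-in payload
            // write: the value first (v2 leaves Long.BYTES of end padding), then the time
            final ByteBuffer write = ByteBuffer.allocate(serializedBufferValue.length + Long.BYTES);
            write.put(serializedBufferValue).putLong(42L);
            // read: consume the value from the front, then the trailing buffer time
            final ByteBuffer read = ByteBuffer.wrap(write.array());
            final byte[] restored = new byte[serializedBufferValue.length];
            read.get(restored);
            final long bufferTime = read.getLong();
            assert bufferTime == 42L && !read.hasRemaining();
        }
    }
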
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/Maybe.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/Maybe.java
new file mode 100644
index 0000000..c292c17
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/Maybe.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import java.util.NoSuchElementException;
+import java.util.Objects;
+
+/**
+ * A container that may be empty, may contain null, or may contain a value.
+ * Distinct from {@link java.util.Optional<T>}, since Optional cannot contain null.
+ *
+ * @param <T>
+ */
+public final class Maybe<T> {
+    private final T nullableValue;
+    private final boolean defined;
+
+    public static <T> Maybe<T> defined(final T nullableValue) {
+        return new Maybe<>(nullableValue);
+    }
+
+    public static <T> Maybe<T> undefined() {
+        return new Maybe<>();
+    }
+
+    private Maybe(final T nullableValue) {
+        this.nullableValue = nullableValue;
+        defined = true;
+    }
+
+    private Maybe() {
+        nullableValue = null;
+        defined = false;
+    }
+
+    public T getNullableValue() {
+        if (defined) {
+            return nullableValue;
+        } else {
+            throw new NoSuchElementException();
+        }
+    }
+
+    public boolean isDefined() {
+        return defined;
+    }
+
+    @Override
+    public boolean equals(final Object o) {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+        final Maybe<?> maybe = (Maybe<?>) o;
+
+        // All undefined maybes are equal
+        // All defined null maybes are equal
+        return defined == maybe.defined &&
+            (!defined || Objects.equals(nullableValue, maybe.nullableValue));
+    }
+
+    @Override
+    public int hashCode() {
+        // Since all undefined maybes are equal, we can hard-code their 
hashCode to -1.
+        // Since all defined null maybes are equal, we can hard-code their 
hashCode to 0.
+        return defined ? nullableValue == null ? 0 : nullableValue.hashCode() 
: -1;
+    }
+
+    @Override
+    public String toString() {
+        if (defined) {
+            return "DefinedMaybe{" + String.valueOf(nullableValue) + "}";
+        } else {
+            return "UndefinedMaybe{}";
+        }
+    }
+}
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/TimeOrderedKeyValueBuffer.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/TimeOrderedKeyValueBuffer.java
index ffa1f49..3aabbaa 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/TimeOrderedKeyValueBuffer.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/TimeOrderedKeyValueBuffer.java
@@ -17,20 +17,23 @@
 package org.apache.kafka.streams.state.internals;
 
 import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.streams.kstream.internals.Change;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
 
 import java.util.Objects;
 import java.util.function.Consumer;
 import java.util.function.Supplier;
 
 public interface TimeOrderedKeyValueBuffer<K, V> extends StateStore {
+
     final class Eviction<K, V> {
         private final K key;
-        private final V value;
+        private final Change<V> value;
         private final ProcessorRecordContext recordContext;
 
-        Eviction(final K key, final V value, final ProcessorRecordContext 
recordContext) {
+        Eviction(final K key, final Change<V> value, final 
ProcessorRecordContext recordContext) {
             this.key = key;
             this.value = value;
             this.recordContext = recordContext;
@@ -40,7 +43,7 @@ public interface TimeOrderedKeyValueBuffer<K, V> extends 
StateStore {
             return key;
         }
 
-        public V value() {
+        public Change<V> value() {
             return value;
         }
 
@@ -73,7 +76,9 @@ public interface TimeOrderedKeyValueBuffer<K, V> extends 
StateStore {
 
     void evictWhile(final Supplier<Boolean> predicate, final 
Consumer<Eviction<K, V>> callback);
 
-    void put(long time, K key, V value, ProcessorRecordContext recordContext);
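+    /**
+     * Look up the prior value of a buffered key: undefined if the key is not
+     * currently buffered, otherwise the (possibly null) old value captured when
+     * the key was first buffered.
+     */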
+    Maybe<ValueAndTimestamp<V>> priorValueForBuffered(K key);
+
+    void put(long time, K key, Change<V> value, ProcessorRecordContext 
recordContext);
 
     int numRecords();
 
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SuppressScenarioTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SuppressScenarioTest.java
index 43ad42a..3c7fd1e 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SuppressScenarioTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SuppressScenarioTest.java
@@ -517,6 +517,204 @@ public class SuppressScenarioTest {
         }
     }
 
+    @Test
+    public void shouldWorkBeforeGroupBy() {
+        final StreamsBuilder builder = new StreamsBuilder();
+
+        builder
+            .table("topic", Consumed.with(Serdes.String(), Serdes.String()))
+            .suppress(untilTimeLimit(ofMillis(10), unbounded()))
+            .groupBy(KeyValue::pair, Grouped.with(Serdes.String(), 
Serdes.String()))
+            .count()
+            .toStream()
+            .to("output", Produced.with(Serdes.String(), Serdes.Long()));
+
+        try (final TopologyTestDriver driver = new 
TopologyTestDriver(builder.build(), config)) {
+            final ConsumerRecordFactory<String, String> recordFactory =
+                new ConsumerRecordFactory<>(STRING_SERIALIZER, 
STRING_SERIALIZER);
+
+            driver.pipeInput(recordFactory.create("topic", "A", "a", 0L));
+            driver.pipeInput(recordFactory.create("topic", "tick", "tick", 
10L));
+
+            verify(
+                drainProducerRecords(driver, "output", STRING_DESERIALIZER, 
LONG_DESERIALIZER),
+                singletonList(new KeyValueTimestamp<>("A", 1L, 0L))
+            );
+        }
+    }
+
+    @Test
+    public void shouldWorkBeforeJoinRight() {
+        final StreamsBuilder builder = new StreamsBuilder();
+
+        final KTable<String, String> left = builder
+            .table("left", Consumed.with(Serdes.String(), Serdes.String()));
+
+        final KTable<String, String> right = builder
+            .table("right", Consumed.with(Serdes.String(), Serdes.String()))
+            .suppress(untilTimeLimit(ofMillis(10), unbounded()));
+
+        left
+            .outerJoin(right, (l, r) -> String.format("(%s,%s)", l, r))
+            .toStream()
+            .to("output", Produced.with(Serdes.String(), Serdes.String()));
+
+        try (final TopologyTestDriver driver = new 
TopologyTestDriver(builder.build(), config)) {
+            final ConsumerRecordFactory<String, String> recordFactory =
+                new ConsumerRecordFactory<>(STRING_SERIALIZER, 
STRING_SERIALIZER);
+
+            driver.pipeInput(recordFactory.create("right", "B", "1", 0L));
+            driver.pipeInput(recordFactory.create("right", "A", "1", 0L));
+            // buffered, no output
+            verify(
+                drainProducerRecords(driver, "output", STRING_DESERIALIZER, 
STRING_DESERIALIZER),
+                emptyList()
+            );
+
+
+            driver.pipeInput(recordFactory.create("right", "tick", "tick", 
10L));
+            // flush buffer
+            verify(
+                drainProducerRecords(driver, "output", STRING_DESERIALIZER, 
STRING_DESERIALIZER),
+                asList(
+                    new KeyValueTimestamp<>("A", "(null,1)", 0L),
+                    new KeyValueTimestamp<>("B", "(null,1)", 0L)
+                )
+            );
+
+
+            driver.pipeInput(recordFactory.create("right", "A", "2", 11L));
+            // buffered, no output
+            verify(
+                drainProducerRecords(driver, "output", STRING_DESERIALIZER, 
STRING_DESERIALIZER),
+                emptyList()
+            );
+
+
+            driver.pipeInput(recordFactory.create("left", "A", "a", 12L));
+            // should join with previously emitted right side
+            verify(
+                drainProducerRecords(driver, "output", STRING_DESERIALIZER, 
STRING_DESERIALIZER),
+                singletonList(new KeyValueTimestamp<>("A", "(a,1)", 12L))
+            );
+
+
+            driver.pipeInput(recordFactory.create("left", "B", "b", 12L));
+            // should view through to the parent KTable, since B is no longer 
buffered
+            verify(
+                drainProducerRecords(driver, "output", STRING_DESERIALIZER, 
STRING_DESERIALIZER),
+                singletonList(new KeyValueTimestamp<>("B", "(b,1)", 12L))
+            );
+
+
+            driver.pipeInput(recordFactory.create("left", "A", "b", 13L));
+            // should join with previously emitted right side
+            verify(
+                drainProducerRecords(driver, "output", STRING_DESERIALIZER, 
STRING_DESERIALIZER),
+                singletonList(new KeyValueTimestamp<>("A", "(b,1)", 13L))
+            );
+
+
+            driver.pipeInput(recordFactory.create("right", "tick", "tick", 
21L));
+            verify(
+                drainProducerRecords(driver, "output", STRING_DESERIALIZER, 
STRING_DESERIALIZER),
+                asList(
+                    new KeyValueTimestamp<>("tick", "(null,tick)", 21), // 
just a testing artifact
+                    new KeyValueTimestamp<>("A", "(b,2)", 13L)
+                )
+            );
+        }
+    }
+
+    @Test
+    public void shouldWorkBeforeJoinLeft() {
+        final StreamsBuilder builder = new StreamsBuilder();
+
+        final KTable<String, String> left = builder
+            .table("left", Consumed.with(Serdes.String(), Serdes.String()))
+            .suppress(untilTimeLimit(ofMillis(10), unbounded()));
+
+        final KTable<String, String> right = builder
+            .table("right", Consumed.with(Serdes.String(), Serdes.String()));
+
+        left
+            .outerJoin(right, (l, r) -> String.format("(%s,%s)", l, r))
+            .toStream()
+            .to("output", Produced.with(Serdes.String(), Serdes.String()));
+
+        try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), config)) {
+            final ConsumerRecordFactory<String, String> recordFactory =
+                new ConsumerRecordFactory<>(STRING_SERIALIZER, 
STRING_SERIALIZER);
+
+            driver.pipeInput(recordFactory.create("left", "B", "1", 0L));
+            driver.pipeInput(recordFactory.create("left", "A", "1", 0L));
+            // buffered, no output
+            verify(
+                drainProducerRecords(driver, "output", STRING_DESERIALIZER, 
STRING_DESERIALIZER),
+                emptyList()
+            );
+
+
+            driver.pipeInput(recordFactory.create("left", "tick", "tick", 
10L));
+            // flush buffer
+            verify(
+                drainProducerRecords(driver, "output", STRING_DESERIALIZER, 
STRING_DESERIALIZER),
+                asList(
+                    new KeyValueTimestamp<>("A", "(1,null)", 0L),
+                    new KeyValueTimestamp<>("B", "(1,null)", 0L)
+                )
+            );
+
+
+            driver.pipeInput(recordFactory.create("left", "A", "2", 11L));
+            // buffered, no output
+            verify(
+                drainProducerRecords(driver, "output", STRING_DESERIALIZER, 
STRING_DESERIALIZER),
+                emptyList()
+            );
+
+
+            driver.pipeInput(recordFactory.create("right", "A", "a", 12L));
+            // should join with previously emitted left side
+            verify(
+                drainProducerRecords(driver, "output", STRING_DESERIALIZER, 
STRING_DESERIALIZER),
+                singletonList(new KeyValueTimestamp<>("A", "(1,a)", 12L))
+            );
+
+
+            driver.pipeInput(recordFactory.create("right", "B", "b", 12L));
+            // should view through to the parent KTable, since B is no longer 
buffered
+            verify(
+                drainProducerRecords(driver, "output", STRING_DESERIALIZER, 
STRING_DESERIALIZER),
+                singletonList(new KeyValueTimestamp<>("B", "(1,b)", 12L))
+            );
+
+
+            driver.pipeInput(recordFactory.create("right", "A", "b", 13L));
+            // should join with previously emitted left side
+            verify(
+                drainProducerRecords(driver, "output", STRING_DESERIALIZER, 
STRING_DESERIALIZER),
+                singletonList(new KeyValueTimestamp<>("A", "(1,b)", 13L))
+            );
+
+
+            driver.pipeInput(recordFactory.create("left", "tick", "tick", 
21L));
+            verify(
+                drainProducerRecords(driver, "output", STRING_DESERIALIZER, 
STRING_DESERIALIZER),
+                asList(
+                    new KeyValueTimestamp<>("tick", "(tick,null)", 21), // 
just a testing artifact
+                    new KeyValueTimestamp<>("A", "(2,b)", 13L)
+                )
+            );
+        }
+    }
+
     private static <K, V> void verify(final List<ProducerRecord<K, V>> results,
                                       final List<KeyValueTimestamp<K, V>> 
expectedResults) {
         if (results.size() != expectedResults.size()) {
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessorMetricsTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessorMetricsTest.java
index 62ae3bf..96ee735 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessorMetricsTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessorMetricsTest.java
@@ -22,10 +22,13 @@ import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.streams.kstream.Suppressed;
 import org.apache.kafka.streams.kstream.internals.Change;
 import org.apache.kafka.streams.kstream.internals.FullChangeSerde;
+import org.apache.kafka.streams.kstream.internals.KTableImpl;
+import org.apache.kafka.streams.processor.Processor;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.internals.ProcessorNode;
 import 
org.apache.kafka.streams.state.internals.InMemoryTimeOrderedKeyValueBuffer;
 import org.apache.kafka.test.MockInternalProcessorContext;
+import org.easymock.EasyMock;
 import org.hamcrest.Matcher;
 import org.junit.Test;
 
@@ -141,11 +144,13 @@ public class KTableSuppressProcessorMetricsTest {
             .withLoggingDisabled()
             .build();
 
-        final KTableSuppressProcessor<String, Long> processor =
-            new KTableSuppressProcessor<>(
+        final KTableImpl<String, ?, Long> mock = 
EasyMock.mock(KTableImpl.class);
+        final Processor<String, Change<Long>> processor =
+            new KTableSuppressProcessorSupplier<>(
                 (SuppressedInternal<String>) 
Suppressed.<String>untilTimeLimit(Duration.ofDays(100), maxRecords(1)),
-                storeName
-            );
+                storeName,
+                mock
+            ).get();
 
         final MockInternalProcessorContext context = new 
MockInternalProcessorContext();
         context.setCurrentNode(new ProcessorNode("testNode"));
@@ -164,9 +169,9 @@ public class KTableSuppressProcessorMetricsTest {
 
             verifyMetric(metrics, EVICTION_RATE_METRIC, is(0.0));
             verifyMetric(metrics, EVICTION_TOTAL_METRIC, is(0.0));
-            verifyMetric(metrics, BUFFER_SIZE_AVG_METRIC, is(25.5));
-            verifyMetric(metrics, BUFFER_SIZE_CURRENT_METRIC, is(51.0));
-            verifyMetric(metrics, BUFFER_SIZE_MAX_METRIC, is(51.0));
+            verifyMetric(metrics, BUFFER_SIZE_AVG_METRIC, is(29.5));
+            verifyMetric(metrics, BUFFER_SIZE_CURRENT_METRIC, is(59.0));
+            verifyMetric(metrics, BUFFER_SIZE_MAX_METRIC, is(59.0));
             verifyMetric(metrics, BUFFER_COUNT_AVG_METRIC, is(0.5));
             verifyMetric(metrics, BUFFER_COUNT_CURRENT_METRIC, is(1.0));
             verifyMetric(metrics, BUFFER_COUNT_MAX_METRIC, is(1.0));
@@ -180,9 +185,9 @@ public class KTableSuppressProcessorMetricsTest {
 
             verifyMetric(metrics, EVICTION_RATE_METRIC, greaterThan(0.0));
             verifyMetric(metrics, EVICTION_TOTAL_METRIC, is(1.0));
-            verifyMetric(metrics, BUFFER_SIZE_AVG_METRIC, is(49.0));
-            verifyMetric(metrics, BUFFER_SIZE_CURRENT_METRIC, is(47.0));
-            verifyMetric(metrics, BUFFER_SIZE_MAX_METRIC, is(98.0));
+            verifyMetric(metrics, BUFFER_SIZE_AVG_METRIC, is(57.0));
+            verifyMetric(metrics, BUFFER_SIZE_CURRENT_METRIC, is(55.0));
+            verifyMetric(metrics, BUFFER_SIZE_MAX_METRIC, is(114.0));
             verifyMetric(metrics, BUFFER_COUNT_AVG_METRIC, is(1.0));
             verifyMetric(metrics, BUFFER_COUNT_CURRENT_METRIC, is(1.0));
             verifyMetric(metrics, BUFFER_COUNT_MAX_METRIC, is(2.0));
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessorTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessorTest.java
index 6c10d91..d8cb858 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessorTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessorTest.java
@@ -26,13 +26,16 @@ import 
org.apache.kafka.streams.kstream.TimeWindowedSerializer;
 import org.apache.kafka.streams.kstream.Windowed;
 import org.apache.kafka.streams.kstream.internals.Change;
 import org.apache.kafka.streams.kstream.internals.FullChangeSerde;
+import org.apache.kafka.streams.kstream.internals.KTableImpl;
 import org.apache.kafka.streams.kstream.internals.SessionWindow;
 import org.apache.kafka.streams.kstream.internals.TimeWindow;
 import org.apache.kafka.streams.processor.MockProcessorContext;
+import org.apache.kafka.streams.processor.Processor;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.internals.ProcessorNode;
 import 
org.apache.kafka.streams.state.internals.InMemoryTimeOrderedKeyValueBuffer;
 import org.apache.kafka.test.MockInternalProcessorContext;
+import org.easymock.EasyMock;
 import org.hamcrest.BaseMatcher;
 import org.hamcrest.Description;
 import org.hamcrest.Matcher;
@@ -62,7 +65,7 @@ public class KTableSuppressProcessorTest {
     private static final Change<Long> ARBITRARY_CHANGE = new Change<>(7L, 14L);
 
     private static class Harness<K, V> {
-        private final KTableSuppressProcessor<K, V> processor;
+        private final Processor<K, Change<V>> processor;
         private final MockInternalProcessorContext context;
 
 
@@ -76,8 +79,9 @@ public class KTableSuppressProcessorTest {
                 .withLoggingDisabled()
                 .build();
 
-            final KTableSuppressProcessor<K, V> processor =
-                new KTableSuppressProcessor<>((SuppressedInternal<K>) 
suppressed, storeName);
+            final KTableImpl<K, ?, V> parent = EasyMock.mock(KTableImpl.class);
+            final Processor<K, Change<V>> processor =
+                new KTableSuppressProcessorSupplier<>((SuppressedInternal<K>) 
suppressed, storeName, parent).get();
 
             final MockInternalProcessorContext context = new 
MockInternalProcessorContext();
             context.setCurrentNode(new ProcessorNode("testNode"));
@@ -95,13 +99,12 @@ public class KTableSuppressProcessorTest {
         final Harness<String, Long> harness =
             new Harness<>(untilTimeLimit(ZERO, unbounded()), String(), Long());
         final MockInternalProcessorContext context = harness.context;
-        final KTableSuppressProcessor<String, Long> processor = 
harness.processor;
 
         final long timestamp = ARBITRARY_LONG;
         context.setRecordMetadata("", 0, 0L, null, timestamp);
         final String key = "hey";
         final Change<Long> value = ARBITRARY_CHANGE;
-        processor.process(key, value);
+        harness.processor.process(key, value);
 
         assertThat(context.forwarded(), hasSize(1));
         final MockProcessorContext.CapturedForward capturedForward = 
context.forwarded().get(0);
@@ -114,13 +117,12 @@ public class KTableSuppressProcessorTest {
         final Harness<Windowed<String>, Long> harness =
             new Harness<>(untilTimeLimit(ZERO, unbounded()), 
timeWindowedSerdeFrom(String.class, 100L), Long());
         final MockInternalProcessorContext context = harness.context;
-        final KTableSuppressProcessor<Windowed<String>, Long> processor = 
harness.processor;
 
         final long timestamp = ARBITRARY_LONG;
         context.setRecordMetadata("", 0, 0L, null, timestamp);
         final Windowed<String> key = new Windowed<>("hey", new TimeWindow(0L, 
100L));
         final Change<Long> value = ARBITRARY_CHANGE;
-        processor.process(key, value);
+        harness.processor.process(key, value);
 
         assertThat(context.forwarded(), hasSize(1));
         final MockProcessorContext.CapturedForward capturedForward = 
context.forwarded().get(0);
@@ -133,17 +135,16 @@ public class KTableSuppressProcessorTest {
         final Harness<String, Long> harness =
             new Harness<>(untilTimeLimit(ofMillis(1), unbounded()), String(), 
Long());
         final MockInternalProcessorContext context = harness.context;
-        final KTableSuppressProcessor<String, Long> processor = 
harness.processor;
 
         final long timestamp = 0L;
         context.setRecordMetadata("topic", 0, 0, null, timestamp);
         final String key = "hey";
         final Change<Long> value = new Change<>(null, 1L);
-        processor.process(key, value);
+        harness.processor.process(key, value);
         assertThat(context.forwarded(), hasSize(0));
 
         context.setRecordMetadata("topic", 0, 1, null, 1L);
-        processor.process("tick", new Change<>(null, null));
+        harness.processor.process("tick", new Change<>(null, null));
 
         assertThat(context.forwarded(), hasSize(1));
         final MockProcessorContext.CapturedForward capturedForward = 
context.forwarded().get(0);
@@ -156,7 +157,6 @@ public class KTableSuppressProcessorTest {
         final Harness<Windowed<String>, Long> harness =
             new Harness<>(finalResults(ofMillis(1L)), 
timeWindowedSerdeFrom(String.class, 1L), Long());
         final MockInternalProcessorContext context = harness.context;
-        final KTableSuppressProcessor<Windowed<String>, Long> processor = 
harness.processor;
 
         final long windowStart = 99L;
         final long recordTime = 99L;
@@ -164,7 +164,7 @@ public class KTableSuppressProcessorTest {
         context.setRecordMetadata("topic", 0, 0, null, recordTime);
         final Windowed<String> key = new Windowed<>("hey", new 
TimeWindow(windowStart, windowEnd));
         final Change<Long> value = ARBITRARY_CHANGE;
-        processor.process(key, value);
+        harness.processor.process(key, value);
         assertThat(context.forwarded(), hasSize(0));
 
         // although the stream time is now 100, we have to wait 1 ms after the 
window *end* before we
@@ -173,7 +173,7 @@ public class KTableSuppressProcessorTest {
         final long recordTime2 = 100L;
         final long windowEnd2 = 101L;
         context.setRecordMetadata("topic", 0, 1, null, recordTime2);
-        processor.process(new Windowed<>("dummyKey1", new 
TimeWindow(windowStart2, windowEnd2)), ARBITRARY_CHANGE);
+        harness.processor.process(new Windowed<>("dummyKey1", new 
TimeWindow(windowStart2, windowEnd2)), ARBITRARY_CHANGE);
         assertThat(context.forwarded(), hasSize(0));
 
         // ok, now it's time to emit "hey"
@@ -181,7 +181,7 @@ public class KTableSuppressProcessorTest {
         final long recordTime3 = 101L;
         final long windowEnd3 = 102L;
         context.setRecordMetadata("topic", 0, 1, null, recordTime3);
-        processor.process(new Windowed<>("dummyKey2", new 
TimeWindow(windowStart3, windowEnd3)), ARBITRARY_CHANGE);
+        harness.processor.process(new Windowed<>("dummyKey2", new 
TimeWindow(windowStart3, windowEnd3)), ARBITRARY_CHANGE);
 
         assertThat(context.forwarded(), hasSize(1));
         final MockProcessorContext.CapturedForward capturedForward = 
context.forwarded().get(0);
@@ -199,7 +199,6 @@ public class KTableSuppressProcessorTest {
         final Harness<Windowed<String>, Long> harness =
             new Harness<>(finalResults(ofMillis(0L)), 
timeWindowedSerdeFrom(String.class, 100L), Long());
         final MockInternalProcessorContext context = harness.context;
-        final KTableSuppressProcessor<Windowed<String>, Long> processor = 
harness.processor;
 
         // note the record is in the past, but the window end is in the 
future, so we still have to buffer,
         // even though the grace period is 0.
@@ -208,11 +207,11 @@ public class KTableSuppressProcessorTest {
         context.setRecordMetadata("", 0, 0L, null, timestamp);
         final Windowed<String> key = new Windowed<>("hey", new TimeWindow(0, 
windowEnd));
         final Change<Long> value = ARBITRARY_CHANGE;
-        processor.process(key, value);
+        harness.processor.process(key, value);
         assertThat(context.forwarded(), hasSize(0));
 
         context.setRecordMetadata("", 0, 1L, null, windowEnd);
-        processor.process(new Windowed<>("dummyKey", new TimeWindow(windowEnd, 
windowEnd + 100L)), ARBITRARY_CHANGE);
+        harness.processor.process(new Windowed<>("dummyKey", new 
TimeWindow(windowEnd, windowEnd + 100L)), ARBITRARY_CHANGE);
 
         assertThat(context.forwarded(), hasSize(1));
         final MockProcessorContext.CapturedForward capturedForward = 
context.forwarded().get(0);
@@ -225,13 +224,12 @@ public class KTableSuppressProcessorTest {
         final Harness<Windowed<String>, Long> harness =
             new Harness<>(finalResults(ofMillis(0L)), 
timeWindowedSerdeFrom(String.class, 100L), Long());
         final MockInternalProcessorContext context = harness.context;
-        final KTableSuppressProcessor<Windowed<String>, Long> processor = 
harness.processor;
 
         final long timestamp = 100L;
         context.setRecordMetadata("", 0, 0L, null, timestamp);
         final Windowed<String> key = new Windowed<>("hey", new TimeWindow(0, 
100L));
         final Change<Long> value = ARBITRARY_CHANGE;
-        processor.process(key, value);
+        harness.processor.process(key, value);
 
         assertThat(context.forwarded(), hasSize(1));
         final MockProcessorContext.CapturedForward capturedForward = 
context.forwarded().get(0);
@@ -248,13 +246,12 @@ public class KTableSuppressProcessorTest {
         final Harness<Windowed<String>, Long> harness =
             new Harness<>(finalResults(ofMillis(0L)), 
timeWindowedSerdeFrom(String.class, 100L), Long());
         final MockInternalProcessorContext context = harness.context;
-        final KTableSuppressProcessor<Windowed<String>, Long> processor = 
harness.processor;
 
         final long timestamp = 100L;
         context.setRecordMetadata("", 0, 0L, null, timestamp);
         final Windowed<String> key = new Windowed<>("hey", new TimeWindow(0, 
100L));
         final Change<Long> value = new Change<>(null, ARBITRARY_LONG);
-        processor.process(key, value);
+        harness.processor.process(key, value);
 
         assertThat(context.forwarded(), hasSize(0));
     }
@@ -269,13 +266,12 @@ public class KTableSuppressProcessorTest {
         final Harness<Windowed<String>, Long> harness =
             new Harness<>(finalResults(ofMillis(0L)), 
sessionWindowedSerdeFrom(String.class), Long());
         final MockInternalProcessorContext context = harness.context;
-        final KTableSuppressProcessor<Windowed<String>, Long> processor = 
harness.processor;
 
         final long timestamp = 100L;
         context.setRecordMetadata("", 0, 0L, null, timestamp);
         final Windowed<String> key = new Windowed<>("hey", new 
SessionWindow(0L, 0L));
         final Change<Long> value = new Change<>(null, ARBITRARY_LONG);
-        processor.process(key, value);
+        harness.processor.process(key, value);
 
         assertThat(context.forwarded(), hasSize(0));
     }
@@ -289,13 +285,12 @@ public class KTableSuppressProcessorTest {
         final Harness<Windowed<String>, Long> harness =
             new Harness<>(untilTimeLimit(ofMillis(0), maxRecords(0)), 
timeWindowedSerdeFrom(String.class, 100L), Long());
         final MockInternalProcessorContext context = harness.context;
-        final KTableSuppressProcessor<Windowed<String>, Long> processor = 
harness.processor;
 
         final long timestamp = 100L;
         context.setRecordMetadata("", 0, 0L, null, timestamp);
         final Windowed<String> key = new Windowed<>("hey", new TimeWindow(0L, 
100L));
         final Change<Long> value = new Change<>(null, ARBITRARY_LONG);
-        processor.process(key, value);
+        harness.processor.process(key, value);
 
         assertThat(context.forwarded(), hasSize(1));
         final MockProcessorContext.CapturedForward capturedForward = 
context.forwarded().get(0);
@@ -313,13 +308,12 @@ public class KTableSuppressProcessorTest {
         final Harness<Windowed<String>, Long> harness =
             new Harness<>(untilTimeLimit(ofMillis(0), maxRecords(0)), 
sessionWindowedSerdeFrom(String.class), Long());
         final MockInternalProcessorContext context = harness.context;
-        final KTableSuppressProcessor<Windowed<String>, Long> processor = 
harness.processor;
 
         final long timestamp = 100L;
         context.setRecordMetadata("", 0, 0L, null, timestamp);
         final Windowed<String> key = new Windowed<>("hey", new 
SessionWindow(0L, 0L));
         final Change<Long> value = new Change<>(null, ARBITRARY_LONG);
-        processor.process(key, value);
+        harness.processor.process(key, value);
 
         assertThat(context.forwarded(), hasSize(1));
         final MockProcessorContext.CapturedForward capturedForward = 
context.forwarded().get(0);
@@ -337,13 +331,12 @@ public class KTableSuppressProcessorTest {
         final Harness<String, Long> harness =
             new Harness<>(untilTimeLimit(ofMillis(0), maxRecords(0)), 
String(), Long());
         final MockInternalProcessorContext context = harness.context;
-        final KTableSuppressProcessor<String, Long> processor = 
harness.processor;
 
         final long timestamp = 100L;
         context.setRecordMetadata("", 0, 0L, null, timestamp);
         final String key = "hey";
         final Change<Long> value = new Change<>(null, ARBITRARY_LONG);
-        processor.process(key, value);
+        harness.processor.process(key, value);
 
         assertThat(context.forwarded(), hasSize(1));
         final MockProcessorContext.CapturedForward capturedForward = 
context.forwarded().get(0);
@@ -356,16 +349,15 @@ public class KTableSuppressProcessorTest {
         final Harness<String, Long> harness =
             new Harness<>(untilTimeLimit(Duration.ofDays(100), maxRecords(1)), 
String(), Long());
         final MockInternalProcessorContext context = harness.context;
-        final KTableSuppressProcessor<String, Long> processor = 
harness.processor;
 
         final long timestamp = 100L;
         context.setRecordMetadata("", 0, 0L, null, timestamp);
         final String key = "hey";
         final Change<Long> value = new Change<>(null, ARBITRARY_LONG);
-        processor.process(key, value);
+        harness.processor.process(key, value);
 
         context.setRecordMetadata("", 0, 1L, null, timestamp + 1);
-        processor.process("dummyKey", value);
+        harness.processor.process("dummyKey", value);
 
         assertThat(context.forwarded(), hasSize(1));
         final MockProcessorContext.CapturedForward capturedForward = 
context.forwarded().get(0);
@@ -378,16 +370,15 @@ public class KTableSuppressProcessorTest {
         final Harness<String, Long> harness =
             new Harness<>(untilTimeLimit(Duration.ofDays(100), maxBytes(60L)), 
String(), Long());
         final MockInternalProcessorContext context = harness.context;
-        final KTableSuppressProcessor<String, Long> processor = 
harness.processor;
 
         final long timestamp = 100L;
         context.setRecordMetadata("", 0, 0L, null, timestamp);
         final String key = "hey";
         final Change<Long> value = new Change<>(null, ARBITRARY_LONG);
-        processor.process(key, value);
+        harness.processor.process(key, value);
 
         context.setRecordMetadata("", 0, 1L, null, timestamp + 1);
-        processor.process("dummyKey", value);
+        harness.processor.process("dummyKey", value);
 
         assertThat(context.forwarded(), hasSize(1));
         final MockProcessorContext.CapturedForward capturedForward = 
context.forwarded().get(0);
@@ -400,18 +391,17 @@ public class KTableSuppressProcessorTest {
         final Harness<String, Long> harness =
             new Harness<>(untilTimeLimit(Duration.ofDays(100), 
maxRecords(1).shutDownWhenFull()), String(), Long());
         final MockInternalProcessorContext context = harness.context;
-        final KTableSuppressProcessor<String, Long> processor = 
harness.processor;
 
         final long timestamp = 100L;
         context.setRecordMetadata("", 0, 0L, null, timestamp);
         context.setCurrentNode(new ProcessorNode("testNode"));
         final String key = "hey";
         final Change<Long> value = new Change<>(null, ARBITRARY_LONG);
-        processor.process(key, value);
+        harness.processor.process(key, value);
 
         context.setRecordMetadata("", 0, 1L, null, timestamp);
         try {
-            processor.process("dummyKey", value);
+            harness.processor.process("dummyKey", value);
             fail("expected an exception");
         } catch (final StreamsException e) {
             assertThat(e.getMessage(), containsString("buffer exceeded its max 
capacity"));
@@ -423,18 +413,17 @@ public class KTableSuppressProcessorTest {
         final Harness<String, Long> harness =
             new Harness<>(untilTimeLimit(Duration.ofDays(100), 
maxBytes(60L).shutDownWhenFull()), String(), Long());
         final MockInternalProcessorContext context = harness.context;
-        final KTableSuppressProcessor<String, Long> processor = 
harness.processor;
 
         final long timestamp = 100L;
         context.setRecordMetadata("", 0, 0L, null, timestamp);
         context.setCurrentNode(new ProcessorNode("testNode"));
         final String key = "hey";
         final Change<Long> value = new Change<>(null, ARBITRARY_LONG);
-        processor.process(key, value);
+        harness.processor.process(key, value);
 
         context.setRecordMetadata("", 0, 1L, null, timestamp);
         try {
-            processor.process("dummyKey", value);
+            harness.processor.process("dummyKey", value);
             fail("expected an exception");
         } catch (final StreamsException e) {
             assertThat(e.getMessage(), containsString("buffer exceeded its max 
capacity"));
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/suppress/SuppressSuite.java
 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/suppress/SuppressSuite.java
new file mode 100644
index 0000000..3aef6d0
--- /dev/null
+++ 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/suppress/SuppressSuite.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream.internals.suppress;
+
+import 
org.apache.kafka.streams.integration.SuppressionDurabilityIntegrationTest;
+import org.apache.kafka.streams.integration.SuppressionIntegrationTest;
+import org.apache.kafka.streams.kstream.SuppressedTest;
+import org.apache.kafka.streams.kstream.internals.SuppressScenarioTest;
+import org.apache.kafka.streams.kstream.internals.SuppressTopologyTest;
+import 
org.apache.kafka.streams.state.internals.InMemoryTimeOrderedKeyValueBufferTest;
+import org.apache.kafka.streams.state.internals.TimeOrderedKeyValueBufferTest;
+import org.junit.runner.RunWith;
+import org.junit.runners.Suite;
+
+/**
+ * This suite runs all the tests related to the Suppression feature.
+ *
+ * It can be used from an IDE to run just these tests while developing code related to Suppress.
+ *
+ * If desired, it can also be added to a Gradle build task, although this isn't strictly
+ * necessary, since all these tests are already included in the `:streams:test` task.
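+ *
+ * For example, one way to run only this suite from the command line (an illustrative
+ * invocation using Gradle's standard test filtering, not something this change adds):
+ * <pre>{@code
+ * ./gradlew :streams:test --tests org.apache.kafka.streams.kstream.internals.suppress.SuppressSuite
+ * }</pre>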
+ */
+@RunWith(Suite.class)
+@Suite.SuiteClasses({
+    KTableSuppressProcessorMetricsTest.class,
+    KTableSuppressProcessorTest.class,
+    SuppressScenarioTest.class,
+    SuppressTopologyTest.class,
+    SuppressedTest.class,
+    SuppressionIntegrationTest.class,
+    SuppressionDurabilityIntegrationTest.class,
+    InMemoryTimeOrderedKeyValueBufferTest.class,
+    TimeOrderedKeyValueBufferTest.class
+})
+public class SuppressSuite {
+}
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/MaybeTest.java 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/MaybeTest.java
new file mode 100644
index 0000000..7a29e13
--- /dev/null
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/MaybeTest.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.junit.Test;
+
+import java.util.NoSuchElementException;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.nullValue;
+import static org.junit.Assert.fail;
+
+public class MaybeTest {
+    @Test
+    public void shouldReturnDefinedValue() {
+        assertThat(Maybe.defined(null).getNullableValue(), nullValue());
+        assertThat(Maybe.defined("ASDF").getNullableValue(), is("ASDF"));
+    }
+
+    @Test
+    public void shouldAnswerIsDefined() {
+        assertThat(Maybe.defined(null).isDefined(), is(true));
+        assertThat(Maybe.defined("ASDF").isDefined(), is(true));
+        assertThat(Maybe.undefined().isDefined(), is(false));
+    }
+
+    @Test
+    public void shouldThrowOnGetUndefinedValue() {
+        final Maybe<Object> undefined = Maybe.undefined();
+        try {
+            undefined.getNullableValue();
+            fail();
+        } catch (final NoSuchElementException e) {
+            // no assertion necessary
+        }
+    }
+
+    @Test
+    public void shouldUpholdEqualityCorrectness() {
+        assertThat(Maybe.undefined().equals(Maybe.undefined()), is(true));
+        assertThat(Maybe.defined(null).equals(Maybe.defined(null)), is(true));
+        assertThat(Maybe.defined("q").equals(Maybe.defined("q")), is(true));
+
+        assertThat(Maybe.undefined().equals(Maybe.defined(null)), is(false));
+        assertThat(Maybe.undefined().equals(Maybe.defined("x")), is(false));
+
+        assertThat(Maybe.defined(null).equals(Maybe.undefined()), is(false));
+        assertThat(Maybe.defined(null).equals(Maybe.defined("x")), is(false));
+
+        assertThat(Maybe.defined("a").equals(Maybe.undefined()), is(false));
+        assertThat(Maybe.defined("a").equals(Maybe.defined(null)), is(false));
+        assertThat(Maybe.defined("a").equals(Maybe.defined("b")), is(false));
+    }
+
+    @Test
+    public void shouldUpholdHashCodeCorrectness() {
+        // This specifies the current implementation, which is simpler to write than an exhaustive test.
+        // As long as the implementation doesn't change, the equals/hashCode contract is upheld.
+
+        assertThat(Maybe.undefined().hashCode(), is(-1));
+        assertThat(Maybe.defined(null).hashCode(), is(0));
+        assertThat(Maybe.defined("a").hashCode(), is("a".hashCode()));
+    }
+}
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedKeyValueBufferTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedKeyValueBufferTest.java
index 6ae36d4..941832b 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedKeyValueBufferTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedKeyValueBufferTest.java
@@ -23,12 +23,16 @@ import 
org.apache.kafka.common.header.internals.RecordHeader;
 import org.apache.kafka.common.header.internals.RecordHeaders;
 import org.apache.kafka.common.record.TimestampType;
 import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.kstream.internals.Change;
+import org.apache.kafka.streams.kstream.internals.FullChangeSerde;
 import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
 import 
org.apache.kafka.streams.processor.internals.RecordBatchingStateRestoreCallback;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
 import 
org.apache.kafka.streams.state.internals.TimeOrderedKeyValueBuffer.Eviction;
 import org.apache.kafka.test.MockInternalProcessorContext;
 import org.apache.kafka.test.MockInternalProcessorContext.MockRecordCollector;
@@ -57,8 +61,8 @@ import static org.junit.Assert.fail;
 
 @RunWith(Parameterized.class)
 public class TimeOrderedKeyValueBufferTest<B extends 
TimeOrderedKeyValueBuffer<String, String>> {
-    private static final RecordHeaders V_1_CHANGELOG_HEADERS =
-        new RecordHeaders(new Header[] {new RecordHeader("v", new byte[] 
{(byte) 1})});
+    private static final RecordHeaders V_2_CHANGELOG_HEADERS =
+        new RecordHeaders(new Header[] {new RecordHeader("v", new byte[] 
{(byte) 2})});
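+    // the "v" record header carries the changelog value format version; this change
+    // bumps it from 1 to 2 (the restore tests below still cover the older formats)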
 
     private static final String APP_ID = "test-app";
     private final Function<String, B> bufferSupplier;
@@ -129,10 +133,7 @@ public class TimeOrderedKeyValueBufferTest<B extends 
TimeOrderedKeyValueBuffer<S
         final MockInternalProcessorContext context = makeContext();
         buffer.init(context, buffer);
         try {
-            buffer.put(0, "asdf",
-                       null,
-                       getContext(0)
-            );
+            buffer.put(0, "asdf", null, getContext(0));
             fail("expected an exception");
         } catch (final NullPointerException expected) {
             // expected
@@ -163,7 +164,9 @@ public class TimeOrderedKeyValueBufferTest<B extends 
TimeOrderedKeyValueBuffer<S
         final List<Eviction<String, String>> evicted = new LinkedList<>();
         buffer.evictWhile(() -> buffer.numRecords() > 1, evicted::add);
         assertThat(buffer.numRecords(), is(1));
-        assertThat(evicted, is(singletonList(new Eviction<>("asdf", "eyt", 
getContext(0L)))));
+        assertThat(evicted, is(singletonList(
+            new Eviction<>("asdf", new Change<>("eyt", null), getContext(0L))
+        )));
         cleanup(context, buffer);
     }
 
@@ -187,11 +190,11 @@ public class TimeOrderedKeyValueBufferTest<B extends 
TimeOrderedKeyValueBuffer<S
         final MockInternalProcessorContext context = makeContext();
         buffer.init(context, buffer);
         putRecord(buffer, context, 0L, 0L, "asdf", "23roni");
-        assertThat(buffer.bufferSize(), is(43L));
+        assertThat(buffer.bufferSize(), is(51L));
         putRecord(buffer, context, 1L, 0L, "asdf", "3l");
-        assertThat(buffer.bufferSize(), is(39L));
+        assertThat(buffer.bufferSize(), is(47L));
         putRecord(buffer, context, 0L, 0L, "zxcv", "qfowin");
-        assertThat(buffer.bufferSize(), is(82L));
+        assertThat(buffer.bufferSize(), is(98L));
         cleanup(context, buffer);
     }
 
@@ -215,12 +218,12 @@ public class TimeOrderedKeyValueBufferTest<B extends 
TimeOrderedKeyValueBuffer<S
 
         putRecord(buffer, context, 1L, 0L, "zxcv", "o23i4");
         assertThat(buffer.numRecords(), is(1));
-        assertThat(buffer.bufferSize(), is(42L));
+        assertThat(buffer.bufferSize(), is(50L));
         assertThat(buffer.minTimestamp(), is(1L));
 
         putRecord(buffer, context, 0L, 0L, "asdf", "3ng");
         assertThat(buffer.numRecords(), is(2));
-        assertThat(buffer.bufferSize(), is(82L));
+        assertThat(buffer.bufferSize(), is(98L));
         assertThat(buffer.minTimestamp(), is(0L));
 
         final AtomicInteger callbackCount = new AtomicInteger(0);
@@ -229,14 +232,14 @@ public class TimeOrderedKeyValueBufferTest<B extends 
TimeOrderedKeyValueBuffer<S
                 case 1: {
                     assertThat(kv.key(), is("asdf"));
                     assertThat(buffer.numRecords(), is(2));
-                    assertThat(buffer.bufferSize(), is(82L));
+                    assertThat(buffer.bufferSize(), is(98L));
                     assertThat(buffer.minTimestamp(), is(0L));
                     break;
                 }
                 case 2: {
                     assertThat(kv.key(), is("zxcv"));
                     assertThat(buffer.numRecords(), is(1));
-                    assertThat(buffer.bufferSize(), is(42L));
+                    assertThat(buffer.bufferSize(), is(50L));
                     assertThat(buffer.minTimestamp(), is(1L));
                     break;
                 }
@@ -254,6 +257,29 @@ public class TimeOrderedKeyValueBufferTest<B extends 
TimeOrderedKeyValueBuffer<S
     }
 
     @Test
+    public void shouldReturnUndefinedOnPriorValueForNotBufferedKey() {
+        final TimeOrderedKeyValueBuffer<String, String> buffer = 
bufferSupplier.apply(testName);
+        final MockInternalProcessorContext context = makeContext();
+        buffer.init(context, buffer);
+
+        assertThat(buffer.priorValueForBuffered("ASDF"), 
is(Maybe.undefined()));
+    }
+
+    @Test
+    public void shouldReturnPriorValueForBufferedKey() {
+        final TimeOrderedKeyValueBuffer<String, String> buffer = 
bufferSupplier.apply(testName);
+        final MockInternalProcessorContext context = makeContext();
+        buffer.init(context, buffer);
+
+        final ProcessorRecordContext recordContext = getContext(0L);
+        context.setRecordContext(recordContext);
+        buffer.put(1L, "A", new Change<>("new-value", "old-value"), 
recordContext);
+        buffer.put(1L, "B", new Change<>("new-value", null), recordContext);
+        assertThat(buffer.priorValueForBuffered("A"), 
is(Maybe.defined(ValueAndTimestamp.make("old-value", -1))));
+        assertThat(buffer.priorValueForBuffered("B"), is(Maybe.defined(null)));
+    }
+
+    @Test
     public void shouldFlush() {
         final TimeOrderedKeyValueBuffer<String, String> buffer = 
bufferSupplier.apply(testName);
         final MockInternalProcessorContext context = makeContext();
@@ -272,19 +298,19 @@ public class TimeOrderedKeyValueBufferTest<B extends 
TimeOrderedKeyValueBuffer<S
         // which we can't compare for equality using ProducerRecord.
         // As a workaround, I'm deserializing them and shoving them in a 
KeyValue, just for ease of testing.
 
-        final List<ProducerRecord<String, KeyValue<Long, ContextualRecord>>> 
collected =
+        final List<ProducerRecord<String, KeyValue<Long, BufferValue>>> 
collected =
             ((MockRecordCollector) context.recordCollector())
                 .collected()
                 .stream()
                 .map(pr -> {
-                    final KeyValue<Long, ContextualRecord> niceValue;
+                    final KeyValue<Long, BufferValue> niceValue;
                     if (pr.value() == null) {
                         niceValue = null;
                     } else {
-                        final byte[] timestampAndValue = pr.value();
-                        final ByteBuffer wrap = 
ByteBuffer.wrap(timestampAndValue);
-                        final long timestamp = wrap.getLong();
-                        final ContextualRecord contextualRecord = 
ContextualRecord.deserialize(wrap);
+                        final byte[] serializedValue = pr.value();
+                        final ByteBuffer valueBuffer = 
ByteBuffer.wrap(serializedValue);
+                        final BufferValue bufferValue = BufferValue.deserialize(valueBuffer);
+                        final long timestamp = valueBuffer.getLong();
-                        niceValue = new KeyValue<>(timestamp, contextualRecord);
+                        niceValue = new KeyValue<>(timestamp, bufferValue);
                     }
 
@@ -309,15 +335,15 @@ public class TimeOrderedKeyValueBufferTest<B extends 
TimeOrderedKeyValueBuffer<S
                                  0,
                                  null,
                                  "zxcv",
-                                 new KeyValue<>(1L, getRecord("3gon4i", 1)),
-                                 V_1_CHANGELOG_HEADERS
+                                 new KeyValue<>(1L, getBufferValue("3gon4i", 
1)),
+                                 V_2_CHANGELOG_HEADERS
             ),
             new ProducerRecord<>(APP_ID + "-" + testName + "-changelog",
                                  0,
                                  null,
                                  "asdf",
-                                 new KeyValue<>(2L, getRecord("2093j", 0)),
-                                 V_1_CHANGELOG_HEADERS
+                                 new KeyValue<>(2L, getBufferValue("2093j", 
0)),
+                                 V_2_CHANGELOG_HEADERS
             )
         )));
 
@@ -335,6 +361,12 @@ public class TimeOrderedKeyValueBufferTest<B extends 
TimeOrderedKeyValueBuffer<S
 
         context.setRecordContext(new ProcessorRecordContext(0, 0, 0, "", 
null));
 
+        final Serializer<Change<String>> serializer = 
FullChangeSerde.castOrWrap(Serdes.String()).serializer();
+
+        final byte[] todeleteValue = serializer.serialize(null, new 
Change<>("doomed", null));
+        final byte[] asdfValue = serializer.serialize(null, new 
Change<>("qwer", null));
+        final byte[] zxcvValue1 = serializer.serialize(null, new 
Change<>("eo4im", "previous"));
+        final byte[] zxcvValue2 = serializer.serialize(null, new 
Change<>("next", "eo4im"));
         stateRestoreCallback.restoreBatch(asList(
             new ConsumerRecord<>("changelog-topic",
                                  0,
@@ -345,7 +377,7 @@ public class TimeOrderedKeyValueBufferTest<B extends 
TimeOrderedKeyValueBuffer<S
                                  -1,
                                  -1,
                                  "todelete".getBytes(UTF_8),
-                                 ByteBuffer.allocate(Long.BYTES + 
6).putLong(0L).put("doomed".getBytes(UTF_8)).array()),
+                                 ByteBuffer.allocate(Long.BYTES + 
todeleteValue.length).putLong(0L).put(todeleteValue).array()),
             new ConsumerRecord<>("changelog-topic",
                                  0,
                                  1,
@@ -355,7 +387,7 @@ public class TimeOrderedKeyValueBufferTest<B extends 
TimeOrderedKeyValueBuffer<S
                                  -1,
                                  -1,
                                  "asdf".getBytes(UTF_8),
-                                 ByteBuffer.allocate(Long.BYTES + 
4).putLong(2L).put("qwer".getBytes(UTF_8)).array()),
+                                 ByteBuffer.allocate(Long.BYTES + 
asdfValue.length).putLong(2L).put(asdfValue).array()),
             new ConsumerRecord<>("changelog-topic",
                                  0,
                                  2,
@@ -365,12 +397,22 @@ public class TimeOrderedKeyValueBufferTest<B extends 
TimeOrderedKeyValueBuffer<S
                                  -1,
                                  -1,
                                  "zxcv".getBytes(UTF_8),
-                                 ByteBuffer.allocate(Long.BYTES + 
5).putLong(1L).put("3o4im".getBytes(UTF_8)).array())
+                                 ByteBuffer.allocate(Long.BYTES + 
zxcvValue1.length).putLong(1L).put(zxcvValue1).array()),
+            new ConsumerRecord<>("changelog-topic",
+                                 0,
+                                 3,
+                                 3,
+                                 TimestampType.CREATE_TIME,
+                                 -1,
+                                 -1,
+                                 -1,
+                                 "zxcv".getBytes(UTF_8),
+                                 ByteBuffer.allocate(Long.BYTES + 
zxcvValue2.length).putLong(1L).put(zxcvValue2).array())
         ));
 
         assertThat(buffer.numRecords(), is(3));
         assertThat(buffer.minTimestamp(), is(0L));
-        assertThat(buffer.bufferSize(), is(160L));
+        assertThat(buffer.bufferSize(), is(196L));
 
         stateRestoreCallback.restoreBatch(singletonList(
             new ConsumerRecord<>("changelog-topic",
@@ -387,7 +429,11 @@ public class TimeOrderedKeyValueBufferTest<B extends 
TimeOrderedKeyValueBuffer<S
 
         assertThat(buffer.numRecords(), is(2));
         assertThat(buffer.minTimestamp(), is(1L));
-        assertThat(buffer.bufferSize(), is(103L));
+        assertThat(buffer.bufferSize(), is(131L));
+
+        assertThat(buffer.priorValueForBuffered("todelete"), 
is(Maybe.undefined()));
+        assertThat(buffer.priorValueForBuffered("asdf"), 
is(Maybe.defined(null)));
+        assertThat(buffer.priorValueForBuffered("zxcv"), 
is(Maybe.defined(ValueAndTimestamp.make("previous", -1))));
 
         // flush the buffer into a list in buffer order so we can make 
assertions about the contents.
 
@@ -405,11 +451,11 @@ public class TimeOrderedKeyValueBufferTest<B extends 
TimeOrderedKeyValueBuffer<S
         assertThat(evicted, is(asList(
             new Eviction<>(
                 "zxcv",
-                "3o4im",
-                new ProcessorRecordContext(2L, 2, 0, "changelog-topic", new 
RecordHeaders())),
+                new Change<>("next", "eo4im"),
+                new ProcessorRecordContext(3L, 3, 0, "changelog-topic", new 
RecordHeaders())),
             new Eviction<>(
                 "asdf",
-                "qwer",
+                new Change<>("qwer", null),
                 new ProcessorRecordContext(1L, 1, 0, "changelog-topic", new 
RecordHeaders()))
         )));
 
@@ -417,7 +463,7 @@ public class TimeOrderedKeyValueBufferTest<B extends 
TimeOrderedKeyValueBuffer<S
     }
 
     @Test
-    public void shouldRestoreNewFormat() {
+    public void shouldRestoreV1Format() {
         final TimeOrderedKeyValueBuffer<String, String> buffer = 
bufferSupplier.apply(testName);
         final MockInternalProcessorContext context = makeContext();
         buffer.init(context, buffer);
@@ -429,9 +475,18 @@ public class TimeOrderedKeyValueBufferTest<B extends 
TimeOrderedKeyValueBuffer<S
 
         final RecordHeaders v1FlagHeaders = new RecordHeaders(new Header[] 
{new RecordHeader("v", new byte[] {(byte) 1})});
 
-        final byte[] todeleteValue = getRecord("doomed", 0).serialize();
-        final byte[] asdfValue = getRecord("qwer", 1).serialize();
-        final byte[] zxcvValue = getRecord("3o4im", 2).serialize();
+        final byte[] todeleteValue = getContextualRecord("doomed", 0).serialize(0).array();
+        final byte[] asdfValue = getContextualRecord("qwer", 1).serialize(0).array();
+        final FullChangeSerde<String> fullChangeSerde = FullChangeSerde.castOrWrap(Serdes.String());
+        final byte[] zxcvValue1 = new ContextualRecord(
+            fullChangeSerde.serializer().serialize(null, new Change<>("3o4im", "previous")),
+            getContext(2L)
+        ).serialize(0).array();
+        final FullChangeSerde<String> fullChangeSerde1 = FullChangeSerde.castOrWrap(Serdes.String());
+        final byte[] zxcvValue2 = new ContextualRecord(
+            fullChangeSerde1.serializer().serialize(null, new Change<>("next", "3o4im")),
+            getContext(3L)
+        ).serialize(0).array();
         stateRestoreCallback.restoreBatch(asList(
             new ConsumerRecord<>("changelog-topic",
                                  0,
@@ -464,13 +519,24 @@ public class TimeOrderedKeyValueBufferTest<B extends TimeOrderedKeyValueBuffer<S
                                  -1,
                                  -1,
                                  "zxcv".getBytes(UTF_8),
-                                 ByteBuffer.allocate(Long.BYTES + zxcvValue.length).putLong(1L).put(zxcvValue).array(),
+                                 ByteBuffer.allocate(Long.BYTES + zxcvValue1.length).putLong(1L).put(zxcvValue1).array(),
+                                 v1FlagHeaders),
+            new ConsumerRecord<>("changelog-topic",
+                                 0,
+                                 3,
+                                 100,
+                                 TimestampType.CREATE_TIME,
+                                 -1L,
+                                 -1,
+                                 -1,
+                                 "zxcv".getBytes(UTF_8),
+                                 ByteBuffer.allocate(Long.BYTES + zxcvValue2.length).putLong(1L).put(zxcvValue2).array(),
                                  v1FlagHeaders)
         ));
 
         assertThat(buffer.numRecords(), is(3));
         assertThat(buffer.minTimestamp(), is(0L));
-        assertThat(buffer.bufferSize(), is(130L));
+        assertThat(buffer.bufferSize(), is(166L));
 
         stateRestoreCallback.restoreBatch(singletonList(
             new ConsumerRecord<>("changelog-topic",
@@ -487,7 +553,11 @@ public class TimeOrderedKeyValueBufferTest<B extends TimeOrderedKeyValueBuffer<S
 
         assertThat(buffer.numRecords(), is(2));
         assertThat(buffer.minTimestamp(), is(1L));
-        assertThat(buffer.bufferSize(), is(83L));
+        assertThat(buffer.bufferSize(), is(111L));
+
+        assertThat(buffer.priorValueForBuffered("todelete"), 
is(Maybe.undefined()));
+        assertThat(buffer.priorValueForBuffered("asdf"), 
is(Maybe.defined(null)));
+        assertThat(buffer.priorValueForBuffered("zxcv"), 
is(Maybe.defined(ValueAndTimestamp.make("previous", -1))));
 
         // flush the buffer into a list in buffer order so we can make assertions about the contents.
 
@@ -505,11 +575,143 @@ public class TimeOrderedKeyValueBufferTest<B extends TimeOrderedKeyValueBuffer<S
         assertThat(evicted, is(asList(
             new Eviction<>(
                 "zxcv",
-                "3o4im",
-                getContext(2L)),
+                new Change<>("next", "3o4im"),
+                getContext(3L)),
             new Eviction<>(
                 "asdf",
-                "qwer",
+                new Change<>("qwer", null),
+                getContext(1L)
+            ))));
+
+        cleanup(context, buffer);
+    }
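
The layout difference between the two restore formats is easy to miss in the test bodies: v1 values prepend the 8-byte buffer time to the serialized ContextualRecord (putLong first, then put), while the v2 values below append the buffer time after the serialized BufferValue, which additionally carries the prior value. A sketch of both compositions as the tests build them; the helper class and method names are ours, purely for illustration:

    import java.nio.ByteBuffer;

    // Illustrative only: the two changelog value layouts these tests build.
    final class ChangelogValueLayouts {
        // v1: [8-byte buffer time][serialized ContextualRecord bytes]
        static byte[] v1(final long bufferTime, final byte[] serialized) {
            return ByteBuffer.allocate(Long.BYTES + serialized.length)
                             .putLong(bufferTime)
                             .put(serialized)
                             .array();
        }

        // v2: [serialized BufferValue bytes][8-byte buffer time]
        static byte[] v2(final byte[] serialized, final long bufferTime) {
            return ByteBuffer.allocate(Long.BYTES + serialized.length)
                             .put(serialized)
                             .putLong(bufferTime)
                             .array();
        }
    }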
+
+    @Test
+    public void shouldRestoreV2Format() {
+        final TimeOrderedKeyValueBuffer<String, String> buffer = bufferSupplier.apply(testName);
+        final MockInternalProcessorContext context = makeContext();
+        buffer.init(context, buffer);
+
+        final RecordBatchingStateRestoreCallback stateRestoreCallback =
+            (RecordBatchingStateRestoreCallback) context.stateRestoreCallback(testName);
+
+        context.setRecordContext(new ProcessorRecordContext(0, 0, 0, "", null));
+
+        final RecordHeaders v2FlagHeaders = new RecordHeaders(new Header[] {new RecordHeader("v", new byte[] {(byte) 2})});
+
+        final byte[] todeleteValue = getBufferValue("doomed", 0).serialize(0).array();
+        final byte[] asdfValue = getBufferValue("qwer", 1).serialize(0).array();
+        final FullChangeSerde<String> fullChangeSerde = FullChangeSerde.castOrWrap(Serdes.String());
+        final byte[] zxcvValue1 =
+            new BufferValue(
+                new ContextualRecord(
+                    fullChangeSerde.serializer().serialize(null, new Change<>("3o4im", "IGNORED")),
+                    getContext(2L)
+                ),
+                Serdes.String().serializer().serialize(null, "previous")
+            ).serialize(0).array();
+        final FullChangeSerde<String> fullChangeSerde1 = FullChangeSerde.castOrWrap(Serdes.String());
+        final byte[] zxcvValue2 =
+            new BufferValue(
+                new ContextualRecord(
+                    fullChangeSerde1.serializer().serialize(null, new Change<>("next", "3o4im")),
+                    getContext(3L)
+                ),
+                Serdes.String().serializer().serialize(null, "previous")
+            ).serialize(0).array();
+        stateRestoreCallback.restoreBatch(asList(
+            new ConsumerRecord<>("changelog-topic",
+                                 0,
+                                 0,
+                                 999,
+                                 TimestampType.CREATE_TIME,
+                                 -1L,
+                                 -1,
+                                 -1,
+                                 "todelete".getBytes(UTF_8),
+                                 ByteBuffer.allocate(Long.BYTES + todeleteValue.length).put(todeleteValue).putLong(0L).array(),
+                                 v2FlagHeaders),
+            new ConsumerRecord<>("changelog-topic",
+                                 0,
+                                 1,
+                                 9999,
+                                 TimestampType.CREATE_TIME,
+                                 -1L,
+                                 -1,
+                                 -1,
+                                 "asdf".getBytes(UTF_8),
+                                 ByteBuffer.allocate(Long.BYTES + asdfValue.length).put(asdfValue).putLong(2L).array(),
+                                 v2FlagHeaders),
+            new ConsumerRecord<>("changelog-topic",
+                                 0,
+                                 2,
+                                 99,
+                                 TimestampType.CREATE_TIME,
+                                 -1L,
+                                 -1,
+                                 -1,
+                                 "zxcv".getBytes(UTF_8),
+                                 ByteBuffer.allocate(Long.BYTES + zxcvValue1.length).put(zxcvValue1).putLong(1L).array(),
+                                 v2FlagHeaders),
+            new ConsumerRecord<>("changelog-topic",
+                                 0,
+                                 2,
+                                 100,
+                                 TimestampType.CREATE_TIME,
+                                 -1L,
+                                 -1,
+                                 -1,
+                                 "zxcv".getBytes(UTF_8),
+                                 ByteBuffer.allocate(Long.BYTES + zxcvValue2.length).put(zxcvValue2).putLong(1L).array(),
+                                 v2FlagHeaders)
+        ));
+
+        assertThat(buffer.numRecords(), is(3));
+        assertThat(buffer.minTimestamp(), is(0L));
+        assertThat(buffer.bufferSize(), is(166L));
+
+        stateRestoreCallback.restoreBatch(singletonList(
+            new ConsumerRecord<>("changelog-topic",
+                                 0,
+                                 3,
+                                 3,
+                                 TimestampType.CREATE_TIME,
+                                 -1L,
+                                 -1,
+                                 -1,
+                                 "todelete".getBytes(UTF_8),
+                                 null)
+        ));
+
+        assertThat(buffer.numRecords(), is(2));
+        assertThat(buffer.minTimestamp(), is(1L));
+        assertThat(buffer.bufferSize(), is(111L));
+
+        assertThat(buffer.priorValueForBuffered("todelete"), 
is(Maybe.undefined()));
+        assertThat(buffer.priorValueForBuffered("asdf"), 
is(Maybe.defined(null)));
+        assertThat(buffer.priorValueForBuffered("zxcv"), 
is(Maybe.defined(ValueAndTimestamp.make("previous", -1))));
+
+        // flush the buffer into a list in buffer order so we can make assertions about the contents.
+
+        final List<Eviction<String, String>> evicted = new LinkedList<>();
+        buffer.evictWhile(() -> true, evicted::add);
+
+        // Several things to note:
+        // * The buffered records are ordered according to their buffer time (serialized in the value of the changelog)
+        // * The record timestamps are properly restored, and not conflated with the record's buffer time.
+        // * The keys and values are properly restored
+        // * The record topic is set to the original input topic, *not* the changelog topic
+        // * The record offset preserves the original input record's offset, *not* the offset of the changelog record
+
+
+        assertThat(evicted, is(asList(
+            new Eviction<>(
+                "zxcv",
+                new Change<>("next", "3o4im"),
+                getContext(3L)),
+            new Eviction<>(
+                "asdf",
+                new Change<>("qwer", null),
                 getContext(1L)
             ))));
 
@@ -529,7 +731,7 @@ public class TimeOrderedKeyValueBufferTest<B extends TimeOrderedKeyValueBuffer<S
 
         final RecordHeaders unknownFlagHeaders = new RecordHeaders(new Header[] {new RecordHeader("v", new byte[] {(byte) -1})});
 
-        final byte[] todeleteValue = getRecord("doomed", 0).serialize();
+        final byte[] todeleteValue = getBufferValue("doomed", 0).serialize(0).array();
         try {
             stateRestoreCallback.restoreBatch(singletonList(
                 new ConsumerRecord<>("changelog-topic",
@@ -560,12 +762,18 @@ public class TimeOrderedKeyValueBufferTest<B extends TimeOrderedKeyValueBuffer<S
                                   final String value) {
         final ProcessorRecordContext recordContext = getContext(recordTimestamp);
         context.setRecordContext(recordContext);
-        buffer.put(streamTime, key, value, recordContext);
+        buffer.put(streamTime, key, new Change<>(value, null), recordContext);
+    }
+
+    private static BufferValue getBufferValue(final String value, final long timestamp) {
+        final ContextualRecord contextualRecord = getContextualRecord(value, timestamp);
+        return new BufferValue(contextualRecord, null);
     }
 
-    private static ContextualRecord getRecord(final String value, final long timestamp) {
+    private static ContextualRecord getContextualRecord(final String value, final long timestamp) {
+        final FullChangeSerde<String> fullChangeSerde = FullChangeSerde.castOrWrap(Serdes.String());
         return new ContextualRecord(
-            value.getBytes(UTF_8),
+            fullChangeSerde.serializer().serialize(null, new Change<>(value, null)),
             getContext(timestamp)
         );
     }
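
Because these helpers encode every buffered value through FullChangeSerde, the buffer can hand back a prior ValueAndTimestamp via priorValueForBuffered, as the assertions above verify. A hedged sketch of a value getter built on that call, falling back to an upstream getter when the key has no buffered entry; parentGetter, buffer, and the enclosing class are assumptions for illustration, not the committed KTableSuppressProcessorSupplier code:

    // Sketch: for a buffered key, return the prior value recorded at
    // buffering time; otherwise defer to the parent KTable's getter.
    public ValueAndTimestamp<String> get(final String key) {
        final Maybe<ValueAndTimestamp<String>> maybe = buffer.priorValueForBuffered(key);
        if (maybe.isDefined()) {
            return maybe.getNullableValue(); // may be null: the prior value was null
        }
        return parentGetter.get(key); // nothing buffered for this key
    }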
