mjsax commented on code in PR #21580:
URL: https://github.com/apache/kafka/pull/21580#discussion_r2943207276


##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableMapValues.java:
##########
@@ -109,16 +111,18 @@ private VOut computeValue(final KIn key, final VIn value) 
{
         return newValue;
     }
 
-    private ValueAndTimestamp<VOut> computeValueAndTimestamp(final KIn key, 
final ValueAndTimestamp<VIn> valueAndTimestamp) {
+    private ValueTimestampHeaders<VOut> computeValueAndTimestamp(final KIn 
key, final ValueTimestampHeaders<VIn> valueTimestampHeaders) {
         VOut newValue = null;
         long timestamp = 0;
+        Headers headers = null;
 
-        if (valueAndTimestamp != null) {
-            newValue = mapper.apply(key, valueAndTimestamp.value());
-            timestamp = valueAndTimestamp.timestamp();
+        if (valueTimestampHeaders != null) {
+            newValue = mapper.apply(key, valueTimestampHeaders.value());
+            timestamp = valueTimestampHeaders.timestamp();
+            headers = valueTimestampHeaders.headers();
         }

Review Comment:
   If `valueTimestampHeaders == null`, should we set `headers = 
context.headers()` ?



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueStoreWrapper.java:
##########
@@ -45,70 +49,72 @@ public class KeyValueStoreWrapper<K, V> implements 
StateStore {
     public static final long PUT_RETURN_CODE_IS_LATEST
         = VersionedKeyValueStore.PUT_RETURN_CODE_VALID_TO_UNDEFINED;
 
-    private TimestampedKeyValueStore<K, V> timestampedStore = null;
+    private TimestampedKeyValueStoreWithHeaders<K, V> headersStore = null;
     private VersionedKeyValueStore<K, V> versionedStore = null;
 
     // same as either timestampedStore or versionedStore above. kept merely as 
a convenience
     // to simplify implementation for methods which do not depend on store 
type.
     private StateStore store;
 
+    @SuppressWarnings("unchecked")
     public KeyValueStoreWrapper(final ProcessorContext<?, ?> context, final 
String storeName) {
+        final StateStore rawStore = context.getStateStore(storeName);

Review Comment:
   Why do we introduce `rawStore` ? The leads to the new to add 
`@SuppressWarnings("unchecked")` and cast below.



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueStoreWrapper.java:
##########
@@ -45,70 +49,72 @@ public class KeyValueStoreWrapper<K, V> implements 
StateStore {
     public static final long PUT_RETURN_CODE_IS_LATEST
         = VersionedKeyValueStore.PUT_RETURN_CODE_VALID_TO_UNDEFINED;
 
-    private TimestampedKeyValueStore<K, V> timestampedStore = null;
+    private TimestampedKeyValueStoreWithHeaders<K, V> headersStore = null;
     private VersionedKeyValueStore<K, V> versionedStore = null;
 
     // same as either timestampedStore or versionedStore above. kept merely as 
a convenience
     // to simplify implementation for methods which do not depend on store 
type.
     private StateStore store;
 
+    @SuppressWarnings("unchecked")
     public KeyValueStoreWrapper(final ProcessorContext<?, ?> context, final 
String storeName) {
+        final StateStore rawStore = context.getStateStore(storeName);
+
+        // Try headers-aware timestamped store
         try {
-            // first try timestamped store
-            timestampedStore = context.getStateStore(storeName);
-            store = timestampedStore;
+            headersStore = (TimestampedKeyValueStoreWithHeaders<K, V>) 
rawStore;
+            store = headersStore;
             return;
         } catch (final ClassCastException e) {
-            // ignore since could be versioned store instead
+            // not headers store, try versioned
         }
 
+        // Try versioned store
         try {
-            // next try versioned store
-            versionedStore = context.getStateStore(storeName);
+            versionedStore = (VersionedKeyValueStore<K, V>) rawStore;
             store = versionedStore;
         } catch (final ClassCastException e) {
-            store = context.getStateStore(storeName);
-            final String storeType = store == null ? "null" : 
store.getClass().getName();
+            final String storeType = rawStore == null ? "null" : 
rawStore.getClass().getName();
             throw new InvalidStateStoreException("KTable source state store 
must implement either "
-                + "TimestampedKeyValueStore or VersionedKeyValueStore. Got: " 
+ storeType);
+                + "TimestampedKeyValueStore, 
TimestampedKeyValueStoreWithHeaders, or VersionedKeyValueStore. Got: " + 
storeType);

Review Comment:
   ```suggestion
                   + "TimestampedKeyValueStoreWithHeaders or 
VersionedKeyValueStore. Got: " + storeType);
   ```



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueStoreWrapper.java:
##########
@@ -45,70 +49,72 @@ public class KeyValueStoreWrapper<K, V> implements 
StateStore {
     public static final long PUT_RETURN_CODE_IS_LATEST
         = VersionedKeyValueStore.PUT_RETURN_CODE_VALID_TO_UNDEFINED;
 
-    private TimestampedKeyValueStore<K, V> timestampedStore = null;
+    private TimestampedKeyValueStoreWithHeaders<K, V> headersStore = null;
     private VersionedKeyValueStore<K, V> versionedStore = null;
 
     // same as either timestampedStore or versionedStore above. kept merely as 
a convenience
     // to simplify implementation for methods which do not depend on store 
type.
     private StateStore store;
 
+    @SuppressWarnings("unchecked")
     public KeyValueStoreWrapper(final ProcessorContext<?, ?> context, final 
String storeName) {
+        final StateStore rawStore = context.getStateStore(storeName);
+
+        // Try headers-aware timestamped store
         try {
-            // first try timestamped store
-            timestampedStore = context.getStateStore(storeName);
-            store = timestampedStore;
+            headersStore = (TimestampedKeyValueStoreWithHeaders<K, V>) 
rawStore;
+            store = headersStore;
             return;
         } catch (final ClassCastException e) {
-            // ignore since could be versioned store instead
+            // not headers store, try versioned
         }
 
+        // Try versioned store
         try {
-            // next try versioned store
-            versionedStore = context.getStateStore(storeName);
+            versionedStore = (VersionedKeyValueStore<K, V>) rawStore;
             store = versionedStore;
         } catch (final ClassCastException e) {
-            store = context.getStateStore(storeName);
-            final String storeType = store == null ? "null" : 
store.getClass().getName();
+            final String storeType = rawStore == null ? "null" : 
rawStore.getClass().getName();
             throw new InvalidStateStoreException("KTable source state store 
must implement either "
-                + "TimestampedKeyValueStore or VersionedKeyValueStore. Got: " 
+ storeType);
+                + "TimestampedKeyValueStore, 
TimestampedKeyValueStoreWithHeaders, or VersionedKeyValueStore. Got: " + 
storeType);
         }
     }
 
-    public ValueAndTimestamp<V> get(final K key) {
-        if (timestampedStore != null) {
-            return timestampedStore.get(key);
+    public ValueTimestampHeaders<V> get(final K key) {
+        if (headersStore != null) {
+            return headersStore.get(key);
         }
         if (versionedStore != null) {
             final VersionedRecord<V> versionedRecord = versionedStore.get(key);
             return versionedRecord == null
                 ? null
-                : ValueAndTimestamp.make(versionedRecord.value(), 
versionedRecord.timestamp());
+                : ValueTimestampHeaders.make(versionedRecord.value(), 
versionedRecord.timestamp(), null);
         }
-        throw new IllegalStateException("KeyValueStoreWrapper must be 
initialized with either timestamped or versioned store");
+        throw new IllegalStateException("KeyValueStoreWrapper must be 
initialized with either timestamped, headers, or versioned store");

Review Comment:
   ```suggestion
           throw new IllegalStateException("KeyValueStoreWrapper must be 
initialized with either headers or versioned store");
   ```



##########
streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/ResponseJoinProcessorSupplierTest.java:
##########
@@ -61,8 +61,8 @@ public void init(final ProcessorContext<?, ?> context) {
                 }
 
                 @Override
-                public ValueAndTimestamp<V> get(final K key) {
-                    return ValueAndTimestamp.make(map.get(key), -1);
+                public ValueTimestampHeaders<V> get(final K key) {
+                    return ValueTimestampHeaders.make(map.get(key), -1, null);

Review Comment:
   Nit: Should we align to prod-code and pass `new RecordHeaders()` instead of 
`null`?
   
   Also elsewhere, if we change it -- I am fine either way.



##########
streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java:
##########


Review Comment:
   It seems, we should pass in `Stores.persistentWindowStoreWithHeaders` here 
-- we change the DSL to work with header-store, so if we pass in a plain-store, 
the code inserts the adaptor?
   
   Given that `Stores.persistentWindowStoreWithHeaders(...)` doesn't exists yet 
(seems this is something we should get fixed?), we could also just pass `new 
RocksDbWindowBytesStoreSupplier(...)` instead?



##########
streams/src/test/java/org/apache/kafka/test/GenericInMemoryTimestampedKeyValueStoreWithHeaders.java:
##########
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.test;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.StateStoreContext;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.TimestampedKeyValueStoreWithHeaders;
+import org.apache.kafka.streams.state.ValueTimestampHeaders;
+import org.apache.kafka.streams.state.internals.CacheFlushListener;
+import 
org.apache.kafka.streams.state.internals.DelegatingPeekingKeyValueIterator;
+import org.apache.kafka.streams.state.internals.WrappedStateStore;
+
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.NavigableMap;
+import java.util.TreeMap;
+
+/**
+ * This class is a generic version of the in-memory key-value store that is 
useful for testing when you
+ *  need a basic KeyValueStore for arbitrary types and don't have/want to 
write a serde
+ */
+@SuppressWarnings("deprecation")
+public class GenericInMemoryTimestampedKeyValueStoreWithHeaders<K extends 
Comparable, V>
+    extends WrappedStateStore<StateStore, K, ValueTimestampHeaders<V>>
+    implements TimestampedKeyValueStoreWithHeaders<K, V> {
+
+    private final String name;
+    private final NavigableMap<K, ValueTimestampHeaders<V>> map;
+    private volatile boolean open = false;
+
+    public GenericInMemoryTimestampedKeyValueStoreWithHeaders(final String 
name) {
+        // it's not really a `WrappedStateStore` so we pass `null`
+        // however, we need to implement `WrappedStateStore` to make the store 
usable
+        super(null);
+        this.name = name;
+
+        this.map = new TreeMap<>();
+    }
+
+    @Override
+    public String name() {
+        return this.name;

Review Comment:
   ```suggestion
           return name;
   ```



##########
streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java:
##########
@@ -1222,7 +1222,7 @@ public void 
shouldUseSpecifiedStoreSupplierForEachOuterJoinOperationBetweenKStre
         final ProcessorTopology topology = 
builder.internalTopologyBuilder.rewriteTopology(new 
StreamsConfig(props)).buildTopology();
         assertTypesForStateStore(topology.stateStores(),
                 InMemoryWindowStore.class,
-                RocksDBWindowStore.class,
+                PlainToHeadersWindowStoreAdapter.class,

Review Comment:
   > We can either fix the test 
   
   Not sure what you exactly mean by this. But cf my comment above -- this is 
expected given how the test is setup -- but yes, we might want to update the 
test setup.
   
   I don't think we need to change anything about 
`PlainToHeadersWindowStoreAdapter`.



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/ReadOnlyTimestampedWindowStoreWithHeadersFacade.java:
##########
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.ReadOnlyWindowStore;
+import org.apache.kafka.streams.state.TimestampedWindowStoreWithHeaders;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
+import org.apache.kafka.streams.state.ValueTimestampHeaders;
+import org.apache.kafka.streams.state.WindowStoreIterator;
+
+import java.time.Instant;
+
+/**
+ * A facade that wraps {@link TimestampedWindowStoreWithHeaders} to provide a
+ * {@link ReadOnlyWindowStore} interface with {@link ValueAndTimestamp} values.
+ * This facade converts {@link ValueTimestampHeaders} to {@link 
ValueAndTimestamp}, discarding headers.
+ *
+ * @param <K> key type
+ * @param <V> value type
+ */
+public class ReadOnlyTimestampedWindowStoreWithHeadersFacade<K, V> implements 
ReadOnlyWindowStore<K, ValueAndTimestamp<V>> {

Review Comment:
   One more about `GenericXxx` ?



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/ReadOnlyTimestampedKeyValueStoreWithHeadersFacade.java:
##########
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
+import org.apache.kafka.streams.state.TimestampedKeyValueStoreWithHeaders;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
+import org.apache.kafka.streams.state.ValueTimestampHeaders;
+
+/**
+ * A facade that wraps {@link TimestampedKeyValueStoreWithHeaders} to provide a
+ * {@link ReadOnlyKeyValueStore} interface with {@link ValueAndTimestamp} 
values.
+ * This facade converts {@link ValueTimestampHeaders} to {@link 
ValueAndTimestamp}, discarding headers.
+ *
+ * Similar to {@link ReadOnlyKeyValueStoreWithHeadersFacade} but for 
timestamped queries.
+ *
+ * @param <K> key type
+ * @param <V> value type
+ */
+public class ReadOnlyTimestampedKeyValueStoreWithHeadersFacade<K, V> 
implements ReadOnlyKeyValueStore<K, ValueAndTimestamp<V>> {

Review Comment:
   Similar question. Can we use `GenericXxx` to reduce boiler place code 
duplication?



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/ReadOnlyKeyValueStoreWithHeadersFacade.java:
##########
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
+import org.apache.kafka.streams.state.TimestampedKeyValueStoreWithHeaders;
+import org.apache.kafka.streams.state.ValueTimestampHeaders;
+
+/**
+ * A facade that wraps {@link TimestampedKeyValueStoreWithHeaders} to provide a
+ * {@link ReadOnlyKeyValueStore} interface. This facade extracts the plain 
value
+ * from {@link ValueTimestampHeaders}, discarding timestamp and headers.
+ *
+ * @param <K> key type
+ * @param <V> value type
+ */
+public class ReadOnlyKeyValueStoreWithHeadersFacade<K, V> implements 
ReadOnlyKeyValueStore<K, V> {

Review Comment:
   Should this be `ReadOnlyKeyValueStoreWithHeadersFacade extends 
GenericReadOnlyKeyValueStoreFacade` or maybe we can drop the class entirely and 
just use `new GenericReadOnlyKeyValueStoreFacade(...)` (similar to what we did 
for IQv1 and TTD case)?
   
   We could do this also as follow up PR.



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/ReadOnlyWindowStoreWithHeadersFacade.java:
##########
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.ReadOnlyWindowStore;
+import org.apache.kafka.streams.state.TimestampedWindowStoreWithHeaders;
+import org.apache.kafka.streams.state.ValueTimestampHeaders;
+import org.apache.kafka.streams.state.WindowStoreIterator;
+
+import java.time.Instant;
+
+/**
+ * A facade that wraps {@link TimestampedWindowStoreWithHeaders} to provide a
+ * {@link ReadOnlyWindowStore} interface with plain values.
+ * This facade converts {@link ValueTimestampHeaders} to plain values, 
discarding both timestamps and headers.
+ *
+ * @param <K> key type
+ * @param <V> value type
+ */
+public class ReadOnlyWindowStoreWithHeadersFacade<K, V> implements 
ReadOnlyWindowStore<K, V> {

Review Comment:
   Another facede



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueStoreWrapper.java:
##########
@@ -45,70 +49,72 @@ public class KeyValueStoreWrapper<K, V> implements 
StateStore {
     public static final long PUT_RETURN_CODE_IS_LATEST
         = VersionedKeyValueStore.PUT_RETURN_CODE_VALID_TO_UNDEFINED;
 
-    private TimestampedKeyValueStore<K, V> timestampedStore = null;
+    private TimestampedKeyValueStoreWithHeaders<K, V> headersStore = null;
     private VersionedKeyValueStore<K, V> versionedStore = null;
 
     // same as either timestampedStore or versionedStore above. kept merely as 
a convenience
     // to simplify implementation for methods which do not depend on store 
type.
     private StateStore store;
 
+    @SuppressWarnings("unchecked")
     public KeyValueStoreWrapper(final ProcessorContext<?, ?> context, final 
String storeName) {
+        final StateStore rawStore = context.getStateStore(storeName);
+
+        // Try headers-aware timestamped store
         try {
-            // first try timestamped store
-            timestampedStore = context.getStateStore(storeName);
-            store = timestampedStore;
+            headersStore = (TimestampedKeyValueStoreWithHeaders<K, V>) 
rawStore;
+            store = headersStore;
             return;
         } catch (final ClassCastException e) {
-            // ignore since could be versioned store instead
+            // not headers store, try versioned
         }
 
+        // Try versioned store
         try {
-            // next try versioned store
-            versionedStore = context.getStateStore(storeName);
+            versionedStore = (VersionedKeyValueStore<K, V>) rawStore;
             store = versionedStore;
         } catch (final ClassCastException e) {
-            store = context.getStateStore(storeName);
-            final String storeType = store == null ? "null" : 
store.getClass().getName();
+            final String storeType = rawStore == null ? "null" : 
rawStore.getClass().getName();
             throw new InvalidStateStoreException("KTable source state store 
must implement either "
-                + "TimestampedKeyValueStore or VersionedKeyValueStore. Got: " 
+ storeType);
+                + "TimestampedKeyValueStore, 
TimestampedKeyValueStoreWithHeaders, or VersionedKeyValueStore. Got: " + 
storeType);
         }
     }
 
-    public ValueAndTimestamp<V> get(final K key) {
-        if (timestampedStore != null) {
-            return timestampedStore.get(key);
+    public ValueTimestampHeaders<V> get(final K key) {
+        if (headersStore != null) {
+            return headersStore.get(key);
         }
         if (versionedStore != null) {
             final VersionedRecord<V> versionedRecord = versionedStore.get(key);
             return versionedRecord == null
                 ? null
-                : ValueAndTimestamp.make(versionedRecord.value(), 
versionedRecord.timestamp());
+                : ValueTimestampHeaders.make(versionedRecord.value(), 
versionedRecord.timestamp(), null);
         }
-        throw new IllegalStateException("KeyValueStoreWrapper must be 
initialized with either timestamped or versioned store");
+        throw new IllegalStateException("KeyValueStoreWrapper must be 
initialized with either timestamped, headers, or versioned store");
     }
 
-    public ValueAndTimestamp<V> get(final K key, final long asOfTimestamp) {
+    public ValueTimestampHeaders<V> get(final K key, final long asOfTimestamp) 
{
         if (!isVersionedStore()) {
             throw new UnsupportedOperationException("get(key, timestamp) is 
only supported for versioned stores");
         }
         final VersionedRecord<V> versionedRecord = versionedStore.get(key, 
asOfTimestamp);
-        return versionedRecord == null ? null : 
ValueAndTimestamp.make(versionedRecord.value(), versionedRecord.timestamp());
+        return versionedRecord == null ? null : 
ValueTimestampHeaders.make(versionedRecord.value(), 
versionedRecord.timestamp(), new RecordHeaders());
     }
 
     /**
      * @return {@code -1} if the put record is the latest for its key, and 
{@code Long.MIN_VALUE}
      *         if the put was rejected (i.e., due to grace period having 
elapsed for a versioned
      *         store). If neither, any other long value may be returned.
      */
-    public long put(final K key, final V value, final long timestamp) {
-        if (timestampedStore != null) {
-            timestampedStore.put(key, ValueAndTimestamp.make(value, 
timestamp));
+    public long put(final K key, final V value, final long timestamp, final 
Headers headers) {
+        if (headersStore != null) {
+            headersStore.put(key, ValueTimestampHeaders.make(value, timestamp, 
headers));
             return PUT_RETURN_CODE_IS_LATEST;
         }
         if (versionedStore != null) {
             return versionedStore.put(key, value, timestamp);
         }
-        throw new IllegalStateException("KeyValueStoreWrapper must be 
initialized with either timestamped or versioned store");
+        throw new IllegalStateException("KeyValueStoreWrapper must be 
initialized with either timestamped, headers, or versioned store");

Review Comment:
   ```suggestion
           throw new IllegalStateException("KeyValueStoreWrapper must be 
initialized with either headers or versioned store");
   ```



##########
streams/src/test/java/org/apache/kafka/streams/KeyValueTimestampHeaders.java:
##########
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams;
+
+import org.apache.kafka.common.header.Headers;
+
+import java.util.Objects;
+
+/**
+ * A key-value pair with timestamp and headers, used for testing KTable with 
headers support.
+ */
+public class KeyValueTimestampHeaders<K, V> {

Review Comment:
   Wondering why we add this? Can't we use `Record` ? (might be another follow 
up PR to clean it up)



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/WindowStoreMaterializerTest.java:
##########
@@ -105,297 +101,146 @@ private void mockWindowStoreSupplier() {
     }
 
     @Test
-    public void 
shouldCreateTimestampedBuilderWithCachingAndLoggingEnabledByDefault() {
+    public void 
shouldCreateHeadersBuilderWithCachingAndLoggingEnabledByDefault() {
         final MaterializedInternal<String, String, WindowStore<Bytes, byte[]>> 
materialized =
             new MaterializedInternal<>(Materialized.as("store"), nameProvider, 
STORE_PREFIX);
 
-        final TimestampedWindowStore<String, String> store = 
getTimestampedStore(materialized);
+        final TimestampedWindowStoreWithHeaders<String, String> store = 
getHeadersStore(materialized);
         final WrappedStateStore caching = (WrappedStateStore) 
((WrappedStateStore) store).wrapped();
         final StateStore logging = caching.wrapped();
 
-        assertInstanceOf(MeteredTimestampedWindowStore.class, store);
+        assertInstanceOf(MeteredTimestampedWindowStoreWithHeaders.class, 
store);
         assertInstanceOf(CachingWindowStore.class, caching);
-        assertInstanceOf(ChangeLoggingTimestampedWindowBytesStore.class, 
logging);
+        
assertInstanceOf(ChangeLoggingTimestampedWindowBytesStoreWithHeaders.class, 
logging);
     }
 
     @Test
-    public void shouldCreateTimestampedBuilderWithCachingDisabled() {
+    public void shouldCreateHeadersBuilderWithCachingDisabled() {
         final MaterializedInternal<String, String, WindowStore<Bytes, byte[]>> 
materialized = new MaterializedInternal<>(
             Materialized.<String, String, WindowStore<Bytes, 
byte[]>>as("store").withCachingDisabled(), nameProvider, STORE_PREFIX
         );
 
-        final TimestampedWindowStore<String, String> store = 
getTimestampedStore(materialized);
+        final TimestampedWindowStoreWithHeaders<String, String> store = 
getHeadersStore(materialized);
 
         final WrappedStateStore logging = (WrappedStateStore) 
((WrappedStateStore) store).wrapped();
-        assertInstanceOf(ChangeLoggingTimestampedWindowBytesStore.class, 
logging);
+        
assertInstanceOf(ChangeLoggingTimestampedWindowBytesStoreWithHeaders.class, 
logging);
     }
 
     @Test
-    public void shouldCreateTimestampedBuilderWithLoggingDisabled() {
+    public void shouldCreateHeadersBuilderWithLoggingDisabled() {
         final MaterializedInternal<String, String, WindowStore<Bytes, byte[]>> 
materialized = new MaterializedInternal<>(
             Materialized.<String, String, WindowStore<Bytes, 
byte[]>>as("store").withLoggingDisabled(), nameProvider, STORE_PREFIX
         );
 
-        final TimestampedWindowStore<String, String> store = 
getTimestampedStore(materialized);
+        final TimestampedWindowStoreWithHeaders<String, String> store = 
getHeadersStore(materialized);
 
         final WrappedStateStore caching = (WrappedStateStore) 
((WrappedStateStore) store).wrapped();
         assertInstanceOf(CachingWindowStore.class, caching);
-        assertFalse(caching.wrapped() instanceof 
ChangeLoggingTimestampedWindowBytesStore);
+        assertFalse(caching.wrapped() instanceof 
ChangeLoggingTimestampedWindowBytesStoreWithHeaders);
     }
 
     @Test
-    public void shouldCreateTimestampedBuilderWithCachingAndLoggingDisabled() {
+    public void shouldCreateHeadersBuilderWithCachingAndLoggingDisabled() {
         final MaterializedInternal<String, String, WindowStore<Bytes, byte[]>> 
materialized = new MaterializedInternal<>(
             Materialized.<String, String, WindowStore<Bytes, 
byte[]>>as("store").withCachingDisabled().withLoggingDisabled(), nameProvider, 
STORE_PREFIX
         );
 
-        final TimestampedWindowStore<String, String> store = 
getTimestampedStore(materialized);
+        final TimestampedWindowStoreWithHeaders<String, String> store = 
getHeadersStore(materialized);
 
         final StateStore wrapped = ((WrappedStateStore) store).wrapped();
         assertFalse(wrapped instanceof CachingWindowStore);
-        assertFalse(wrapped instanceof 
ChangeLoggingTimestampedWindowBytesStore);
+        assertFalse(wrapped instanceof 
ChangeLoggingTimestampedWindowBytesStoreWithHeaders);
     }
 
     @Test
-    public void 
shouldCreateTimestampedStoreWithProvidedSupplierAndCachingAndLoggingEnabledByDefault()
 {
+    public void 
shouldCreateHeadersStoreWithProvidedSupplierAndCachingAndLoggingEnabledByDefault()
 {
         mockWindowStoreSupplier();
 
         final MaterializedInternal<String, String, WindowStore<Bytes, byte[]>> 
materialized =
             new MaterializedInternal<>(Materialized.as(windowStoreSupplier), 
nameProvider, STORE_PREFIX);
 
-        final TimestampedWindowStore<String, String> store = 
getTimestampedStore(materialized);
+        final TimestampedWindowStoreWithHeaders<String, String> store = 
getHeadersStore(materialized);
 
         final WrappedStateStore caching = (WrappedStateStore) 
((WrappedStateStore) store).wrapped();
         final StateStore logging = caching.wrapped();
         assertEquals(innerWindowStore.name(), store.name());
-        assertInstanceOf(MeteredTimestampedWindowStore.class, store);
+        assertInstanceOf(MeteredTimestampedWindowStoreWithHeaders.class, 
store);
         assertInstanceOf(CachingWindowStore.class, caching);
-        assertInstanceOf(ChangeLoggingTimestampedWindowBytesStore.class, 
logging);
+        
assertInstanceOf(ChangeLoggingTimestampedWindowBytesStoreWithHeaders.class, 
logging);
     }
 
     @Test
-    public void 
shouldCreateTimestampedStoreWithProvidedSupplierAndCachingDisabled() {
+    public void 
shouldCreateHeadersStoreWithProvidedSupplierAndCachingDisabled() {
         mockWindowStoreSupplier();
         final MaterializedInternal<String, String, WindowStore<Bytes, byte[]>> 
materialized =
             new MaterializedInternal<>(Materialized.<String, 
String>as(windowStoreSupplier).withCachingDisabled(), nameProvider, 
STORE_PREFIX);
 
-        final TimestampedWindowStore<String, String> store = 
getTimestampedStore(materialized);
+        final TimestampedWindowStoreWithHeaders<String, String> store = 
getHeadersStore(materialized);
 
         final WrappedStateStore logging = (WrappedStateStore) 
((WrappedStateStore) store).wrapped();
         assertEquals(innerWindowStore.name(), store.name());
-        assertInstanceOf(ChangeLoggingTimestampedWindowBytesStore.class, 
logging);
+        
assertInstanceOf(ChangeLoggingTimestampedWindowBytesStoreWithHeaders.class, 
logging);
     }
 
     @Test
-    public void 
shouldCreateTimestampedStoreWithProvidedSupplierAndLoggingDisabled() {
+    public void 
shouldCreateHeadersStoreWithProvidedSupplierAndLoggingDisabled() {
         mockWindowStoreSupplier();
         final MaterializedInternal<String, String, WindowStore<Bytes, byte[]>> 
materialized =
             new MaterializedInternal<>(Materialized.<String, 
String>as(windowStoreSupplier).withLoggingDisabled(), nameProvider, 
STORE_PREFIX);
 
-        final TimestampedWindowStore<String, String> store = 
getTimestampedStore(materialized);
+        final TimestampedWindowStoreWithHeaders<String, String> store = 
getHeadersStore(materialized);
 
         final WrappedStateStore caching = (WrappedStateStore) 
((WrappedStateStore) store).wrapped();
         assertEquals(innerWindowStore.name(), store.name());
         assertInstanceOf(CachingWindowStore.class, caching);
-        assertFalse(caching.wrapped() instanceof 
ChangeLoggingTimestampedWindowBytesStore);
+        assertFalse(caching.wrapped() instanceof 
ChangeLoggingTimestampedWindowBytesStoreWithHeaders);
     }
 
     @Test
-    public void 
shouldCreateTimestampedStoreWithProvidedSupplierAndCachingAndLoggingDisabled() {
+    public void 
shouldCreateHeadersStoreWithProvidedSupplierAndCachingAndLoggingDisabled() {
         mockWindowStoreSupplier();
         final MaterializedInternal<String, String, WindowStore<Bytes, byte[]>> 
materialized =
             new MaterializedInternal<>(Materialized.<String, 
String>as(windowStoreSupplier).withCachingDisabled().withLoggingDisabled(), 
nameProvider, STORE_PREFIX);
 
-        final TimestampedWindowStore<String, String> store = 
getTimestampedStore(materialized);
+        final TimestampedWindowStoreWithHeaders<String, String> store = 
getHeadersStore(materialized);
 
         final StateStore wrapped = ((WrappedStateStore) store).wrapped();
         assertEquals(innerWindowStore.name(), store.name());
         assertFalse(wrapped instanceof CachingWindowStore);
-        assertFalse(wrapped instanceof 
ChangeLoggingTimestampedWindowBytesStore);
-    }
-
-    @Test
-    public void shouldCreateHeadersAwareStoreWithLoggingEnabledByDefault() {
-        doReturn("headers")
-                
.when(streamsConfig).getString(StreamsConfig.DSL_STORE_FORMAT_CONFIG);
-
-        final MaterializedInternal<String, String, WindowStore<Bytes, byte[]>> 
materialized =
-            new MaterializedInternal<>(Materialized.<String, String, 
WindowStore<Bytes, byte[]>>as("store")
-                .withCachingDisabled(), nameProvider, STORE_PREFIX);
-
-        final TimestampedWindowStoreWithHeaders<String, String> store = 
getHeadersAwareStore(materialized);
-
-        final WrappedStateStore logging = (WrappedStateStore) 
((WrappedStateStore) store).wrapped();
-        assertInstanceOf(MeteredTimestampedWindowStoreWithHeaders.class, 
store);
-        
assertInstanceOf(ChangeLoggingTimestampedWindowBytesStoreWithHeaders.class, 
logging);
-    }
-
-    @Test
-    public void shouldCreateHeadersAwareStoreWithLoggingDisabled() {
-        doReturn("headers")
-                
.when(streamsConfig).getString(StreamsConfig.DSL_STORE_FORMAT_CONFIG);
-
-        final MaterializedInternal<String, String, WindowStore<Bytes, byte[]>> 
materialized = new MaterializedInternal<>(
-            Materialized.<String, String, WindowStore<Bytes, 
byte[]>>as("store").withLoggingDisabled(), nameProvider, STORE_PREFIX
-        );
-
-        final TimestampedWindowStoreWithHeaders<String, String> store = 
getHeadersAwareStore(materialized);
-
-        final StateStore wrapped = ((WrappedStateStore) store).wrapped();
-        assertFalse(wrapped instanceof 
ChangeLoggingTimestampedWindowBytesStore);
-    }
-
-    @Test
-    public void shouldBuildHeadersAwareStoreWithCachingEnabledByDefault() {
-        doReturn("headers")
-                
.when(streamsConfig).getString(StreamsConfig.DSL_STORE_FORMAT_CONFIG);
-
-        final MaterializedInternal<String, String, WindowStore<Bytes, byte[]>> 
materialized =
-            new MaterializedInternal<>(Materialized.as("store"), nameProvider, 
STORE_PREFIX);
-
-        final TimestampedWindowStoreWithHeaders<String, String> store = 
getHeadersAwareStore(materialized);
-
-        final StateStore wrapped = ((WrappedStateStore) store).wrapped();
-        assertInstanceOf(CachingWindowStore.class, wrapped);
-    }
-
-    @Test
-    public void 
shouldCreateHeadersAwareStoreWithProvidedSupplierAndLoggingEnabled() {
-        mockWindowStoreSupplier();
-        doReturn("headers")
-                
.when(streamsConfig).getString(StreamsConfig.DSL_STORE_FORMAT_CONFIG);
-
-        final MaterializedInternal<String, String, WindowStore<Bytes, byte[]>> 
materialized =
-            new MaterializedInternal<>(Materialized.<String, 
String>as(windowStoreSupplier).withCachingDisabled(), nameProvider, 
STORE_PREFIX);
-
-        final TimestampedWindowStoreWithHeaders<String, String> store = 
getHeadersAwareStore(materialized);
-
-        final WrappedStateStore logging = (WrappedStateStore) 
((WrappedStateStore) store).wrapped();
-        assertEquals(innerWindowStore.name(), store.name());
-        assertInstanceOf(MeteredTimestampedWindowStoreWithHeaders.class, 
store);
-        
assertInstanceOf(ChangeLoggingTimestampedWindowBytesStoreWithHeaders.class, 
logging);
-    }
-
-    @Test
-    public void shouldCreateHeadersAwareStoreWithCachingAndLoggingDisabled() {
-        doReturn("headers")
-                
.when(streamsConfig).getString(StreamsConfig.DSL_STORE_FORMAT_CONFIG);
-
-        final MaterializedInternal<String, String, WindowStore<Bytes, byte[]>> 
materialized = new MaterializedInternal<>(
-            Materialized.<String, String, WindowStore<Bytes, 
byte[]>>as("store").withCachingDisabled().withLoggingDisabled(), nameProvider, 
STORE_PREFIX
-        );
-
-        final TimestampedWindowStoreWithHeaders<String, String> store = 
getHeadersAwareStore(materialized);
-
-        final StateStore wrapped = ((WrappedStateStore) store).wrapped();
-        assertFalse(wrapped instanceof CachingWindowStore);
         assertFalse(wrapped instanceof 
ChangeLoggingTimestampedWindowBytesStoreWithHeaders);
     }
 
     @Test
-    public void shouldCreateTimestampedStoreWithOnWindowClose() {
+    public void shouldCreateHeadersStoreWithOnWindowClose() {
         emitStrategy = EmitStrategy.onWindowClose();
 
         final MaterializedInternal<String, String, WindowStore<Bytes, byte[]>> 
materialized =
             new MaterializedInternal<>(Materialized.<String, String, 
WindowStore<Bytes, byte[]>>as("store")
                 .withCachingDisabled(), nameProvider, STORE_PREFIX);
 
-        final TimestampedWindowStore<String, String> store = 
getTimestampedStore(materialized);
-
-        final WrappedStateStore logging = (WrappedStateStore) 
((WrappedStateStore) store).wrapped();
-        assertInstanceOf(MeteredTimestampedWindowStore.class, store);
-        assertInstanceOf(ChangeLoggingTimestampedWindowBytesStore.class, 
logging);
-    }
-
-    @Test
-    public void 
shouldCreateTimestampedStoreWithOnWindowCloseAndCachingEnabled() {
-        emitStrategy = EmitStrategy.onWindowClose();
-
-        final MaterializedInternal<String, String, WindowStore<Bytes, byte[]>> 
materialized =
-            new MaterializedInternal<>(Materialized.as("store"), nameProvider, 
STORE_PREFIX);
-
-        final TimestampedWindowStore<String, String> store = 
getTimestampedStore(materialized);
-
-        final StateStore wrapped = ((WrappedStateStore) store).wrapped();
-        assertInstanceOf(TimeOrderedCachingWindowStore.class, wrapped);
-    }
-
-    @Test
-    public void shouldCreateHeadersAwareStoreWithOnWindowClose() {
-        doReturn("headers")
-                
.when(streamsConfig).getString(StreamsConfig.DSL_STORE_FORMAT_CONFIG);
-        emitStrategy = EmitStrategy.onWindowClose();
-
-        final MaterializedInternal<String, String, WindowStore<Bytes, byte[]>> 
materialized =
-            new MaterializedInternal<>(Materialized.<String, String, 
WindowStore<Bytes, byte[]>>as("store")
-                .withCachingDisabled(), nameProvider, STORE_PREFIX);
-
-        final TimestampedWindowStoreWithHeaders<String, String> store = 
getHeadersAwareStore(materialized);
+        final TimestampedWindowStoreWithHeaders<String, String> store = 
getHeadersStore(materialized);
 
         final WrappedStateStore logging = (WrappedStateStore) 
((WrappedStateStore) store).wrapped();
         assertInstanceOf(MeteredTimestampedWindowStoreWithHeaders.class, 
store);
         
assertInstanceOf(ChangeLoggingTimestampedWindowBytesStoreWithHeaders.class, 
logging);
     }
 
     @Test
-    public void 
shouldCreateHeadersAwareStoreWithOnWindowCloseAndLoggingDisabled() {
-        doReturn("headers")
-                
.when(streamsConfig).getString(StreamsConfig.DSL_STORE_FORMAT_CONFIG);
-        emitStrategy = EmitStrategy.onWindowClose();
-
-        final MaterializedInternal<String, String, WindowStore<Bytes, byte[]>> 
materialized = new MaterializedInternal<>(
-            Materialized.<String, String, WindowStore<Bytes, 
byte[]>>as("store").withLoggingDisabled(), nameProvider, STORE_PREFIX
-        );
-
-        final TimestampedWindowStoreWithHeaders<String, String> store = 
getHeadersAwareStore(materialized);
-
-        final StateStore wrapped = ((WrappedStateStore) store).wrapped();
-        assertFalse(wrapped instanceof 
ChangeLoggingTimestampedWindowBytesStoreWithHeaders);
-    }
-
-    @Test
-    public void 
shouldCreateHeadersAwareStoreWithOnWindowCloseAndCachingEnabled() {
-        doReturn("headers")
-                
.when(streamsConfig).getString(StreamsConfig.DSL_STORE_FORMAT_CONFIG);
+    public void shouldCreateHeadersStoreWithOnWindowCloseAndCachingEnabled() {
         emitStrategy = EmitStrategy.onWindowClose();
 
         final MaterializedInternal<String, String, WindowStore<Bytes, byte[]>> 
materialized =
             new MaterializedInternal<>(Materialized.as("store"), nameProvider, 
STORE_PREFIX);
 
-        final TimestampedWindowStoreWithHeaders<String, String> store = 
getHeadersAwareStore(materialized);
+        final TimestampedWindowStoreWithHeaders<String, String> store = 
getHeadersStore(materialized);
 
         final StateStore wrapped = ((WrappedStateStore) store).wrapped();
-        assertInstanceOf(TimeOrderedCachingWindowStore.class, wrapped);

Review Comment:
   Btw: Thanks for highlighting this change -- I would have missed it most 
likely during review w/o your comment.



##########
streams/src/test/java/org/apache/kafka/test/GenericInMemoryTimestampedKeyValueStoreWithHeaders.java:
##########
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.test;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.StateStoreContext;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.TimestampedKeyValueStoreWithHeaders;
+import org.apache.kafka.streams.state.ValueTimestampHeaders;
+import org.apache.kafka.streams.state.internals.CacheFlushListener;
+import 
org.apache.kafka.streams.state.internals.DelegatingPeekingKeyValueIterator;
+import org.apache.kafka.streams.state.internals.WrappedStateStore;
+
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.NavigableMap;
+import java.util.TreeMap;
+
+/**
+ * This class is a generic version of the in-memory key-value store that is 
useful for testing when you
+ *  need a basic KeyValueStore for arbitrary types and don't have/want to 
write a serde
+ */
+@SuppressWarnings("deprecation")
+public class GenericInMemoryTimestampedKeyValueStoreWithHeaders<K extends 
Comparable, V>
+    extends WrappedStateStore<StateStore, K, ValueTimestampHeaders<V>>
+    implements TimestampedKeyValueStoreWithHeaders<K, V> {
+
+    private final String name;
+    private final NavigableMap<K, ValueTimestampHeaders<V>> map;
+    private volatile boolean open = false;
+
+    public GenericInMemoryTimestampedKeyValueStoreWithHeaders(final String 
name) {
+        // it's not really a `WrappedStateStore` so we pass `null`
+        // however, we need to implement `WrappedStateStore` to make the store 
usable
+        super(null);
+        this.name = name;
+
+        this.map = new TreeMap<>();
+    }
+
+    @Override
+    public String name() {
+        return this.name;

Review Comment:
   nit: avoid unnecessary `this.` (similar below)



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/WindowStoreMaterializerTest.java:
##########
@@ -105,297 +101,146 @@ private void mockWindowStoreSupplier() {
     }
 
     @Test
-    public void 
shouldCreateTimestampedBuilderWithCachingAndLoggingEnabledByDefault() {
+    public void 
shouldCreateHeadersBuilderWithCachingAndLoggingEnabledByDefault() {
         final MaterializedInternal<String, String, WindowStore<Bytes, byte[]>> 
materialized =
             new MaterializedInternal<>(Materialized.as("store"), nameProvider, 
STORE_PREFIX);
 
-        final TimestampedWindowStore<String, String> store = 
getTimestampedStore(materialized);
+        final TimestampedWindowStoreWithHeaders<String, String> store = 
getHeadersStore(materialized);
         final WrappedStateStore caching = (WrappedStateStore) 
((WrappedStateStore) store).wrapped();
         final StateStore logging = caching.wrapped();
 
-        assertInstanceOf(MeteredTimestampedWindowStore.class, store);
+        assertInstanceOf(MeteredTimestampedWindowStoreWithHeaders.class, 
store);
         assertInstanceOf(CachingWindowStore.class, caching);
-        assertInstanceOf(ChangeLoggingTimestampedWindowBytesStore.class, 
logging);
+        
assertInstanceOf(ChangeLoggingTimestampedWindowBytesStoreWithHeaders.class, 
logging);
     }
 
     @Test
-    public void shouldCreateTimestampedBuilderWithCachingDisabled() {
+    public void shouldCreateHeadersBuilderWithCachingDisabled() {
         final MaterializedInternal<String, String, WindowStore<Bytes, byte[]>> 
materialized = new MaterializedInternal<>(
             Materialized.<String, String, WindowStore<Bytes, 
byte[]>>as("store").withCachingDisabled(), nameProvider, STORE_PREFIX
         );
 
-        final TimestampedWindowStore<String, String> store = 
getTimestampedStore(materialized);
+        final TimestampedWindowStoreWithHeaders<String, String> store = 
getHeadersStore(materialized);
 
         final WrappedStateStore logging = (WrappedStateStore) 
((WrappedStateStore) store).wrapped();
-        assertInstanceOf(ChangeLoggingTimestampedWindowBytesStore.class, 
logging);
+        
assertInstanceOf(ChangeLoggingTimestampedWindowBytesStoreWithHeaders.class, 
logging);
     }
 
     @Test
-    public void shouldCreateTimestampedBuilderWithLoggingDisabled() {
+    public void shouldCreateHeadersBuilderWithLoggingDisabled() {
         final MaterializedInternal<String, String, WindowStore<Bytes, byte[]>> 
materialized = new MaterializedInternal<>(
             Materialized.<String, String, WindowStore<Bytes, 
byte[]>>as("store").withLoggingDisabled(), nameProvider, STORE_PREFIX
         );
 
-        final TimestampedWindowStore<String, String> store = 
getTimestampedStore(materialized);
+        final TimestampedWindowStoreWithHeaders<String, String> store = 
getHeadersStore(materialized);
 
         final WrappedStateStore caching = (WrappedStateStore) 
((WrappedStateStore) store).wrapped();
         assertInstanceOf(CachingWindowStore.class, caching);
-        assertFalse(caching.wrapped() instanceof 
ChangeLoggingTimestampedWindowBytesStore);
+        assertFalse(caching.wrapped() instanceof 
ChangeLoggingTimestampedWindowBytesStoreWithHeaders);
     }
 
     @Test
-    public void shouldCreateTimestampedBuilderWithCachingAndLoggingDisabled() {
+    public void shouldCreateHeadersBuilderWithCachingAndLoggingDisabled() {
         final MaterializedInternal<String, String, WindowStore<Bytes, byte[]>> 
materialized = new MaterializedInternal<>(
             Materialized.<String, String, WindowStore<Bytes, 
byte[]>>as("store").withCachingDisabled().withLoggingDisabled(), nameProvider, 
STORE_PREFIX
         );
 
-        final TimestampedWindowStore<String, String> store = 
getTimestampedStore(materialized);
+        final TimestampedWindowStoreWithHeaders<String, String> store = 
getHeadersStore(materialized);
 
         final StateStore wrapped = ((WrappedStateStore) store).wrapped();
         assertFalse(wrapped instanceof CachingWindowStore);
-        assertFalse(wrapped instanceof 
ChangeLoggingTimestampedWindowBytesStore);
+        assertFalse(wrapped instanceof 
ChangeLoggingTimestampedWindowBytesStoreWithHeaders);
     }
 
     @Test
-    public void 
shouldCreateTimestampedStoreWithProvidedSupplierAndCachingAndLoggingEnabledByDefault()
 {
+    public void 
shouldCreateHeadersStoreWithProvidedSupplierAndCachingAndLoggingEnabledByDefault()
 {
         mockWindowStoreSupplier();
 
         final MaterializedInternal<String, String, WindowStore<Bytes, byte[]>> 
materialized =
             new MaterializedInternal<>(Materialized.as(windowStoreSupplier), 
nameProvider, STORE_PREFIX);
 
-        final TimestampedWindowStore<String, String> store = 
getTimestampedStore(materialized);
+        final TimestampedWindowStoreWithHeaders<String, String> store = 
getHeadersStore(materialized);
 
         final WrappedStateStore caching = (WrappedStateStore) 
((WrappedStateStore) store).wrapped();
         final StateStore logging = caching.wrapped();
         assertEquals(innerWindowStore.name(), store.name());
-        assertInstanceOf(MeteredTimestampedWindowStore.class, store);
+        assertInstanceOf(MeteredTimestampedWindowStoreWithHeaders.class, 
store);
         assertInstanceOf(CachingWindowStore.class, caching);
-        assertInstanceOf(ChangeLoggingTimestampedWindowBytesStore.class, 
logging);
+        
assertInstanceOf(ChangeLoggingTimestampedWindowBytesStoreWithHeaders.class, 
logging);
     }
 
     @Test
-    public void 
shouldCreateTimestampedStoreWithProvidedSupplierAndCachingDisabled() {
+    public void 
shouldCreateHeadersStoreWithProvidedSupplierAndCachingDisabled() {
         mockWindowStoreSupplier();
         final MaterializedInternal<String, String, WindowStore<Bytes, byte[]>> 
materialized =
             new MaterializedInternal<>(Materialized.<String, 
String>as(windowStoreSupplier).withCachingDisabled(), nameProvider, 
STORE_PREFIX);
 
-        final TimestampedWindowStore<String, String> store = 
getTimestampedStore(materialized);
+        final TimestampedWindowStoreWithHeaders<String, String> store = 
getHeadersStore(materialized);
 
         final WrappedStateStore logging = (WrappedStateStore) 
((WrappedStateStore) store).wrapped();
         assertEquals(innerWindowStore.name(), store.name());
-        assertInstanceOf(ChangeLoggingTimestampedWindowBytesStore.class, 
logging);
+        
assertInstanceOf(ChangeLoggingTimestampedWindowBytesStoreWithHeaders.class, 
logging);
     }
 
     @Test
-    public void 
shouldCreateTimestampedStoreWithProvidedSupplierAndLoggingDisabled() {
+    public void 
shouldCreateHeadersStoreWithProvidedSupplierAndLoggingDisabled() {
         mockWindowStoreSupplier();
         final MaterializedInternal<String, String, WindowStore<Bytes, byte[]>> 
materialized =
             new MaterializedInternal<>(Materialized.<String, 
String>as(windowStoreSupplier).withLoggingDisabled(), nameProvider, 
STORE_PREFIX);
 
-        final TimestampedWindowStore<String, String> store = 
getTimestampedStore(materialized);
+        final TimestampedWindowStoreWithHeaders<String, String> store = 
getHeadersStore(materialized);
 
         final WrappedStateStore caching = (WrappedStateStore) 
((WrappedStateStore) store).wrapped();
         assertEquals(innerWindowStore.name(), store.name());
         assertInstanceOf(CachingWindowStore.class, caching);
-        assertFalse(caching.wrapped() instanceof 
ChangeLoggingTimestampedWindowBytesStore);
+        assertFalse(caching.wrapped() instanceof 
ChangeLoggingTimestampedWindowBytesStoreWithHeaders);
     }
 
     @Test
-    public void 
shouldCreateTimestampedStoreWithProvidedSupplierAndCachingAndLoggingDisabled() {
+    public void 
shouldCreateHeadersStoreWithProvidedSupplierAndCachingAndLoggingDisabled() {
         mockWindowStoreSupplier();
         final MaterializedInternal<String, String, WindowStore<Bytes, byte[]>> 
materialized =
             new MaterializedInternal<>(Materialized.<String, 
String>as(windowStoreSupplier).withCachingDisabled().withLoggingDisabled(), 
nameProvider, STORE_PREFIX);
 
-        final TimestampedWindowStore<String, String> store = 
getTimestampedStore(materialized);
+        final TimestampedWindowStoreWithHeaders<String, String> store = 
getHeadersStore(materialized);
 
         final StateStore wrapped = ((WrappedStateStore) store).wrapped();
         assertEquals(innerWindowStore.name(), store.name());
         assertFalse(wrapped instanceof CachingWindowStore);
-        assertFalse(wrapped instanceof 
ChangeLoggingTimestampedWindowBytesStore);
-    }
-
-    @Test
-    public void shouldCreateHeadersAwareStoreWithLoggingEnabledByDefault() {
-        doReturn("headers")
-                
.when(streamsConfig).getString(StreamsConfig.DSL_STORE_FORMAT_CONFIG);
-
-        final MaterializedInternal<String, String, WindowStore<Bytes, byte[]>> 
materialized =
-            new MaterializedInternal<>(Materialized.<String, String, 
WindowStore<Bytes, byte[]>>as("store")
-                .withCachingDisabled(), nameProvider, STORE_PREFIX);
-
-        final TimestampedWindowStoreWithHeaders<String, String> store = 
getHeadersAwareStore(materialized);
-
-        final WrappedStateStore logging = (WrappedStateStore) 
((WrappedStateStore) store).wrapped();
-        assertInstanceOf(MeteredTimestampedWindowStoreWithHeaders.class, 
store);
-        
assertInstanceOf(ChangeLoggingTimestampedWindowBytesStoreWithHeaders.class, 
logging);
-    }
-
-    @Test
-    public void shouldCreateHeadersAwareStoreWithLoggingDisabled() {
-        doReturn("headers")
-                
.when(streamsConfig).getString(StreamsConfig.DSL_STORE_FORMAT_CONFIG);
-
-        final MaterializedInternal<String, String, WindowStore<Bytes, byte[]>> 
materialized = new MaterializedInternal<>(
-            Materialized.<String, String, WindowStore<Bytes, 
byte[]>>as("store").withLoggingDisabled(), nameProvider, STORE_PREFIX
-        );
-
-        final TimestampedWindowStoreWithHeaders<String, String> store = 
getHeadersAwareStore(materialized);
-
-        final StateStore wrapped = ((WrappedStateStore) store).wrapped();
-        assertFalse(wrapped instanceof 
ChangeLoggingTimestampedWindowBytesStore);
-    }
-
-    @Test
-    public void shouldBuildHeadersAwareStoreWithCachingEnabledByDefault() {
-        doReturn("headers")
-                
.when(streamsConfig).getString(StreamsConfig.DSL_STORE_FORMAT_CONFIG);
-
-        final MaterializedInternal<String, String, WindowStore<Bytes, byte[]>> 
materialized =
-            new MaterializedInternal<>(Materialized.as("store"), nameProvider, 
STORE_PREFIX);
-
-        final TimestampedWindowStoreWithHeaders<String, String> store = 
getHeadersAwareStore(materialized);
-
-        final StateStore wrapped = ((WrappedStateStore) store).wrapped();
-        assertInstanceOf(CachingWindowStore.class, wrapped);
-    }
-
-    @Test
-    public void 
shouldCreateHeadersAwareStoreWithProvidedSupplierAndLoggingEnabled() {
-        mockWindowStoreSupplier();
-        doReturn("headers")
-                
.when(streamsConfig).getString(StreamsConfig.DSL_STORE_FORMAT_CONFIG);
-
-        final MaterializedInternal<String, String, WindowStore<Bytes, byte[]>> 
materialized =
-            new MaterializedInternal<>(Materialized.<String, 
String>as(windowStoreSupplier).withCachingDisabled(), nameProvider, 
STORE_PREFIX);
-
-        final TimestampedWindowStoreWithHeaders<String, String> store = 
getHeadersAwareStore(materialized);
-
-        final WrappedStateStore logging = (WrappedStateStore) 
((WrappedStateStore) store).wrapped();
-        assertEquals(innerWindowStore.name(), store.name());
-        assertInstanceOf(MeteredTimestampedWindowStoreWithHeaders.class, 
store);
-        
assertInstanceOf(ChangeLoggingTimestampedWindowBytesStoreWithHeaders.class, 
logging);
-    }
-
-    @Test
-    public void shouldCreateHeadersAwareStoreWithCachingAndLoggingDisabled() {
-        doReturn("headers")
-                
.when(streamsConfig).getString(StreamsConfig.DSL_STORE_FORMAT_CONFIG);
-
-        final MaterializedInternal<String, String, WindowStore<Bytes, byte[]>> 
materialized = new MaterializedInternal<>(
-            Materialized.<String, String, WindowStore<Bytes, 
byte[]>>as("store").withCachingDisabled().withLoggingDisabled(), nameProvider, 
STORE_PREFIX
-        );
-
-        final TimestampedWindowStoreWithHeaders<String, String> store = 
getHeadersAwareStore(materialized);
-
-        final StateStore wrapped = ((WrappedStateStore) store).wrapped();
-        assertFalse(wrapped instanceof CachingWindowStore);
         assertFalse(wrapped instanceof 
ChangeLoggingTimestampedWindowBytesStoreWithHeaders);
     }
 
     @Test
-    public void shouldCreateTimestampedStoreWithOnWindowClose() {
+    public void shouldCreateHeadersStoreWithOnWindowClose() {
         emitStrategy = EmitStrategy.onWindowClose();
 
         final MaterializedInternal<String, String, WindowStore<Bytes, byte[]>> 
materialized =
             new MaterializedInternal<>(Materialized.<String, String, 
WindowStore<Bytes, byte[]>>as("store")
                 .withCachingDisabled(), nameProvider, STORE_PREFIX);
 
-        final TimestampedWindowStore<String, String> store = 
getTimestampedStore(materialized);
-
-        final WrappedStateStore logging = (WrappedStateStore) 
((WrappedStateStore) store).wrapped();
-        assertInstanceOf(MeteredTimestampedWindowStore.class, store);
-        assertInstanceOf(ChangeLoggingTimestampedWindowBytesStore.class, 
logging);
-    }
-
-    @Test
-    public void 
shouldCreateTimestampedStoreWithOnWindowCloseAndCachingEnabled() {
-        emitStrategy = EmitStrategy.onWindowClose();
-
-        final MaterializedInternal<String, String, WindowStore<Bytes, byte[]>> 
materialized =
-            new MaterializedInternal<>(Materialized.as("store"), nameProvider, 
STORE_PREFIX);
-
-        final TimestampedWindowStore<String, String> store = 
getTimestampedStore(materialized);
-
-        final StateStore wrapped = ((WrappedStateStore) store).wrapped();
-        assertInstanceOf(TimeOrderedCachingWindowStore.class, wrapped);
-    }
-
-    @Test
-    public void shouldCreateHeadersAwareStoreWithOnWindowClose() {
-        doReturn("headers")
-                
.when(streamsConfig).getString(StreamsConfig.DSL_STORE_FORMAT_CONFIG);
-        emitStrategy = EmitStrategy.onWindowClose();
-
-        final MaterializedInternal<String, String, WindowStore<Bytes, byte[]>> 
materialized =
-            new MaterializedInternal<>(Materialized.<String, String, 
WindowStore<Bytes, byte[]>>as("store")
-                .withCachingDisabled(), nameProvider, STORE_PREFIX);
-
-        final TimestampedWindowStoreWithHeaders<String, String> store = 
getHeadersAwareStore(materialized);
+        final TimestampedWindowStoreWithHeaders<String, String> store = 
getHeadersStore(materialized);
 
         final WrappedStateStore logging = (WrappedStateStore) 
((WrappedStateStore) store).wrapped();
         assertInstanceOf(MeteredTimestampedWindowStoreWithHeaders.class, 
store);
         
assertInstanceOf(ChangeLoggingTimestampedWindowBytesStoreWithHeaders.class, 
logging);
     }
 
     @Test
-    public void 
shouldCreateHeadersAwareStoreWithOnWindowCloseAndLoggingDisabled() {
-        doReturn("headers")
-                
.when(streamsConfig).getString(StreamsConfig.DSL_STORE_FORMAT_CONFIG);
-        emitStrategy = EmitStrategy.onWindowClose();
-
-        final MaterializedInternal<String, String, WindowStore<Bytes, byte[]>> 
materialized = new MaterializedInternal<>(
-            Materialized.<String, String, WindowStore<Bytes, 
byte[]>>as("store").withLoggingDisabled(), nameProvider, STORE_PREFIX
-        );
-
-        final TimestampedWindowStoreWithHeaders<String, String> store = 
getHeadersAwareStore(materialized);
-
-        final StateStore wrapped = ((WrappedStateStore) store).wrapped();
-        assertFalse(wrapped instanceof 
ChangeLoggingTimestampedWindowBytesStoreWithHeaders);
-    }
-
-    @Test
-    public void 
shouldCreateHeadersAwareStoreWithOnWindowCloseAndCachingEnabled() {
-        doReturn("headers")
-                
.when(streamsConfig).getString(StreamsConfig.DSL_STORE_FORMAT_CONFIG);
+    public void shouldCreateHeadersStoreWithOnWindowCloseAndCachingEnabled() {
         emitStrategy = EmitStrategy.onWindowClose();
 
         final MaterializedInternal<String, String, WindowStore<Bytes, byte[]>> 
materialized =
             new MaterializedInternal<>(Materialized.as("store"), nameProvider, 
STORE_PREFIX);
 
-        final TimestampedWindowStoreWithHeaders<String, String> store = 
getHeadersAwareStore(materialized);
+        final TimestampedWindowStoreWithHeaders<String, String> store = 
getHeadersStore(materialized);
 
         final StateStore wrapped = ((WrappedStateStore) store).wrapped();
-        assertInstanceOf(TimeOrderedCachingWindowStore.class, wrapped);

Review Comment:
   Seems this is still an open question? Let's me sure we don't drop it... \cc 
@frankvicky @bbejeck 
   
   Sound related to the fix that Frank is working on? So I assume we can merge 
this PR (including this change), and this test starts failing with Frank's fix 
and we need to update the test on the fix-PR?



##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableTransformValues.java:
##########
@@ -192,9 +192,11 @@ private ValueAndTimestamp<VOut> transformValue(final K 
key, final ValueAndTimest
                 new RecordHeaders()
             ));
 
-            final ValueAndTimestamp<VOut> result = ValueAndTimestamp.make(
-                valueTransformer.transform(key, 
getValueOrNull(valueAndTimestamp)),
-                valueAndTimestamp == null ? UNKNOWN : 
valueAndTimestamp.timestamp());
+            final ValueTimestampHeaders<VOut> result = 
ValueTimestampHeaders.make(
+                valueTransformer.transform(key, 
getValueOrNull(valueTimestampHeaders)),
+                valueTimestampHeaders == null ? UNKNOWN : 
valueTimestampHeaders.timestamp(),
+                valueTimestampHeaders == null ? null : 
valueTimestampHeaders.headers()

Review Comment:
   Sound good -- wondering if we should use `context.headers()` instead of 
`null` if `valueTimestampHeaders == null` ? Similar question as for 
`KTable#mapValues`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to