mjsax commented on code in PR #21455:
URL: https://github.com/apache/kafka/pull/21455#discussion_r2830571693


##########
streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedKeyValueStoreBuilderWithHeaders.java:
##########
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.query.PositionBound;
+import org.apache.kafka.streams.query.Query;
+import org.apache.kafka.streams.query.QueryConfig;
+import org.apache.kafka.streams.query.QueryResult;
+import org.apache.kafka.streams.state.HeadersBytesStore;
+import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.TimestampedKeyValueStoreWithHeaders;
+import org.apache.kafka.streams.state.ValueTimestampHeaders;
+
+import java.util.List;
+import java.util.Objects;
+
+/**
+ * Builder for {@link TimestampedKeyValueStoreWithHeaders} instances.
+ *
+ * This is analogous to {@link TimestampedKeyValueStoreBuilder}, but uses
+ * {@link ValueTimestampHeaders} as the value wrapper and wires up the
+ * header-aware store stack (change-logging, caching, metering).
+ */
+public class TimestampedKeyValueStoreBuilderWithHeaders<K, V>
+    extends AbstractStoreBuilder<K, ValueTimestampHeaders<V>, 
TimestampedKeyValueStoreWithHeaders<K, V>> {
+
+    private final KeyValueBytesStoreSupplier storeSupplier;
+
+    public TimestampedKeyValueStoreBuilderWithHeaders(final 
KeyValueBytesStoreSupplier storeSupplier,
+                                                      final Serde<K> keySerde,
+                                                      final Serde<V> 
valueSerde,
+                                                      final Time time) {
+        super(
+            storeSupplier.name(),
+            keySerde,
+            valueSerde == null ? null : new 
ValueTimestampHeadersSerde<>(valueSerde),
+            time
+        );
+        Objects.requireNonNull(storeSupplier, "storeSupplier can't be null");
+        Objects.requireNonNull(storeSupplier.metricsScope(), "storeSupplier's 
metricsScope can't be null");
+        this.storeSupplier = storeSupplier;
+    }
+
+    @Override
+    public TimestampedKeyValueStoreWithHeaders<K, V> build() {
+        KeyValueStore<Bytes, byte[]> store = storeSupplier.get();
+
+        if (!(store instanceof HeadersBytesStore)) {
+            if (store.persistent()) {
+                store = new TimestampedToHeadersStoreAdapter(store);
+            } else {
+                store = new 
InMemoryTimestampedKeyValueStoreWithHeadersMarker(store);
+            }
+        }
+
+        return new MeteredTimestampedKeyValueStoreWithHeaders<>(
+            maybeWrapCaching(maybeWrapLogging(store)),
+            storeSupplier.metricsScope(),
+            time,
+            keySerde,
+            valueSerde
+        );
+    }
+
+    private KeyValueStore<Bytes, byte[]> maybeWrapCaching(final 
KeyValueStore<Bytes, byte[]> inner) {
+        if (!enableCaching) {
+            return inner;
+        }
+        return new CachingKeyValueStore(inner, true);
+    }
+
+    private KeyValueStore<Bytes, byte[]> maybeWrapLogging(final 
KeyValueStore<Bytes, byte[]> inner) {
+        if (!enableLogging) {
+            return inner;
+        }
+        return new 
ChangeLoggingTimestampedKeyValueBytesStoreWithHeaders(inner);
+    }
+
+    private static final class 
InMemoryTimestampedKeyValueStoreWithHeadersMarker
+        extends WrappedStateStore<KeyValueStore<Bytes, byte[]>, Bytes, byte[]>
+        implements KeyValueStore<Bytes, byte[]>, HeadersBytesStore {
+
+        private InMemoryTimestampedKeyValueStoreWithHeadersMarker(final 
KeyValueStore<Bytes, byte[]> wrapped) {
+            super(wrapped);
+            if (wrapped.persistent()) {
+                throw new IllegalArgumentException("Provided store must not be 
a persistent store, but it is.");
+            }
+        }
+
+        @Override
+        public void put(final Bytes key,
+                        final byte[] value) {
+            wrapped().put(key, value);
+        }
+
+        @Override
+        public byte[] putIfAbsent(final Bytes key,
+                                  final byte[] value) {
+            return wrapped().putIfAbsent(key, value);
+        }
+
+        @Override
+        public void putAll(final List<KeyValue<Bytes, byte[]>> entries) {
+            wrapped().putAll(entries);
+        }
+
+        @Override
+        public byte[] delete(final Bytes key) {
+            return wrapped().delete(key);
+        }
+
+        @Override
+        public byte[] get(final Bytes key) {
+            return wrapped().get(key);
+        }
+
+        @Override
+        public KeyValueIterator<Bytes, byte[]> range(final Bytes from,
+                                                     final Bytes to) {
+            throw new UnsupportedOperationException("Range queries are not 
supported by in-memory timestamped key-value stores with headers");

Review Comment:
   Why do we need to throw? Not sure why we would not support this for 
in-memory stores? Same question below.
   
   Is this added because of IQ? For this case, it would not be correct to throw 
here. While `range()` and others are also used by IQv1, these methods are exposed 
inside `Processors` and thus are regular API (like put/get) that user code might use.



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueBytesStoreSupplier.java:
##########
@@ -24,11 +24,14 @@ public class RocksDBKeyValueBytesStoreSupplier implements 
KeyValueBytesStoreSupp
 
     private final String name;
     private final boolean returnTimestampedStore;
+    private final boolean returnHeadersStore;
 
     public RocksDBKeyValueBytesStoreSupplier(final String name,
-                                             final boolean 
returnTimestampedStore) {
+                                             final boolean 
returnTimestampedStore,
+                                             final boolean returnHeadersStore) 
{
         this.name = name;
         this.returnTimestampedStore = returnTimestampedStore;
+        this.returnHeadersStore = returnHeadersStore;

Review Comment:
   Should we add a check for an invalid combination of `returnTimestampedStore` 
and `returnHeadersStore`?
   
   We only support 3 out of 4 combinations, right?



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedKeyValueStoreBuilderWithHeaders.java:
##########
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.query.PositionBound;
+import org.apache.kafka.streams.query.Query;
+import org.apache.kafka.streams.query.QueryConfig;
+import org.apache.kafka.streams.query.QueryResult;
+import org.apache.kafka.streams.state.HeadersBytesStore;
+import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.TimestampedKeyValueStoreWithHeaders;
+import org.apache.kafka.streams.state.ValueTimestampHeaders;
+
+import java.util.List;
+import java.util.Objects;
+
+/**
+ * Builder for {@link TimestampedKeyValueStoreWithHeaders} instances.
+ *
+ * This is analogous to {@link TimestampedKeyValueStoreBuilder}, but uses
+ * {@link ValueTimestampHeaders} as the value wrapper and wires up the
+ * header-aware store stack (change-logging, caching, metering).
+ */
+public class TimestampedKeyValueStoreBuilderWithHeaders<K, V>
+    extends AbstractStoreBuilder<K, ValueTimestampHeaders<V>, 
TimestampedKeyValueStoreWithHeaders<K, V>> {
+
+    private final KeyValueBytesStoreSupplier storeSupplier;
+
+    public TimestampedKeyValueStoreBuilderWithHeaders(final 
KeyValueBytesStoreSupplier storeSupplier,
+                                                      final Serde<K> keySerde,
+                                                      final Serde<V> 
valueSerde,
+                                                      final Time time) {
+        super(
+            storeSupplier.name(),
+            keySerde,
+            valueSerde == null ? null : new 
ValueTimestampHeadersSerde<>(valueSerde),
+            time
+        );
+        Objects.requireNonNull(storeSupplier, "storeSupplier can't be null");
+        Objects.requireNonNull(storeSupplier.metricsScope(), "storeSupplier's 
metricsScope can't be null");
+        this.storeSupplier = storeSupplier;
+    }
+
+    @Override
+    public TimestampedKeyValueStoreWithHeaders<K, V> build() {
+        KeyValueStore<Bytes, byte[]> store = storeSupplier.get();
+
+        if (!(store instanceof HeadersBytesStore)) {
+            if (store.persistent()) {
+                store = new TimestampedToHeadersStoreAdapter(store);
+            } else {
+                store = new 
InMemoryTimestampedKeyValueStoreWithHeadersMarker(store);
+            }
+        }
+
+        return new MeteredTimestampedKeyValueStoreWithHeaders<>(
+            maybeWrapCaching(maybeWrapLogging(store)),
+            storeSupplier.metricsScope(),
+            time,
+            keySerde,
+            valueSerde
+        );
+    }
+
+    private KeyValueStore<Bytes, byte[]> maybeWrapCaching(final 
KeyValueStore<Bytes, byte[]> inner) {
+        if (!enableCaching) {
+            return inner;
+        }
+        return new CachingKeyValueStore(inner, true);

Review Comment:
   I was just wondering about the `true` parameter -- it seems to be related to IQv2. 
Do we need to do something about this (I believe yes, from a quick look into the 
code)? Can we write a proper test verifying that IQv2 is disabled, does not crash 
anything if one tries to use it, and provides a good error message?



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedToHeadersStoreAdapter.java:
##########
@@ -0,0 +1,301 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.common.utils.ByteUtils;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.kstream.Materialized;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.StateStoreContext;
+import org.apache.kafka.streams.query.KeyQuery;
+import org.apache.kafka.streams.query.Position;
+import org.apache.kafka.streams.query.PositionBound;
+import org.apache.kafka.streams.query.Query;
+import org.apache.kafka.streams.query.QueryConfig;
+import org.apache.kafka.streams.query.QueryResult;
+import org.apache.kafka.streams.query.RangeQuery;
+import org.apache.kafka.streams.query.TimestampedKeyQuery;
+import org.apache.kafka.streams.query.TimestampedRangeQuery;
+import org.apache.kafka.streams.query.internals.InternalQueryResultUtil;
+import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.TimestampedKeyValueStore;
+import org.apache.kafka.streams.state.TimestampedKeyValueStoreWithHeaders;
+
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.Map;
+
+import static 
org.apache.kafka.streams.state.HeadersBytesStore.convertToHeaderFormat;
+
+/**
+ * This class is used to ensure backward compatibility at DSL level between
+ * {@link TimestampedKeyValueStoreWithHeaders} and
+ * {@link TimestampedKeyValueStore}.
+ * <p>
+ * If a user provides a supplier for {@code TimestampedKeyValueStore} (without 
headers) via
+ * {@link Materialized#as(KeyValueBytesStoreSupplier)} when building
+ * a {@code TimestampedKeyValueStoreWithHeaders}, this adapter is used to 
translate between
+ * the timestamped {@code byte[]} format and the timestamped-with-headers 
{@code byte[]} format.
+ *
+ * @see TimestampedToHeadersIteratorAdapter
+ */
+@SuppressWarnings("unchecked")
+public class TimestampedToHeadersStoreAdapter implements KeyValueStore<Bytes, 
byte[]> {
+    final KeyValueStore<Bytes, byte[]> store;
+
+    TimestampedToHeadersStoreAdapter(final KeyValueStore<Bytes, byte[]> store) 
{
+        if (!store.persistent()) {
+            throw new IllegalArgumentException("Provided store must be a 
persistent store, but it is not.");
+        }
+        this.store = store;
+    }
+
+    /**
+     * Extract raw timestamped value (timestamp + value) from serialized 
ValueTimestampHeaders.
+     * This strips the headers portion but keeps timestamp and value intact.
+     *
+     * Format conversion:
+     * Input:  [headersSize(varint)][headers][timestamp(8)][value]
+     * Output: [timestamp(8)][value]
+     */
+    static byte[] rawTimestampedValue(final byte[] rawValueTimestampHeaders) {
+        if (rawValueTimestampHeaders == null) {
+            return null;
+        }
+
+        final ByteBuffer buffer = ByteBuffer.wrap(rawValueTimestampHeaders);
+        final int headersSize = ByteUtils.readVarint(buffer);
+        // Skip headers, keep timestamp + value
+        buffer.position(buffer.position() + headersSize);
+
+        final byte[] result = new byte[buffer.remaining()];
+        buffer.get(result);
+        return result;
+    }
+
+    @Override
+    public void put(final Bytes key,
+                    final byte[] valueWithTimestampAndHeaders) {
+        store.put(key, rawTimestampedValue(valueWithTimestampAndHeaders));
+    }
+
+    @Override
+    public byte[] putIfAbsent(final Bytes key,
+                              final byte[] valueWithTimestampAndHeaders) {
+        return convertToHeaderFormat(store.putIfAbsent(
+            key,
+            rawTimestampedValue(valueWithTimestampAndHeaders)));
+    }
+
+    @Override
+    public void putAll(final List<KeyValue<Bytes, byte[]>> entries) {
+        for (final KeyValue<Bytes, byte[]> entry : entries) {
+            final byte[] valueWithTimestampAndHeaders = entry.value;
+            store.put(entry.key, 
rawTimestampedValue(valueWithTimestampAndHeaders));
+        }
+    }
+
+    @Override
+    public byte[] delete(final Bytes key) {
+        return convertToHeaderFormat(store.delete(key));
+    }
+
+    @Override
+    public String name() {
+        return store.name();
+    }
+
+    @Override
+    public void init(final StateStoreContext stateStoreContext, final 
StateStore root) {
+        store.init(stateStoreContext, root);
+    }
+
+    @Override
+    public void commit(final Map<TopicPartition, Long> changelogOffsets) {
+        store.commit(changelogOffsets);
+    }
+
+    @Override
+    public void close() {
+        store.close();
+    }
+
+    @Override
+    public boolean persistent() {
+        return true;
+    }
+
+    @Override
+    public boolean isOpen() {
+        return store.isOpen();
+    }
+
+    @Override
+    public <R> QueryResult<R> query(
+        final Query<R> query,
+        final PositionBound positionBound,
+        final QueryConfig config) {
+
+        final long start = config.isCollectExecutionInfo() ? System.nanoTime() 
: -1L;
+        QueryResult<R> result = store.query(query, positionBound, config);
+
+        // this adapter always needs to return a 
`value-with-timestamp-and-headers` result to hold up its contract
+        // thus, we need to add the empty headers wrapper even for timestamped 
queries
+        if (result.isSuccess()) {
+            if (query instanceof KeyQuery || query instanceof 
TimestampedKeyQuery) {
+                final byte[] timestampedValue = (byte[]) result.getResult();
+                final byte[] valueWithHeaders = 
convertToHeaderFormat(timestampedValue);
+                result = (QueryResult<R>) 
InternalQueryResultUtil.copyAndSubstituteDeserializedResult(result, 
valueWithHeaders);
+            } else if (query instanceof RangeQuery || query instanceof 
TimestampedRangeQuery) {
+                final TimestampedToHeadersRocksIteratorAdapter 
wrappedRocksDBRangeIterator =
+                    new 
TimestampedToHeadersRocksIteratorAdapter((RocksDbIterator) result.getResult());
+                result = (QueryResult<R>) 
InternalQueryResultUtil.copyAndSubstituteDeserializedResult(result, 
wrappedRocksDBRangeIterator);
+            } else {
+                throw new IllegalArgumentException("Unsupported query type: " 
+ query.getClass());
+            }
+        }
+
+        if (config.isCollectExecutionInfo()) {
+            final long end = System.nanoTime();
+            result.addExecutionInfo(
+                "Handled in " + getClass() + " in " + (end - start) + "ns"
+            );
+        }
+        return result;
+    }
+
+    @Override
+    public Position getPosition() {
+        return store.getPosition();
+    }
+
+    @Override
+    public byte[] get(final Bytes key) {
+        return convertToHeaderFormat(store.get(key));
+    }
+
+    @Override
+    public KeyValueIterator<Bytes, byte[]> range(final Bytes from,
+                                                 final Bytes to) {
+        return new TimestampedToHeadersIteratorAdapter<>(store.range(from, 
to));
+    }
+
+    @Override
+    public KeyValueIterator<Bytes, byte[]> reverseRange(final Bytes from,
+                                                        final Bytes to) {
+        return new 
TimestampedToHeadersIteratorAdapter<>(store.reverseRange(from, to));
+    }
+
+    @Override
+    public KeyValueIterator<Bytes, byte[]> all() {
+        return new TimestampedToHeadersIteratorAdapter<>(store.all());
+    }
+
+    @Override
+    public KeyValueIterator<Bytes, byte[]> reverseAll() {
+        return new TimestampedToHeadersIteratorAdapter<>(store.reverseAll());
+    }
+
+    @Override
+    public <PS extends Serializer<P>, P> KeyValueIterator<Bytes, byte[]> 
prefixScan(final P prefix,
+                                                                               
     final PS prefixKeySerializer) {
+        return new 
TimestampedToHeadersIteratorAdapter<>(store.prefixScan(prefix, 
prefixKeySerializer));
+    }
+
+    @Override
+    public long approximateNumEntries() {
+        return store.approximateNumEntries();
+    }
+
+    /**
+     * Generic iterator adapter for range/all operations
+     */
+    private static class TimestampedToHeadersIteratorAdapter<K> implements 
KeyValueIterator<K, byte[]> {
+        private final KeyValueIterator<K, byte[]> innerIterator;
+
+        public TimestampedToHeadersIteratorAdapter(final KeyValueIterator<K, 
byte[]> innerIterator) {
+            this.innerIterator = innerIterator;
+        }
+
+        @Override
+        public void close() {
+            innerIterator.close();
+        }
+
+        @Override
+        public K peekNextKey() {
+            return innerIterator.peekNextKey();
+        }
+
+        @Override
+        public boolean hasNext() {
+            return innerIterator.hasNext();
+        }
+
+        @Override
+        public KeyValue<K, byte[]> next() {
+            final KeyValue<K, byte[]> timestampedKeyValue = 
innerIterator.next();
+            return KeyValue.pair(timestampedKeyValue.key, 
convertToHeaderFormat(timestampedKeyValue.value));

Review Comment:
   The code of `TimestampedToHeadersRocksIteratorAdapter` below has a `null` 
check -- do we need to add a `null` check here, too? Or do we not need it?
   
   (I understand that the existing `KeyValueToTimestampedKeyValueIteratorAdapter` 
also does not have this `null` check, but I am wondering whether it should?)



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueBytesStoreSupplier.java:
##########
@@ -38,9 +41,13 @@ public String name() {
 
     @Override
     public KeyValueStore<Bytes, byte[]> get() {
-        return returnTimestampedStore ?
-            new RocksDBTimestampedStore(name, metricsScope()) :
-            new RocksDBStore(name, metricsScope());
+        if (returnHeadersStore) {

Review Comment:
   ```suggestion
           if (returnTimestampedStore && returnHeadersStore) {
   ```



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueBytesStoreSupplier.java:
##########
@@ -38,9 +41,13 @@ public String name() {
 
     @Override
     public KeyValueStore<Bytes, byte[]> get() {
-        return returnTimestampedStore ?
-            new RocksDBTimestampedStore(name, metricsScope()) :
-            new RocksDBStore(name, metricsScope());
+        if (returnHeadersStore) {

Review Comment:
   Maybe somewhat redundant, but this is how the flags are used... seems 
cleaner?



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedToHeadersStoreAdapter.java:
##########
@@ -0,0 +1,301 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.common.utils.ByteUtils;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.kstream.Materialized;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.StateStoreContext;
+import org.apache.kafka.streams.query.KeyQuery;
+import org.apache.kafka.streams.query.Position;
+import org.apache.kafka.streams.query.PositionBound;
+import org.apache.kafka.streams.query.Query;
+import org.apache.kafka.streams.query.QueryConfig;
+import org.apache.kafka.streams.query.QueryResult;
+import org.apache.kafka.streams.query.RangeQuery;
+import org.apache.kafka.streams.query.TimestampedKeyQuery;
+import org.apache.kafka.streams.query.TimestampedRangeQuery;
+import org.apache.kafka.streams.query.internals.InternalQueryResultUtil;
+import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.TimestampedKeyValueStore;
+import org.apache.kafka.streams.state.TimestampedKeyValueStoreWithHeaders;
+
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.Map;
+
+import static 
org.apache.kafka.streams.state.HeadersBytesStore.convertToHeaderFormat;
+
+/**
+ * This class is used to ensure backward compatibility at DSL level between
+ * {@link TimestampedKeyValueStoreWithHeaders} and
+ * {@link TimestampedKeyValueStore}.
+ * <p>
+ * If a user provides a supplier for {@code TimestampedKeyValueStore} (without 
headers) via
+ * {@link Materialized#as(KeyValueBytesStoreSupplier)} when building
+ * a {@code TimestampedKeyValueStoreWithHeaders}, this adapter is used to 
translate between
+ * the timestamped {@code byte[]} format and the timestamped-with-headers 
{@code byte[]} format.
+ *
+ * @see TimestampedToHeadersIteratorAdapter
+ */
+@SuppressWarnings("unchecked")
+public class TimestampedToHeadersStoreAdapter implements KeyValueStore<Bytes, 
byte[]> {
+    final KeyValueStore<Bytes, byte[]> store;
+
+    TimestampedToHeadersStoreAdapter(final KeyValueStore<Bytes, byte[]> store) 
{
+        if (!store.persistent()) {
+            throw new IllegalArgumentException("Provided store must be a 
persistent store, but it is not.");
+        }
+        this.store = store;
+    }
+
+    /**
+     * Extract raw timestamped value (timestamp + value) from serialized 
ValueTimestampHeaders.
+     * This strips the headers portion but keeps timestamp and value intact.
+     *
+     * Format conversion:
+     * Input:  [headersSize(varint)][headers][timestamp(8)][value]
+     * Output: [timestamp(8)][value]
+     */
+    static byte[] rawTimestampedValue(final byte[] rawValueTimestampHeaders) {
+        if (rawValueTimestampHeaders == null) {
+            return null;
+        }
+
+        final ByteBuffer buffer = ByteBuffer.wrap(rawValueTimestampHeaders);
+        final int headersSize = ByteUtils.readVarint(buffer);
+        // Skip headers, keep timestamp + value
+        buffer.position(buffer.position() + headersSize);
+
+        final byte[] result = new byte[buffer.remaining()];
+        buffer.get(result);
+        return result;
+    }
+
+    @Override
+    public void put(final Bytes key,
+                    final byte[] valueWithTimestampAndHeaders) {
+        store.put(key, rawTimestampedValue(valueWithTimestampAndHeaders));
+    }
+
+    @Override
+    public byte[] putIfAbsent(final Bytes key,
+                              final byte[] valueWithTimestampAndHeaders) {
+        return convertToHeaderFormat(store.putIfAbsent(
+            key,
+            rawTimestampedValue(valueWithTimestampAndHeaders)));
+    }
+
+    @Override
+    public void putAll(final List<KeyValue<Bytes, byte[]>> entries) {
+        for (final KeyValue<Bytes, byte[]> entry : entries) {
+            final byte[] valueWithTimestampAndHeaders = entry.value;
+            store.put(entry.key, 
rawTimestampedValue(valueWithTimestampAndHeaders));
+        }
+    }
+
+    @Override
+    public byte[] delete(final Bytes key) {
+        return convertToHeaderFormat(store.delete(key));
+    }
+
+    @Override
+    public String name() {
+        return store.name();
+    }
+
+    @Override
+    public void init(final StateStoreContext stateStoreContext, final 
StateStore root) {
+        store.init(stateStoreContext, root);
+    }
+
+    @Override
+    public void commit(final Map<TopicPartition, Long> changelogOffsets) {
+        store.commit(changelogOffsets);
+    }
+
+    @Override
+    public void close() {
+        store.close();
+    }
+
+    @Override
+    public boolean persistent() {
+        return true;
+    }
+
+    @Override
+    public boolean isOpen() {
+        return store.isOpen();
+    }
+
+    @Override
+    public <R> QueryResult<R> query(

Review Comment:
   Does it make sense to add this now? If we don't want to support IQv2 just 
yet, should we just throw an exception for now, and add this later?



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedToHeadersStoreAdapter.java:
##########
@@ -0,0 +1,301 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.common.utils.ByteUtils;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.kstream.Materialized;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.StateStoreContext;
+import org.apache.kafka.streams.query.KeyQuery;
+import org.apache.kafka.streams.query.Position;
+import org.apache.kafka.streams.query.PositionBound;
+import org.apache.kafka.streams.query.Query;
+import org.apache.kafka.streams.query.QueryConfig;
+import org.apache.kafka.streams.query.QueryResult;
+import org.apache.kafka.streams.query.RangeQuery;
+import org.apache.kafka.streams.query.TimestampedKeyQuery;
+import org.apache.kafka.streams.query.TimestampedRangeQuery;
+import org.apache.kafka.streams.query.internals.InternalQueryResultUtil;
+import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.TimestampedKeyValueStore;
+import org.apache.kafka.streams.state.TimestampedKeyValueStoreWithHeaders;
+
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.Map;
+
+import static 
org.apache.kafka.streams.state.HeadersBytesStore.convertToHeaderFormat;
+
+/**
+ * This class is used to ensure backward compatibility at DSL level between
+ * {@link TimestampedKeyValueStoreWithHeaders} and
+ * {@link TimestampedKeyValueStore}.
+ * <p>
+ * If a user provides a supplier for {@code TimestampedKeyValueStore} (without 
headers) via
+ * {@link Materialized#as(KeyValueBytesStoreSupplier)} when building
+ * a {@code TimestampedKeyValueStoreWithHeaders}, this adapter is used to 
translate between
+ * the timestamped {@code byte[]} format and the timestamped-with-headers 
{@code byte[]} format.
+ *
+ * @see TimestampedToHeadersIteratorAdapter
+ */
+@SuppressWarnings("unchecked")
+public class TimestampedToHeadersStoreAdapter implements KeyValueStore<Bytes, 
byte[]> {
+    // The wrapped inner store, which holds values in the plain timestamped format.
+    final KeyValueStore<Bytes, byte[]> store;
+
+    /**
+     * Creates the adapter around the given inner timestamped store.
+     *
+     * @param store the inner {@code KeyValueStore}; must be persistent
+     * @throws IllegalArgumentException if the provided store is not persistent
+     */
+    TimestampedToHeadersStoreAdapter(final KeyValueStore<Bytes, byte[]> store) {
+        if (!store.persistent()) {
+            throw new IllegalArgumentException("Provided store must be a persistent store, but it is not.");
+        }
+        this.store = store;
+    }
+
+    /**
+     * Strips the serialized headers from a serialized {@code ValueTimestampHeaders}
+     * byte array, keeping only the raw timestamped value (timestamp + value).
+     *
+     * Format conversion:
+     * Input:  [headersSize(varint)][headers][timestamp(8)][value]
+     * Output: [timestamp(8)][value]
+     */
+    static byte[] rawTimestampedValue(final byte[] rawValueTimestampHeaders) {
+        if (rawValueTimestampHeaders == null) {
+            return null;
+        }
+
+        final ByteBuffer buffer = ByteBuffer.wrap(rawValueTimestampHeaders);
+        // advance past the headers block; everything remaining is timestamp + value
+        final int headersSize = ByteUtils.readVarint(buffer);
+        buffer.position(buffer.position() + headersSize);
+
+        final byte[] timestampAndValue = new byte[buffer.remaining()];
+        buffer.get(timestampAndValue);
+        return timestampAndValue;
+    }
+
+    @Override
+    public void put(final Bytes key,
+                    final byte[] valueWithTimestampAndHeaders) {
+        // strip the headers before handing the value to the timestamped inner store
+        final byte[] timestampedValue = rawTimestampedValue(valueWithTimestampAndHeaders);
+        store.put(key, timestampedValue);
+    }
+
+    @Override
+    public byte[] putIfAbsent(final Bytes key,
+                              final byte[] valueWithTimestampAndHeaders) {
+        // down-convert the written value; up-convert the returned previous value
+        final byte[] previous = store.putIfAbsent(key, rawTimestampedValue(valueWithTimestampAndHeaders));
+        return convertToHeaderFormat(previous);
+    }
+
+    @Override
+    public void putAll(final List<KeyValue<Bytes, byte[]>> entries) {
+        // down-convert each value to the plain timestamped format before writing
+        entries.forEach(entry -> store.put(entry.key, rawTimestampedValue(entry.value)));
+    }
+
+    @Override
+    public byte[] delete(final Bytes key) {
+        // up-convert the deleted value back into the with-headers format
+        final byte[] deleted = store.delete(key);
+        return convertToHeaderFormat(deleted);
+    }
+
+    @Override
+    public String name() {
+        // delegate: the adapter has no name of its own
+        return store.name();
+    }
+
+    @Override
+    public void init(final StateStoreContext stateStoreContext, final StateStore root) {
+        // delegate initialization to the wrapped store
+        store.init(stateStoreContext, root);
+    }
+
+    @Override
+    public void commit(final Map<TopicPartition, Long> changelogOffsets) {
+        // delegate commit (and the provided changelog offsets) to the wrapped store
+        store.commit(changelogOffsets);
+    }
+
+    @Override
+    public void close() {
+        // delegate close to the wrapped store
+        store.close();
+    }
+
+    @Override
+    public boolean persistent() {
+        // always true: the constructor rejects non-persistent inner stores
+        return true;
+    }
+
+    @Override
+    public boolean isOpen() {
+        // delegate open-state to the wrapped store
+        return store.isOpen();
+    }
+
+    @Override
+    public <R> QueryResult<R> query(
+        final Query<R> query,
+        final PositionBound positionBound,
+        final QueryConfig config) {
+
+        // only pay for System.nanoTime() when execution-info collection is enabled
+        final long start = config.isCollectExecutionInfo() ? System.nanoTime() : -1L;
+        QueryResult<R> result = store.query(query, positionBound, config);
+
+        // this adapter always needs to return a `value-with-timestamp-and-headers` result to hold up its contract
+        // thus, we need to add the empty headers wrapper even for timestamped queries
+        if (result.isSuccess()) {
+            if (query instanceof KeyQuery || query instanceof TimestampedKeyQuery) {
+                // single-value result: up-convert the raw timestamped value in place
+                final byte[] timestampedValue = (byte[]) result.getResult();
+                final byte[] valueWithHeaders = convertToHeaderFormat(timestampedValue);
+                result = (QueryResult<R>) InternalQueryResultUtil.copyAndSubstituteDeserializedResult(result, valueWithHeaders);
+            } else if (query instanceof RangeQuery || query instanceof TimestampedRangeQuery) {
+                // range result: wrap the iterator so each value is up-converted lazily
+                // NOTE(review): the unconditional cast to RocksDbIterator assumes a
+                // RocksDB-backed inner store -- TODO confirm this holds for all
+                // suppliers this adapter may wrap, or handle other iterator types
+                final TimestampedToHeadersRocksIteratorAdapter wrappedRocksDBRangeIterator =
+                    new TimestampedToHeadersRocksIteratorAdapter((RocksDbIterator) result.getResult());
+                result = (QueryResult<R>) InternalQueryResultUtil.copyAndSubstituteDeserializedResult(result, wrappedRocksDBRangeIterator);
+            } else {
+                throw new IllegalArgumentException("Unsupported query type: " + query.getClass());
+            }
+        }
+
+        if (config.isCollectExecutionInfo()) {
+            final long end = System.nanoTime();
+            result.addExecutionInfo(
+                "Handled in " + getClass() + " in " + (end - start) + "ns"
+            );
+        }
+        return result;
+    }
+
+    @Override
+    public Position getPosition() {
+        // delegate: positions are tracked by the wrapped store
+        return store.getPosition();
+    }
+
+    @Override
+    public byte[] get(final Bytes key) {
+        // read the plain timestamped value and up-convert it to the with-headers format
+        final byte[] timestampedValue = store.get(key);
+        return convertToHeaderFormat(timestampedValue);
+    }
+
+    @Override
+    public KeyValueIterator<Bytes, byte[]> range(final Bytes from,
+                                                 final Bytes to) {
+        // wrap the inner iterator so each value is up-converted lazily
+        return new TimestampedToHeadersIteratorAdapter<>(store.range(from, to));
+    }
+
+    @Override
+    public KeyValueIterator<Bytes, byte[]> reverseRange(final Bytes from,
+                                                        final Bytes to) {
+        // wrap the inner iterator so each value is up-converted lazily
+        return new TimestampedToHeadersIteratorAdapter<>(store.reverseRange(from, to));
+    }
+
+    @Override
+    public KeyValueIterator<Bytes, byte[]> all() {
+        // wrap the inner iterator so each value is up-converted lazily
+        return new TimestampedToHeadersIteratorAdapter<>(store.all());
+    }
+
+    @Override
+    public KeyValueIterator<Bytes, byte[]> reverseAll() {
+        // wrap the inner iterator so each value is up-converted lazily
+        return new TimestampedToHeadersIteratorAdapter<>(store.reverseAll());
+    }
+
+    @Override
+    public <PS extends Serializer<P>, P> KeyValueIterator<Bytes, byte[]> prefixScan(final P prefix,
+                                                                                    final PS prefixKeySerializer) {
+        // wrap the inner iterator so each value is up-converted lazily
+        return new TimestampedToHeadersIteratorAdapter<>(store.prefixScan(prefix, prefixKeySerializer));
+    }
+
+    @Override
+    public long approximateNumEntries() {
+        // delegate: the adapter does not change the number of entries
+        return store.approximateNumEntries();
+    }
+
+    /**
+     * Generic iterator adapter for range/all operations: up-converts each value
+     * from the plain timestamped format into the timestamped-with-headers format.
+     */
+    private static class TimestampedToHeadersIteratorAdapter<K> implements KeyValueIterator<K, byte[]> {
+        private final KeyValueIterator<K, byte[]> inner;
+
+        public TimestampedToHeadersIteratorAdapter(final KeyValueIterator<K, byte[]> innerIterator) {
+            this.inner = innerIterator;
+        }
+
+        @Override
+        public boolean hasNext() {
+            return inner.hasNext();
+        }
+
+        @Override
+        public KeyValue<K, byte[]> next() {
+            final KeyValue<K, byte[]> entry = inner.next();
+            // up-convert the timestamped value to the with-headers format
+            return KeyValue.pair(entry.key, convertToHeaderFormat(entry.value));
+        }
+
+        @Override
+        public K peekNextKey() {
+            return inner.peekNextKey();
+        }
+
+        @Override
+        public void close() {
+            inner.close();
+        }
+    }
+
+    /**
+     * RocksDB-specific iterator adapter for query operations
+     */
+    private static class TimestampedToHeadersRocksIteratorAdapter implements 
ManagedKeyValueIterator<Bytes, byte[]> {
+
+        private final RocksDbIterator rocksDbIterator;
+
+        public TimestampedToHeadersRocksIteratorAdapter(final RocksDbIterator 
rocksDbIterator) {

Review Comment:
   This naming is much better than the one from 
`KeyValueToTimestampedKeyValueByteStoreAdapter.KeyValueToTimestampedKeyValueAdapterIterator`
 -- can we update the existing name over there to align the naming scheme (best 
in a follow-up PR)?



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedToHeadersStoreAdapter.java:
##########
@@ -0,0 +1,301 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.common.utils.ByteUtils;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.kstream.Materialized;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.StateStoreContext;
+import org.apache.kafka.streams.query.KeyQuery;
+import org.apache.kafka.streams.query.Position;
+import org.apache.kafka.streams.query.PositionBound;
+import org.apache.kafka.streams.query.Query;
+import org.apache.kafka.streams.query.QueryConfig;
+import org.apache.kafka.streams.query.QueryResult;
+import org.apache.kafka.streams.query.RangeQuery;
+import org.apache.kafka.streams.query.TimestampedKeyQuery;
+import org.apache.kafka.streams.query.TimestampedRangeQuery;
+import org.apache.kafka.streams.query.internals.InternalQueryResultUtil;
+import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.TimestampedKeyValueStore;
+import org.apache.kafka.streams.state.TimestampedKeyValueStoreWithHeaders;
+
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.Map;
+
+import static 
org.apache.kafka.streams.state.HeadersBytesStore.convertToHeaderFormat;
+
+/**
+ * This class is used to ensure backward compatibility at DSL level between
+ * {@link TimestampedKeyValueStoreWithHeaders} and
+ * {@link TimestampedKeyValueStore}.
+ * <p>
+ * If a user provides a supplier for {@code TimestampedKeyValueStore} (without 
headers) via
+ * {@link Materialized#as(KeyValueBytesStoreSupplier)} when building
+ * a {@code TimestampedKeyValueStoreWithHeaders}, this adapter is used to 
translate between
+ * the timestamped {@code byte[]} format and the timestamped-with-headers 
{@code byte[]} format.
+ *
+ * @see TimestampedToHeadersIteratorAdapter
+ */
+@SuppressWarnings("unchecked")
+public class TimestampedToHeadersStoreAdapter implements KeyValueStore<Bytes, 
byte[]> {
+    final KeyValueStore<Bytes, byte[]> store;
+
+    TimestampedToHeadersStoreAdapter(final KeyValueStore<Bytes, byte[]> store) 
{
+        if (!store.persistent()) {
+            throw new IllegalArgumentException("Provided store must be a 
persistent store, but it is not.");
+        }
+        this.store = store;
+    }
+
+    /**
+     * Extract raw timestamped value (timestamp + value) from serialized 
ValueTimestampHeaders.
+     * This strips the headers portion but keeps timestamp and value intact.
+     *
+     * Format conversion:
+     * Input:  [headersSize(varint)][headers][timestamp(8)][value]
+     * Output: [timestamp(8)][value]
+     */
+    static byte[] rawTimestampedValue(final byte[] rawValueTimestampHeaders) {
+        if (rawValueTimestampHeaders == null) {
+            return null;
+        }
+
+        final ByteBuffer buffer = ByteBuffer.wrap(rawValueTimestampHeaders);
+        final int headersSize = ByteUtils.readVarint(buffer);
+        // Skip headers, keep timestamp + value
+        buffer.position(buffer.position() + headersSize);
+
+        final byte[] result = new byte[buffer.remaining()];
+        buffer.get(result);
+        return result;
+    }
+
+    @Override
+    public void put(final Bytes key,
+                    final byte[] valueWithTimestampAndHeaders) {
+        store.put(key, rawTimestampedValue(valueWithTimestampAndHeaders));
+    }
+
+    @Override
+    public byte[] putIfAbsent(final Bytes key,
+                              final byte[] valueWithTimestampAndHeaders) {
+        return convertToHeaderFormat(store.putIfAbsent(
+            key,
+            rawTimestampedValue(valueWithTimestampAndHeaders)));
+    }
+
+    @Override
+    public void putAll(final List<KeyValue<Bytes, byte[]>> entries) {
+        for (final KeyValue<Bytes, byte[]> entry : entries) {
+            final byte[] valueWithTimestampAndHeaders = entry.value;
+            store.put(entry.key, 
rawTimestampedValue(valueWithTimestampAndHeaders));
+        }
+    }
+
+    @Override
+    public byte[] delete(final Bytes key) {
+        return convertToHeaderFormat(store.delete(key));
+    }
+
+    @Override
+    public String name() {
+        return store.name();
+    }
+
+    @Override
+    public void init(final StateStoreContext stateStoreContext, final 
StateStore root) {
+        store.init(stateStoreContext, root);
+    }
+
+    @Override
+    public void commit(final Map<TopicPartition, Long> changelogOffsets) {
+        store.commit(changelogOffsets);
+    }
+
+    @Override
+    public void close() {
+        store.close();
+    }
+
+    @Override
+    public boolean persistent() {
+        return true;
+    }
+
+    @Override
+    public boolean isOpen() {
+        return store.isOpen();
+    }
+
+    @Override
+    public <R> QueryResult<R> query(
+        final Query<R> query,
+        final PositionBound positionBound,
+        final QueryConfig config) {
+
+        final long start = config.isCollectExecutionInfo() ? System.nanoTime() 
: -1L;
+        QueryResult<R> result = store.query(query, positionBound, config);
+
+        // this adapter always needs to return a 
`value-with-timestamp-and-headers` result to hold up its contract
+        // thus, we need to add the empty headers wrapper even for timestamped 
queries
+        if (result.isSuccess()) {
+            if (query instanceof KeyQuery || query instanceof 
TimestampedKeyQuery) {
+                final byte[] timestampedValue = (byte[]) result.getResult();
+                final byte[] valueWithHeaders = 
convertToHeaderFormat(timestampedValue);
+                result = (QueryResult<R>) 
InternalQueryResultUtil.copyAndSubstituteDeserializedResult(result, 
valueWithHeaders);
+            } else if (query instanceof RangeQuery || query instanceof 
TimestampedRangeQuery) {
+                final TimestampedToHeadersRocksIteratorAdapter 
wrappedRocksDBRangeIterator =
+                    new 
TimestampedToHeadersRocksIteratorAdapter((RocksDbIterator) result.getResult());
+                result = (QueryResult<R>) 
InternalQueryResultUtil.copyAndSubstituteDeserializedResult(result, 
wrappedRocksDBRangeIterator);
+            } else {
+                throw new IllegalArgumentException("Unsupported query type: " 
+ query.getClass());
+            }
+        }
+
+        if (config.isCollectExecutionInfo()) {
+            final long end = System.nanoTime();
+            result.addExecutionInfo(
+                "Handled in " + getClass() + " in " + (end - start) + "ns"
+            );
+        }
+        return result;
+    }
+
+    @Override
+    public Position getPosition() {
+        return store.getPosition();
+    }
+
+    @Override
+    public byte[] get(final Bytes key) {
+        return convertToHeaderFormat(store.get(key));
+    }
+
+    @Override
+    public KeyValueIterator<Bytes, byte[]> range(final Bytes from,
+                                                 final Bytes to) {
+        return new TimestampedToHeadersIteratorAdapter<>(store.range(from, 
to));
+    }
+
+    @Override
+    public KeyValueIterator<Bytes, byte[]> reverseRange(final Bytes from,
+                                                        final Bytes to) {
+        return new 
TimestampedToHeadersIteratorAdapter<>(store.reverseRange(from, to));
+    }
+
+    @Override
+    public KeyValueIterator<Bytes, byte[]> all() {
+        return new TimestampedToHeadersIteratorAdapter<>(store.all());
+    }
+
+    @Override
+    public KeyValueIterator<Bytes, byte[]> reverseAll() {
+        return new TimestampedToHeadersIteratorAdapter<>(store.reverseAll());
+    }
+
+    @Override
+    public <PS extends Serializer<P>, P> KeyValueIterator<Bytes, byte[]> 
prefixScan(final P prefix,
+                                                                               
     final PS prefixKeySerializer) {
+        return new 
TimestampedToHeadersIteratorAdapter<>(store.prefixScan(prefix, 
prefixKeySerializer));
+    }
+
+    @Override
+    public long approximateNumEntries() {
+        return store.approximateNumEntries();
+    }
+
+    /**
+     * Generic iterator adapter for range/all operations
+     */
+    private static class TimestampedToHeadersIteratorAdapter<K> implements 
KeyValueIterator<K, byte[]> {

Review Comment:
   Why are we nesting these as private classes? For the ts/kv adapter we have 
them as top-level classes, and re-use them for other store types, too. Can't we 
re-use them for other store types in the header case?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to