mjsax commented on code in PR #21680:
URL: https://github.com/apache/kafka/pull/21680#discussion_r2902669429


##########
streams/src/main/java/org/apache/kafka/streams/state/internals/SessionToHeadersIteratorAdapter.java:
##########
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.state.KeyValueIterator;
+
+/**
+ * This class is used to ensure backward compatibility at DSL level between
+ * {@link org.apache.kafka.streams.state.SessionStoreWithHeaders} and
+ * {@link org.apache.kafka.streams.state.SessionStore}.
+ * <p>
+ * When iterating over session entries from a store that stores values with 
headers,
+ * this adapter strips the headers prefix so the caller receives raw 
aggregation bytes
+ * without headers.
+ *
+ * @see SessionToHeadersStoreAdapter
+ */
+class SessionToHeadersIteratorAdapter implements 
KeyValueIterator<Windowed<Bytes>, byte[]> {
+    private final KeyValueIterator<Windowed<Bytes>, byte[]> innerIterator;
+
+    SessionToHeadersIteratorAdapter(final KeyValueIterator<Windowed<Bytes>, 
byte[]> innerIterator) {
+        this.innerIterator = innerIterator;
+    }
+
+    @Override
+    public void close() {
+        innerIterator.close();
+    }
+
+    @Override
+    public Windowed<Bytes> peekNextKey() {
+        return innerIterator.peekNextKey();
+    }
+
+    @Override
+    public boolean hasNext() {
+        return innerIterator.hasNext();
+    }
+
+    @Override
+    public KeyValue<Windowed<Bytes>, byte[]> next() {
+        final KeyValue<Windowed<Bytes>, byte[]> keyValue = 
innerIterator.next();

Review Comment:
   Missing `null` check for `keyValue`



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/SessionToHeadersStoreAdapter.java:
##########
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.StateStoreContext;
+import org.apache.kafka.streams.query.Position;
+import org.apache.kafka.streams.query.PositionBound;
+import org.apache.kafka.streams.query.Query;
+import org.apache.kafka.streams.query.QueryConfig;
+import org.apache.kafka.streams.query.QueryResult;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.SessionStore;
+
+import java.util.Map;
+
+import static 
org.apache.kafka.streams.state.HeadersBytesStore.convertToHeaderFormat;
+
+/**
+ * This class is used to ensure backward compatibility at DSL level between
+ * {@link org.apache.kafka.streams.state.SessionStoreWithHeaders} and
+ * {@link org.apache.kafka.streams.state.SessionStore}.
+ * <p>
+ * If a user provides a supplier for {@code SessionStore} (without headers) via
+ * {@link org.apache.kafka.streams.kstream.Materialized} when building
+ * a {@code SessionStoreWithHeaders}, this adapter is used to translate between
+ * the raw aggregation {@code byte[]} format and the aggregation-with-headers 
{@code byte[]} format.
+ * <p>
+ * On writes (put), empty headers are prepended to the raw aggregation value 
before
+ * delegating to the inner store.
+ * On reads (get, fetch, findSessions), the headers prefix is stripped from 
the stored value
+ * so the caller receives raw aggregation bytes without headers.
+ *
+ * @see SessionToHeadersIteratorAdapter
+ */
+@SuppressWarnings("unchecked")
+public class SessionToHeadersStoreAdapter implements SessionStore<Bytes, 
byte[]> {
+    final SessionStore<Bytes, byte[]> store;
+
+    SessionToHeadersStoreAdapter(final SessionStore<Bytes, byte[]> store) {
+        if (!store.persistent()) {
+            throw new IllegalArgumentException("Provided store must be a 
persistent store, but it is not.");
+        }
+        this.store = store;
+    }
+
+    @Override
+    public KeyValueIterator<Windowed<Bytes>, byte[]> findSessions(final Bytes 
key,
+                                                                  final long 
earliestSessionEndTime,
+                                                                  final long 
latestSessionStartTime) {
+        return new SessionToHeadersIteratorAdapter(
+            store.findSessions(key, earliestSessionEndTime, 
latestSessionStartTime));
+    }
+
+    @Override
+    public KeyValueIterator<Windowed<Bytes>, byte[]> findSessions(final long 
earliestSessionEndTime,
+                                                                  final long 
latestSessionEndTime) {
+        return new SessionToHeadersIteratorAdapter(
+            store.findSessions(earliestSessionEndTime, latestSessionEndTime));
+    }
+
+    @Override
+    public KeyValueIterator<Windowed<Bytes>, byte[]> 
backwardFindSessions(final Bytes key,
+                                                                          
final long earliestSessionEndTime,
+                                                                          
final long latestSessionStartTime) {
+        return new SessionToHeadersIteratorAdapter(
+            store.backwardFindSessions(key, earliestSessionEndTime, 
latestSessionStartTime));
+    }
+
+    @Override
+    public KeyValueIterator<Windowed<Bytes>, byte[]> findSessions(final Bytes 
keyFrom,
+                                                                  final Bytes 
keyTo,
+                                                                  final long 
earliestSessionEndTime,
+                                                                  final long 
latestSessionStartTime) {
+        return new SessionToHeadersIteratorAdapter(
+            store.findSessions(keyFrom, keyTo, earliestSessionEndTime, 
latestSessionStartTime));
+    }
+
+    @Override
+    public KeyValueIterator<Windowed<Bytes>, byte[]> 
backwardFindSessions(final Bytes keyFrom,
+                                                                          
final Bytes keyTo,
+                                                                          
final long earliestSessionEndTime,
+                                                                          
final long latestSessionStartTime) {
+        return new SessionToHeadersIteratorAdapter(
+            store.backwardFindSessions(keyFrom, keyTo, earliestSessionEndTime, 
latestSessionStartTime));
+    }
+
+    @Override
+    public byte[] fetchSession(final Bytes key,
+                               final long sessionStartTime,
+                               final long sessionEndTime) {
+        return 
AggregationWithHeadersDeserializer.rawAggregation(store.fetchSession(key, 
sessionStartTime, sessionEndTime));
+    }
+
+    @Override
+    public KeyValueIterator<Windowed<Bytes>, byte[]> fetch(final Bytes key) {
+        return new SessionToHeadersIteratorAdapter(store.fetch(key));
+    }
+
+    @Override
+    public KeyValueIterator<Windowed<Bytes>, byte[]> backwardFetch(final Bytes 
key) {
+        return new SessionToHeadersIteratorAdapter(store.backwardFetch(key));
+    }
+
+    @Override
+    public KeyValueIterator<Windowed<Bytes>, byte[]> fetch(final Bytes 
keyFrom, final Bytes keyTo) {
+        return new SessionToHeadersIteratorAdapter(store.fetch(keyFrom, 
keyTo));
+    }
+
+    @Override
+    public KeyValueIterator<Windowed<Bytes>, byte[]> backwardFetch(final Bytes 
keyFrom, final Bytes keyTo) {
+        return new 
SessionToHeadersIteratorAdapter(store.backwardFetch(keyFrom, keyTo));
+    }
+
+    @Override
+    public void remove(final Windowed<Bytes> sessionKey) {
+        store.remove(sessionKey);
+    }
+
+    @Override
+    public void put(final Windowed<Bytes> sessionKey, final byte[] aggregate) {
+        store.put(sessionKey, convertToHeaderFormat(aggregate));

Review Comment:
   I think we need to do the opposite here, and convert from header format to 
plain aggregations



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/SessionToHeadersStoreAdapter.java:
##########
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.StateStoreContext;
+import org.apache.kafka.streams.query.Position;
+import org.apache.kafka.streams.query.PositionBound;
+import org.apache.kafka.streams.query.Query;
+import org.apache.kafka.streams.query.QueryConfig;
+import org.apache.kafka.streams.query.QueryResult;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.SessionStore;
+
+import java.util.Map;
+
+import static 
org.apache.kafka.streams.state.HeadersBytesStore.convertToHeaderFormat;
+
+/**
+ * This class is used to ensure backward compatibility at DSL level between
+ * {@link org.apache.kafka.streams.state.SessionStoreWithHeaders} and
+ * {@link org.apache.kafka.streams.state.SessionStore}.
+ * <p>
+ * If a user provides a supplier for {@code SessionStore} (without headers) via
+ * {@link org.apache.kafka.streams.kstream.Materialized} when building
+ * a {@code SessionStoreWithHeaders}, this adapter is used to translate between
+ * the raw aggregation {@code byte[]} format and the aggregation-with-headers 
{@code byte[]} format.
+ * <p>
+ * On writes (put), empty headers are prepended to the raw aggregation value 
before
+ * delegating to the inner store.
+ * On reads (get, fetch, findSessions), the headers prefix is stripped from 
the stored value
+ * so the caller receives raw aggregation bytes without headers.
+ *
+ * @see SessionToHeadersIteratorAdapter
+ */
+@SuppressWarnings("unchecked")
+public class SessionToHeadersStoreAdapter implements SessionStore<Bytes, 
byte[]> {
+    final SessionStore<Bytes, byte[]> store;
+
+    SessionToHeadersStoreAdapter(final SessionStore<Bytes, byte[]> store) {
+        if (!store.persistent()) {
+            throw new IllegalArgumentException("Provided store must be a 
persistent store, but it is not.");
+        }
+        this.store = store;
+    }
+
+    @Override
+    public KeyValueIterator<Windowed<Bytes>, byte[]> findSessions(final Bytes 
key,
+                                                                  final long 
earliestSessionEndTime,
+                                                                  final long 
latestSessionStartTime) {
+        return new SessionToHeadersIteratorAdapter(
+            store.findSessions(key, earliestSessionEndTime, 
latestSessionStartTime));
+    }
+
+    @Override
+    public KeyValueIterator<Windowed<Bytes>, byte[]> findSessions(final long 
earliestSessionEndTime,
+                                                                  final long 
latestSessionEndTime) {
+        return new SessionToHeadersIteratorAdapter(
+            store.findSessions(earliestSessionEndTime, latestSessionEndTime));
+    }
+
+    @Override
+    public KeyValueIterator<Windowed<Bytes>, byte[]> 
backwardFindSessions(final Bytes key,
+                                                                          
final long earliestSessionEndTime,
+                                                                          
final long latestSessionStartTime) {
+        return new SessionToHeadersIteratorAdapter(
+            store.backwardFindSessions(key, earliestSessionEndTime, 
latestSessionStartTime));
+    }
+
+    @Override
+    public KeyValueIterator<Windowed<Bytes>, byte[]> findSessions(final Bytes 
keyFrom,
+                                                                  final Bytes 
keyTo,
+                                                                  final long 
earliestSessionEndTime,
+                                                                  final long 
latestSessionStartTime) {
+        return new SessionToHeadersIteratorAdapter(
+            store.findSessions(keyFrom, keyTo, earliestSessionEndTime, 
latestSessionStartTime));
+    }
+
+    @Override
+    public KeyValueIterator<Windowed<Bytes>, byte[]> 
backwardFindSessions(final Bytes keyFrom,
+                                                                          
final Bytes keyTo,
+                                                                          
final long earliestSessionEndTime,
+                                                                          
final long latestSessionStartTime) {
+        return new SessionToHeadersIteratorAdapter(
+            store.backwardFindSessions(keyFrom, keyTo, earliestSessionEndTime, 
latestSessionStartTime));
+    }
+
+    @Override
+    public byte[] fetchSession(final Bytes key,
+                               final long sessionStartTime,
+                               final long sessionEndTime) {
+        return 
AggregationWithHeadersDeserializer.rawAggregation(store.fetchSession(key, 
sessionStartTime, sessionEndTime));
+    }
+
+    @Override
+    public KeyValueIterator<Windowed<Bytes>, byte[]> fetch(final Bytes key) {
+        return new SessionToHeadersIteratorAdapter(store.fetch(key));
+    }
+
+    @Override
+    public KeyValueIterator<Windowed<Bytes>, byte[]> backwardFetch(final Bytes 
key) {
+        return new SessionToHeadersIteratorAdapter(store.backwardFetch(key));
+    }
+
+    @Override
+    public KeyValueIterator<Windowed<Bytes>, byte[]> fetch(final Bytes 
keyFrom, final Bytes keyTo) {
+        return new SessionToHeadersIteratorAdapter(store.fetch(keyFrom, 
keyTo));
+    }
+
+    @Override
+    public KeyValueIterator<Windowed<Bytes>, byte[]> backwardFetch(final Bytes 
keyFrom, final Bytes keyTo) {
+        return new 
SessionToHeadersIteratorAdapter(store.backwardFetch(keyFrom, keyTo));
+    }
+
+    @Override
+    public void remove(final Windowed<Bytes> sessionKey) {
+        store.remove(sessionKey);
+    }
+
+    @Override
+    public void put(final Windowed<Bytes> sessionKey, final byte[] aggregate) {
+        store.put(sessionKey, convertToHeaderFormat(aggregate));
+    }
+
+    @Override
+    public String name() {
+        return store.name();
+    }
+
+    @Override
+    public void init(final StateStoreContext stateStoreContext, final 
StateStore root) {
+        store.init(stateStoreContext, root);
+    }
+
+    @Override
+    public void commit(final Map<TopicPartition, Long> changelogOffsets) {
+        store.commit(changelogOffsets);
+    }
+
+    @Override
+    public void close() {
+        store.close();
+    }
+
+    @Override
+    public boolean persistent() {
+        return store.persistent();
+    }
+
+    @Override
+    public boolean isOpen() {
+        return store.isOpen();
+    }
+
+    @Override
+    public <R> QueryResult<R> query(final Query<R> query,
+                                    final PositionBound positionBound,
+                                    final QueryConfig config) {
+        final long start = config.isCollectExecutionInfo() ? System.nanoTime() 
: -1L;
+        final QueryResult<R> result = store.query(query, positionBound, 
config);

Review Comment:
   This is an adaptor store, so we need to make a byte-array translation for 
`result`, right?
   
   Cf https://github.com/apache/kafka/pull/21643 and 
https://github.com/apache/kafka/pull/21650



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/SessionToHeadersIteratorAdapter.java:
##########
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.state.KeyValueIterator;
+
+/**
+ * This class is used to ensure backward compatibility at DSL level between
+ * {@link org.apache.kafka.streams.state.SessionStoreWithHeaders} and
+ * {@link org.apache.kafka.streams.state.SessionStore}.
+ * <p>
+ * When iterating over session entries from a store that stores values with 
headers,
+ * this adapter strips the headers prefix so the caller receives raw 
aggregation bytes
+ * without headers.
+ *
+ * @see SessionToHeadersStoreAdapter
+ */
+class SessionToHeadersIteratorAdapter implements 
KeyValueIterator<Windowed<Bytes>, byte[]> {
+    private final KeyValueIterator<Windowed<Bytes>, byte[]> innerIterator;
+
+    SessionToHeadersIteratorAdapter(final KeyValueIterator<Windowed<Bytes>, 
byte[]> innerIterator) {
+        this.innerIterator = innerIterator;
+    }
+
+    @Override
+    public void close() {
+        innerIterator.close();
+    }
+
+    @Override
+    public Windowed<Bytes> peekNextKey() {
+        return innerIterator.peekNextKey();
+    }
+
+    @Override
+    public boolean hasNext() {
+        return innerIterator.hasNext();
+    }
+
+    @Override
+    public KeyValue<Windowed<Bytes>, byte[]> next() {
+        final KeyValue<Windowed<Bytes>, byte[]> keyValue = 
innerIterator.next();
+        return KeyValue.pair(keyValue.key, 
AggregationWithHeadersDeserializer.rawAggregation(keyValue.value));

Review Comment:
   This is an adaptor, not a facade, so the inner store is a "plain store" and 
we need to convert from plain to header format. -- The code does the opposite 
right now, which does not seem to be right?



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/SessionToHeadersStoreAdapter.java:
##########
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.StateStoreContext;
+import org.apache.kafka.streams.query.Position;
+import org.apache.kafka.streams.query.PositionBound;
+import org.apache.kafka.streams.query.Query;
+import org.apache.kafka.streams.query.QueryConfig;
+import org.apache.kafka.streams.query.QueryResult;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.SessionStore;
+
+import java.util.Map;
+
+import static 
org.apache.kafka.streams.state.HeadersBytesStore.convertToHeaderFormat;
+
+/**
+ * This class is used to ensure backward compatibility at DSL level between
+ * {@link org.apache.kafka.streams.state.SessionStoreWithHeaders} and
+ * {@link org.apache.kafka.streams.state.SessionStore}.
+ * <p>
+ * If a user provides a supplier for {@code SessionStore} (without headers) via
+ * {@link org.apache.kafka.streams.kstream.Materialized} when building
+ * a {@code SessionStoreWithHeaders}, this adapter is used to translate between
+ * the raw aggregation {@code byte[]} format and the aggregation-with-headers 
{@code byte[]} format.
+ * <p>
+ * On writes (put), empty headers are prepended to the raw aggregation value 
before
+ * delegating to the inner store.
+ * On reads (get, fetch, findSessions), the headers prefix is stripped from 
the stored value
+ * so the caller receives raw aggregation bytes without headers.
+ *
+ * @see SessionToHeadersIteratorAdapter
+ */
+@SuppressWarnings("unchecked")
+public class SessionToHeadersStoreAdapter implements SessionStore<Bytes, 
byte[]> {
+    final SessionStore<Bytes, byte[]> store;
+
+    SessionToHeadersStoreAdapter(final SessionStore<Bytes, byte[]> store) {
+        if (!store.persistent()) {
+            throw new IllegalArgumentException("Provided store must be a 
persistent store, but it is not.");
+        }
+        this.store = store;
+    }
+
+    @Override
+    public KeyValueIterator<Windowed<Bytes>, byte[]> findSessions(final Bytes 
key,
+                                                                  final long 
earliestSessionEndTime,
+                                                                  final long 
latestSessionStartTime) {
+        return new SessionToHeadersIteratorAdapter(
+            store.findSessions(key, earliestSessionEndTime, 
latestSessionStartTime));
+    }
+
+    @Override
+    public KeyValueIterator<Windowed<Bytes>, byte[]> findSessions(final long 
earliestSessionEndTime,
+                                                                  final long 
latestSessionEndTime) {
+        return new SessionToHeadersIteratorAdapter(
+            store.findSessions(earliestSessionEndTime, latestSessionEndTime));
+    }
+
+    @Override
+    public KeyValueIterator<Windowed<Bytes>, byte[]> 
backwardFindSessions(final Bytes key,
+                                                                          
final long earliestSessionEndTime,
+                                                                          
final long latestSessionStartTime) {
+        return new SessionToHeadersIteratorAdapter(
+            store.backwardFindSessions(key, earliestSessionEndTime, 
latestSessionStartTime));
+    }
+
+    @Override
+    public KeyValueIterator<Windowed<Bytes>, byte[]> findSessions(final Bytes 
keyFrom,
+                                                                  final Bytes 
keyTo,
+                                                                  final long 
earliestSessionEndTime,
+                                                                  final long 
latestSessionStartTime) {
+        return new SessionToHeadersIteratorAdapter(
+            store.findSessions(keyFrom, keyTo, earliestSessionEndTime, 
latestSessionStartTime));
+    }
+
+    @Override
+    public KeyValueIterator<Windowed<Bytes>, byte[]> 
backwardFindSessions(final Bytes keyFrom,
+                                                                          
final Bytes keyTo,
+                                                                          
final long earliestSessionEndTime,
+                                                                          
final long latestSessionStartTime) {
+        return new SessionToHeadersIteratorAdapter(
+            store.backwardFindSessions(keyFrom, keyTo, earliestSessionEndTime, 
latestSessionStartTime));
+    }
+
+    @Override
+    public byte[] fetchSession(final Bytes key,
+                               final long sessionStartTime,
+                               final long sessionEndTime) {
+        return 
AggregationWithHeadersDeserializer.rawAggregation(store.fetchSession(key, 
sessionStartTime, sessionEndTime));

Review Comment:
   Same -- we need to convert from plain to header format here, right?



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/SessionToHeadersStoreAdapter.java:
##########
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.StateStoreContext;
+import org.apache.kafka.streams.query.Position;
+import org.apache.kafka.streams.query.PositionBound;
+import org.apache.kafka.streams.query.Query;
+import org.apache.kafka.streams.query.QueryConfig;
+import org.apache.kafka.streams.query.QueryResult;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.SessionStore;
+
+import java.util.Map;
+
+import static 
org.apache.kafka.streams.state.HeadersBytesStore.convertToHeaderFormat;
+
+/**
+ * This class is used to ensure backward compatibility at DSL level between
+ * {@link org.apache.kafka.streams.state.SessionStoreWithHeaders} and
+ * {@link org.apache.kafka.streams.state.SessionStore}.
+ * <p>
+ * If a user provides a supplier for {@code SessionStore} (without headers) via
+ * {@link org.apache.kafka.streams.kstream.Materialized} when building
+ * a {@code SessionStoreWithHeaders}, this adapter is used to translate between
+ * the raw aggregation {@code byte[]} format and the aggregation-with-headers 
{@code byte[]} format.
+ * <p>
+ * On writes (put), empty headers are prepended to the raw aggregation value 
before
+ * delegating to the inner store.
+ * On reads (get, fetch, findSessions), the headers prefix is stripped from 
the stored value
+ * so the caller receives raw aggregation bytes without headers.
+ *
+ * @see SessionToHeadersIteratorAdapter
+ */
+@SuppressWarnings("unchecked")
+public class SessionToHeadersStoreAdapter implements SessionStore<Bytes, 
byte[]> {
+    final SessionStore<Bytes, byte[]> store;
+
+    SessionToHeadersStoreAdapter(final SessionStore<Bytes, byte[]> store) {
+        if (!store.persistent()) {
+            throw new IllegalArgumentException("Provided store must be a 
persistent store, but it is not.");
+        }
+        this.store = store;
+    }
+
+    @Override
+    public KeyValueIterator<Windowed<Bytes>, byte[]> findSessions(final Bytes 
key,
+                                                                  final long 
earliestSessionEndTime,
+                                                                  final long 
latestSessionStartTime) {
+        return new SessionToHeadersIteratorAdapter(
+            store.findSessions(key, earliestSessionEndTime, 
latestSessionStartTime));
+    }
+
+    @Override
+    public KeyValueIterator<Windowed<Bytes>, byte[]> findSessions(final long 
earliestSessionEndTime,
+                                                                  final long 
latestSessionEndTime) {
+        return new SessionToHeadersIteratorAdapter(
+            store.findSessions(earliestSessionEndTime, latestSessionEndTime));
+    }
+
+    @Override
+    public KeyValueIterator<Windowed<Bytes>, byte[]> 
backwardFindSessions(final Bytes key,
+                                                                          
final long earliestSessionEndTime,
+                                                                          
final long latestSessionStartTime) {
+        return new SessionToHeadersIteratorAdapter(
+            store.backwardFindSessions(key, earliestSessionEndTime, 
latestSessionStartTime));
+    }
+
+    @Override
+    public KeyValueIterator<Windowed<Bytes>, byte[]> findSessions(final Bytes 
keyFrom,
+                                                                  final Bytes 
keyTo,
+                                                                  final long 
earliestSessionEndTime,
+                                                                  final long 
latestSessionStartTime) {
+        return new SessionToHeadersIteratorAdapter(
+            store.findSessions(keyFrom, keyTo, earliestSessionEndTime, 
latestSessionStartTime));
+    }
+
+    @Override
+    public KeyValueIterator<Windowed<Bytes>, byte[]> 
backwardFindSessions(final Bytes keyFrom,
+                                                                          
final Bytes keyTo,
+                                                                          
final long earliestSessionEndTime,
+                                                                          
final long latestSessionStartTime) {
+        return new SessionToHeadersIteratorAdapter(
+            store.backwardFindSessions(keyFrom, keyTo, earliestSessionEndTime, 
latestSessionStartTime));
+    }
+
+    @Override
+    public byte[] fetchSession(final Bytes key,
+                               final long sessionStartTime,
+                               final long sessionEndTime) {
+        return 
AggregationWithHeadersDeserializer.rawAggregation(store.fetchSession(key, 
sessionStartTime, sessionEndTime));
+    }
+
+    @Override
+    public KeyValueIterator<Windowed<Bytes>, byte[]> fetch(final Bytes key) {
+        return new SessionToHeadersIteratorAdapter(store.fetch(key));
+    }
+
+    @Override
+    public KeyValueIterator<Windowed<Bytes>, byte[]> backwardFetch(final Bytes 
key) {
+        return new SessionToHeadersIteratorAdapter(store.backwardFetch(key));
+    }
+
+    @Override
+    public KeyValueIterator<Windowed<Bytes>, byte[]> fetch(final Bytes 
keyFrom, final Bytes keyTo) {
+        return new SessionToHeadersIteratorAdapter(store.fetch(keyFrom, 
keyTo));
+    }
+
+    @Override
+    public KeyValueIterator<Windowed<Bytes>, byte[]> backwardFetch(final Bytes 
keyFrom, final Bytes keyTo) {
+        return new 
SessionToHeadersIteratorAdapter(store.backwardFetch(keyFrom, keyTo));
+    }
+
+    @Override
+    public void remove(final Windowed<Bytes> sessionKey) {
+        store.remove(sessionKey);
+    }
+
+    @Override
+    public void put(final Windowed<Bytes> sessionKey, final byte[] aggregate) {

Review Comment:
   ```suggestion
       public void put(final Windowed<Bytes> sessionKey, final byte[] 
aggregateWithHeader) {
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to