aliehsaeedii commented on code in PR #21600:
URL: https://github.com/apache/kafka/pull/21600#discussion_r2877365507


##########
streams/src/main/java/org/apache/kafka/streams/state/DslSessionParams.java:
##########
@@ -30,6 +31,7 @@ public class DslSessionParams {
     private final String name;
     private final Duration retentionPeriod;
     private final EmitStrategy emitStrategy;
+    private final DslStoreFormat storeFormat;
 
     /**
      * @param name              name of the store (cannot be {@code null})

Review Comment:
   Deprecate this constructor as per KIP-1285.



##########
streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java:
##########
@@ -920,8 +905,102 @@ public void shouldReduceSessionWindows() throws Exception 
{
         assertThat(results.get(new Windowed<>("penny", new SessionWindow(t3, 
t3))), equalTo(KeyValue.pair("stop", t3)));
 
         // verify can query data via IQ
+        if (withHeaders) {
+            verifySessionStoreWithHeaders(userSessionsStore, t1, t2, t3, t4, 
t5);
+        } else {
+            verifySessionStore(userSessionsStore, t1, t3, t4);
+        }
+    }
+
+    private void produceSessionWindowData(final Properties producerConfig,
+                                           final boolean withHeaders,
+                                           final long t1, final long t2, final 
long t3,
+                                           final long t4, final long t5,
+                                           final long sessionGap) throws 
Exception {
+        final List<KeyValue<String, String>> t1Messages = Arrays.asList(
+            new KeyValue<>("bob", "start"),
+            new KeyValue<>("penny", "start"),
+            new KeyValue<>("jo", "pause"),
+            new KeyValue<>("emily", "pause")
+        );
+
+        produceWithOptionalHeaders(t1Messages, producerConfig, withHeaders, 
"t1", t1);
+        produceWithOptionalHeaders(
+            Collections.singletonList(new KeyValue<>("emily", "resume")),
+            producerConfig, withHeaders, "t2", t2);
+        produceWithOptionalHeaders(
+            Arrays.asList(new KeyValue<>("bob", "pause"), new 
KeyValue<>("penny", "stop")),
+            producerConfig, withHeaders, "t3", t3);
+        produceWithOptionalHeaders(
+            Arrays.asList(
+                new KeyValue<>("bob", "resume"),  // bobs session continues
+                new KeyValue<>("jo", "resume")),  // jo's starts new session
+            producerConfig, withHeaders, "t4", t4);
+        produceWithOptionalHeaders(
+            Collections.singletonList(new KeyValue<>("jo", "late")),  // jo 
has late arrival
+            producerConfig, withHeaders, "t5", t5);
+    }
+
+    private void produceWithOptionalHeaders(final Collection<KeyValue<String, 
String>> records,
+                                             final Properties producerConfig,
+                                             final boolean withHeaders,
+                                             final String batchId,
+                                             final long timestamp) throws 
Exception {
+        if (withHeaders) {
+            final Headers headers = new RecordHeaders(Arrays.asList(
+                new RecordHeader("batch", 
batchId.getBytes(StandardCharsets.UTF_8)),
+                new RecordHeader("source", 
"test".getBytes(StandardCharsets.UTF_8))
+            ));
+            IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(
+                userSessionsStream, records, producerConfig, headers, 
timestamp, false);
+        } else {
+            IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(
+                userSessionsStream, records, producerConfig, timestamp);
+        }
+    }
+
+    @SuppressWarnings("unchecked")
+    private void verifySessionStoreWithHeaders(final String storeName,
+                                               final long t1, final long t2,
+                                               final long t3, final long t4, 
final long t5) throws Exception {
+        final ReadOnlySessionStore<String, AggregationWithHeaders<String>> 
sessionStore =
+            (ReadOnlySessionStore<String, AggregationWithHeaders<String>>) 
(ReadOnlySessionStore<?, ?>)
+                IntegrationTestUtils.getStore(storeName, kafkaStreams, 
QueryableStoreTypes.sessionStore());
+
+        // bob: [t1,t1] = "start", [t3,t4] = "pause:resume"
+        try (final KeyValueIterator<Windowed<String>, 
AggregationWithHeaders<String>> bob = sessionStore.fetch("bob")) {
+            assertAggregationWithEmptyHeaders(bob.next(), new 
Windowed<>("bob", new SessionWindow(t1, t1)), "start");
+            assertAggregationWithEmptyHeaders(bob.next(), new 
Windowed<>("bob", new SessionWindow(t3, t4)), "pause:resume");
+            assertFalse(bob.hasNext());
+        }
+
+        // emily: [t1,t2] = "pause:resume"
+        try (final KeyValueIterator<Windowed<String>, 
AggregationWithHeaders<String>> emily = sessionStore.fetch("emily")) {
+            assertAggregationWithEmptyHeaders(emily.next(), new 
Windowed<>("emily", new SessionWindow(t1, t2)), "pause:resume");
+            assertFalse(emily.hasNext());
+        }
+
+        // jo: [t1,t1] = "pause", [t5,t4] = "resume:late"
+        try (final KeyValueIterator<Windowed<String>, 
AggregationWithHeaders<String>> jo = sessionStore.fetch("jo")) {
+            assertAggregationWithEmptyHeaders(jo.next(), new Windowed<>("jo", 
new SessionWindow(t1, t1)), "pause");
+            assertAggregationWithEmptyHeaders(jo.next(), new Windowed<>("jo", 
new SessionWindow(t5, t4)), "resume:late");
+            assertFalse(jo.hasNext());
+        }
+    }
+
+    private static <V> void assertAggregationWithEmptyHeaders(

Review Comment:
   No tests verify actual header propagation through the session store.
   



##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregate.java:
##########
@@ -381,17 +383,20 @@ public String[] storeNames() {
 
     private class KTableSessionWindowValueGetter implements 
KTableValueGetter<Windowed<KIn>, VAgg> {
 
-        private SessionStore<KIn, VAgg> store;
+        private SessionStore<KIn, AggregationWithHeaders<VAgg>> store;
 
+        @SuppressWarnings("unchecked")
         @Override
         public void init(final ProcessorContext<?, ?> context) {
             store = context.getStateStore(storeName);
         }
 
         @Override
         public ValueAndTimestamp<VAgg> get(final Windowed<KIn> key) {

Review Comment:
   Just FYI: `get` must return `ValueTimestampHeaders<VAgg>`, since
   `KTableValueGetter.get` has changed. (We may not be able to apply this in
   this PR, since the other PR is not merged yet.)



##########
streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionWindowedKStreamImplTest.java:
##########
@@ -440,4 +442,15 @@ private void processData(final TopologyTestDriver driver) {
         inputTopic.pipeInput("2", "1", 600);
         inputTopic.pipeInput("2", "2", 599);
     }
+
+    private <V> List<KeyValue<Windowed<String>, V>> unwrapAggregations(
+            final KeyValueIterator<Windowed<String>, V> iterator) {
+        final List<KeyValue<Windowed<String>, V>> result = new ArrayList<>();
+        while (iterator.hasNext()) {
+            final KeyValue<Windowed<String>, V> next = iterator.next();
+            result.add(KeyValue.pair(next.key, next.value));
+        }
+        iterator.close();
+        return result;
+    }

Review Comment:
   ditto.



##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionStoreMaterializer.java:
##########
@@ -58,18 +59,20 @@ public SessionStoreMaterializer(
 
     @Override
     public StoreBuilder<?> builder() {
+        final DslStoreFormat storeFormat = dslStoreFormat() == null ? 
DslStoreFormat.PLAIN : DslStoreFormat.HEADERS;
         final SessionBytesStoreSupplier supplier = 
materialized.storeSupplier() == null
                 ? dslStoreSuppliers().sessionStore(new DslSessionParams(
                         materialized.storeName(),
                         Duration.ofMillis(retentionPeriod),
-                        emitStrategy))
+                        emitStrategy,
+                        storeFormat))
                 : (SessionBytesStoreSupplier) materialized.storeSupplier();
 
-        final StoreBuilder<SessionStore<K, V>> builder = 
Stores.sessionStoreBuilder(
-                supplier,
-                materialized.keySerde(),
-                materialized.valueSerde()
-        );
+        final StoreBuilder<?> builder = Stores.sessionStoreBuilderWithHeaders(

Review Comment:
   `?` could stay `SessionStore` since `SessionStoreWithHeaders` is a 
`SessionStore` too?



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/SessionToHeadersStoreAdapter.java:
##########
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.utils.ByteUtils;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.StateStoreContext;
+import org.apache.kafka.streams.query.Position;
+import org.apache.kafka.streams.query.PositionBound;
+import org.apache.kafka.streams.query.Query;
+import org.apache.kafka.streams.query.QueryConfig;
+import org.apache.kafka.streams.query.QueryResult;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.SessionStore;
+
+import java.nio.ByteBuffer;
+import java.util.Map;
+
+import static 
org.apache.kafka.streams.state.HeadersBytesStore.convertToHeaderFormat;
+
+/**
+ * This class is used to ensure backward compatibility at DSL level between
+ * {@link org.apache.kafka.streams.state.SessionStoreWithHeaders} and
+ * {@link org.apache.kafka.streams.state.SessionStore}.
+ * <p>
+ * If a user provides a supplier for {@code SessionStore} (without headers) via
+ * {@link org.apache.kafka.streams.kstream.Materialized} when building
+ * a {@code SessionStoreWithHeaders}, this adapter is used to translate between
+ * the raw aggregation {@code byte[]} format and the aggregation-with-headers 
{@code byte[]} format.
+ * <p>
+ * On writes (put), empty headers are prepended to the raw aggregation value 
before
+ * delegating to the inner store.
+ * On reads (get, fetch, findSessions), the headers prefix is stripped from 
the stored value
+ * so the caller receives raw aggregation bytes without headers.
+ *
+ * @see SessionToHeadersIteratorAdapter
+ */
+@SuppressWarnings("unchecked")
+public class SessionToHeadersStoreAdapter implements SessionStore<Bytes, 
byte[]> {
+    final SessionStore<Bytes, byte[]> store;
+
+    SessionToHeadersStoreAdapter(final SessionStore<Bytes, byte[]> store) {
+        if (!store.persistent()) {
+            throw new IllegalArgumentException("Provided store must be a 
persistent store, but it is not.");
+        }
+        this.store = store;
+    }
+
+    /**
+     * Extract raw aggregation value from serialized AggregationWithHeaders.
+     * This strips the headers portion and returns only the aggregation bytes.
+     * <p>
+     * Format conversion:
+     * Input:  [headersSize(varint)][headers][aggregationBytes]
+     * Output: [aggregationBytes]
+     */
+    static byte[] rawAggregationValue(final byte[] valueWithHeaders) {

Review Comment:
   Isn't this method exactly the same as 
`AggregationWithHeadersDeserializer.rawAggregation(aggregationWithHeaders)`?



##########
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java:
##########
@@ -289,18 +293,22 @@ public void 
shouldRemoveMergedSessionsFromStateStore(final EmitStrategy.Strategy
         processor.process(new Record<>("a", "1", 0L));
 
         // first ensure it is in the store
-        try (final KeyValueIterator<Windowed<String>, Long> a1 =
+        try (final KeyValueIterator<Windowed<String>, 
AggregationWithHeaders<Long>> a1 =
                  sessionStore.findSessions("a", 0, 0)) {
-            assertEquals(KeyValue.pair(new Windowed<>("a", new 
SessionWindow(0, 0)), 1L), a1.next());
+            final KeyValue<Windowed<String>, AggregationWithHeaders<Long>> 
next = a1.next();
+            assertEquals(new Windowed<>("a", new SessionWindow(0, 0)), 
next.key);
+            assertEquals(1L, 
AggregationWithHeaders.getAggregationOrNull(next.value));
         }
 
 
         processor.process(new Record<>("a", "2", 100L));
         // a1 from above should have been removed
         // should have merged session in store
-        try (final KeyValueIterator<Windowed<String>, Long> a2 =
+        try (final KeyValueIterator<Windowed<String>, 
AggregationWithHeaders<Long>> a2 =
                  sessionStore.findSessions("a", 0, 100)) {
-            assertEquals(KeyValue.pair(new Windowed<>("a", new 
SessionWindow(0, 100)), 2L), a2.next());
+            final KeyValue<Windowed<String>, AggregationWithHeaders<Long>> 
next = a2.next();
+            assertEquals(new Windowed<>("a", new SessionWindow(0, 100)), 
next.key);
+            assertEquals(2L, 
AggregationWithHeaders.getAggregationOrNull(next.value));
             assertFalse(a2.hasNext());
         }
     }

Review Comment:
   Besides fixing the existing tests, should we add more tests that cover the
   new situations as well, for example when the input data has (non-empty)
   headers? But we can merge this PR and open a new PR for that.



##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionStoreMaterializer.java:
##########
@@ -58,18 +59,20 @@ public SessionStoreMaterializer(
 
     @Override
     public StoreBuilder<?> builder() {
+        final DslStoreFormat storeFormat = dslStoreFormat() == null ? 
DslStoreFormat.PLAIN : DslStoreFormat.HEADERS;

Review Comment:
   What if `dslStoreFormat() == DEFAULT`? Should it be
   ```
   final DslStoreFormat storeFormat = dslStoreFormat() == null ? DslStoreFormat.PLAIN : dslStoreFormat();
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to