mjsax commented on code in PR #21639:
URL: https://github.com/apache/kafka/pull/21639#discussion_r2944096477
##########
streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreWithHeaders.java:
##########
@@ -133,6 +135,44 @@ public ValueTimestampHeaders<V> putIfAbsent(final K key,
return currentValue;
}
+ @Override
+ public ValueTimestampHeaders<V> delete(final K key) {
+ Objects.requireNonNull(key, "key cannot be null");
+ try {
+ return maybeMeasureLatency(
+ () -> {
+ final ProcessorRecordContext currentContext =
internalContext.recordContext();
+
+ // Create new headers object to isolate delete operation
from input record
+ final Headers newHeaders = new
RecordHeaders(currentContext.headers());
Review Comment:
```suggestion
final Headers deletedHeaders = new
RecordHeaders(currentContext.headers());
```
##########
streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStoreWithHeaders.java:
##########
@@ -81,6 +82,46 @@ public void put(final Windowed<K> sessionKey, final
AggregationWithHeaders<AGG>
}
+ @Override
+ public void remove(final Windowed<K> sessionKey) {
+ Objects.requireNonNull(sessionKey, "sessionKey can't be null");
+ Objects.requireNonNull(sessionKey.key(), "sessionKey.key() can't be
null");
+ Objects.requireNonNull(sessionKey.window(), "sessionKey.window() can't
be null");
+
+ try {
+ maybeMeasureLatency(
+ () -> {
+ final ProcessorRecordContext currentContext =
internalContext.recordContext();
+
+ // Create new headers object to isolate delete operation
from input record
+ final Headers newHeaders = new
RecordHeaders(currentContext.headers());
Review Comment:
nit: should we rename to `deletedHeaders`?
##########
streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStoreWithHeaders.java:
##########
@@ -81,6 +82,46 @@ public void put(final Windowed<K> sessionKey, final
AggregationWithHeaders<AGG>
}
+ @Override
+ public void remove(final Windowed<K> sessionKey) {
Review Comment:
We also need an override for `put(sessionKey, aggregate)` to handle the
`aggregate == null` case in a similar way.
##########
streams/src/test/java/org/apache/kafka/streams/integration/SessionStoreWithHeadersIntegrationTest.java:
##########
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.integration;
+
+import org.apache.kafka.common.header.Header;
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.TestInputTopic;
+import org.apache.kafka.streams.TestOutputTopic;
+import org.apache.kafka.streams.TopologyTestDriver;
+import org.apache.kafka.streams.kstream.Consumed;
+import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.kstream.internals.SessionWindow;
+import org.apache.kafka.streams.processor.api.ContextualProcessor;
+import org.apache.kafka.streams.processor.api.ProcessorContext;
+import org.apache.kafka.streams.processor.api.Record;
+import org.apache.kafka.streams.state.AggregationWithHeaders;
+import org.apache.kafka.streams.state.SessionStoreWithHeaders;
+import org.apache.kafka.streams.state.Stores;
+import org.apache.kafka.test.TestUtils;
+
+import org.junit.jupiter.api.Test;
+
+import java.nio.charset.StandardCharsets;
+import java.time.Duration;
+import java.util.Properties;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+
+/**
+ * Integration test to verify that key serializers can modify headers as a
side-effect,
+ * and that this side-effect makes it into the changelog topic for session
stores.
+ *
+ * This test verifies the core assumption of the headers-aware state store
implementation:
+ * when we create a temporary context with new headers and serialize the key,
the key
+ * serializer will add metadata to those headers, and those headers
+ * will be used when logging the change to the changelog topic.
+ */
+public class SessionStoreWithHeadersIntegrationTest {
+
+ private static final String STORE_NAME = "test-session-store";
+ private static final String INPUT_TOPIC = "input";
+
+ /**
+ * Custom serializer that adds a header during serialization as a
side-effect.
+ * This simulates real-world serializers that add metadata to headers.
+ */
+ private static class HeaderAddingSerializer implements Serializer<String> {
+ @Override
+ public byte[] serialize(final String topic, final String data) {
+ return data == null ? null : data.getBytes(StandardCharsets.UTF_8);
+ }
+
+ @Override
+ public byte[] serialize(final String topic, final Headers headers,
final String data) {
+ headers.add("serializer-metadata",
"session-test-value".getBytes(StandardCharsets.UTF_8));
+ return serialize(topic, data);
+ }
+ }
+
+ private static class HeaderAddingSerde implements Serde<String> {
+ @Override
+ public Serializer<String> serializer() {
+ return new HeaderAddingSerializer();
+ }
+
+ @Override
+ public Deserializer<String> deserializer() {
+ return Serdes.String().deserializer();
+ }
+ }
+
+ /**
+ * Processor that puts and removes from a session store with headers.
+ */
+ private static class SessionStoreProcessor extends
ContextualProcessor<String, String, Void, Void> {
+ private SessionStoreWithHeaders<String, String> store;
+
+ @Override
+ public void init(final ProcessorContext<Void, Void> context) {
+ super.init(context);
+ store = context.getStateStore(STORE_NAME);
+ }
+
+ @Override
+ public void process(final Record<String, String> record) {
+ final long timestamp = record.timestamp();
+ final Windowed<String> sessionKey = new Windowed<>(
+ record.key(),
+ new SessionWindow(timestamp, timestamp)
+ );
+
+ if (record.value() == null) {
+ // Delete using remove()
+ store.remove(sessionKey);
+ } else {
+ // Put with headers
+ store.put(
+ sessionKey,
+ AggregationWithHeaders.make(record.value(),
record.headers())
+ );
+ }
+ }
+ }
+
+ @Test
+ public void shouldPropagateSerializerHeaderSideEffectToChangelog() {
+ final StreamsBuilder builder = new StreamsBuilder();
+
+ // Create a session store with headers using our custom serializer
+ builder.addStateStore(
+ Stores.sessionStoreBuilderWithHeaders(
+ Stores.persistentSessionStore(
Review Comment:
```suggestion
Stores.inMemorySessionStore(
```
##########
streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedWindowStoreWithHeaders.java:
##########
@@ -81,10 +83,37 @@ protected Serde<ValueTimestampHeaders<V>>
prepareValueSerde(final Serde<ValueTim
@Override
public void put(final K key, final ValueTimestampHeaders<V> value, final
long windowStartTimestamp) {
Objects.requireNonNull(key, "key cannot be null");
- final Headers headers = value == null || value.headers() == null ? new
RecordHeaders() : value.headers();
try {
maybeMeasureLatency(
- () -> wrapped().put(keyBytes(key, headers),
serdes.rawValue(value, headers), windowStartTimestamp),
+ () -> {
+ if (value == null) {
+ // Deletion path
+ final ProcessorRecordContext currentContext =
internalContext.recordContext();
+
+ // Create new headers object to isolate delete
operation from input record
+ final Headers newHeaders = new
RecordHeaders(currentContext.headers());
Review Comment:
```suggestion
final Headers deleteHeaders = new
RecordHeaders(currentContext.headers());
```
##########
streams/src/test/java/org/apache/kafka/streams/integration/SessionStoreWithHeadersIntegrationTest.java:
##########
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.integration;
Review Comment:
I agree with Bill -- this is not an integration test, but still more like a
unit test, as it's using TDD (TopologyTestDriver). I think this test should
just go into the `o.a.k.streams.state.internals` package?
But wondering if we actually need a full new TDD test, or if we should just
extend existing `MeteredSessionStoreWithHeadersTest`? Not saying we have to
extend `MeteredSessionStoreWithHeadersTest` -- might require too much mocking...
##########
streams/src/test/java/org/apache/kafka/streams/integration/SessionStoreWithHeadersIntegrationTest.java:
##########
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.integration;
+
+import org.apache.kafka.common.header.Header;
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.TestInputTopic;
+import org.apache.kafka.streams.TestOutputTopic;
+import org.apache.kafka.streams.TopologyTestDriver;
+import org.apache.kafka.streams.kstream.Consumed;
+import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.kstream.internals.SessionWindow;
+import org.apache.kafka.streams.processor.api.ContextualProcessor;
+import org.apache.kafka.streams.processor.api.ProcessorContext;
+import org.apache.kafka.streams.processor.api.Record;
+import org.apache.kafka.streams.state.AggregationWithHeaders;
+import org.apache.kafka.streams.state.SessionStoreWithHeaders;
+import org.apache.kafka.streams.state.Stores;
+import org.apache.kafka.test.TestUtils;
+
+import org.junit.jupiter.api.Test;
+
+import java.nio.charset.StandardCharsets;
+import java.time.Duration;
+import java.util.Properties;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+
+/**
+ * Integration test to verify that key serializers can modify headers as a
side-effect,
+ * and that this side-effect makes it into the changelog topic for session
stores.
+ *
+ * This test verifies the core assumption of the headers-aware state store
implementation:
+ * when we create a temporary context with new headers and serialize the key,
the key
+ * serializer will add metadata to those headers, and those headers
+ * will be used when logging the change to the changelog topic.
+ */
+public class SessionStoreWithHeadersIntegrationTest {
+
+ private static final String STORE_NAME = "test-session-store";
+ private static final String INPUT_TOPIC = "input";
+
+ /**
+ * Custom serializer that adds a header during serialization as a
side-effect.
+ * This simulates real-world serializers that add metadata to headers.
+ */
+ private static class HeaderAddingSerializer implements Serializer<String> {
+ @Override
+ public byte[] serialize(final String topic, final String data) {
+ return data == null ? null : data.getBytes(StandardCharsets.UTF_8);
+ }
+
+ @Override
+ public byte[] serialize(final String topic, final Headers headers,
final String data) {
+ headers.add("serializer-metadata",
"session-test-value".getBytes(StandardCharsets.UTF_8));
+ return serialize(topic, data);
+ }
+ }
+
+ private static class HeaderAddingSerde implements Serde<String> {
+ @Override
+ public Serializer<String> serializer() {
+ return new HeaderAddingSerializer();
+ }
+
+ @Override
+ public Deserializer<String> deserializer() {
+ return Serdes.String().deserializer();
+ }
+ }
+
+ /**
+ * Processor that puts and removes from a session store with headers.
+ */
+ private static class SessionStoreProcessor extends
ContextualProcessor<String, String, Void, Void> {
+ private SessionStoreWithHeaders<String, String> store;
+
+ @Override
+ public void init(final ProcessorContext<Void, Void> context) {
+ super.init(context);
+ store = context.getStateStore(STORE_NAME);
+ }
+
+ @Override
+ public void process(final Record<String, String> record) {
+ final long timestamp = record.timestamp();
+ final Windowed<String> sessionKey = new Windowed<>(
+ record.key(),
+ new SessionWindow(timestamp, timestamp)
+ );
+
+ if (record.value() == null) {
+ // Delete using remove()
+ store.remove(sessionKey);
+ } else {
+ // Put with headers
+ store.put(
+ sessionKey,
+ AggregationWithHeaders.make(record.value(),
record.headers())
+ );
+ }
+ }
+ }
+
+ @Test
+ public void shouldPropagateSerializerHeaderSideEffectToChangelog() {
+ final StreamsBuilder builder = new StreamsBuilder();
+
+ // Create a session store with headers using our custom serializer
+ builder.addStateStore(
+ Stores.sessionStoreBuilderWithHeaders(
+ Stores.persistentSessionStore(
+ STORE_NAME,
+ Duration.ofMillis(10000L)
+ ),
+ new HeaderAddingSerde(), // Custom key serializer that adds
headers
+ Serdes.String()
+ )
+ );
+
+ // Add a processor that uses the store
+ builder.stream(INPUT_TOPIC, Consumed.with(Serdes.String(),
Serdes.String()))
+ .process(SessionStoreProcessor::new, STORE_NAME);
+
+ final Properties props = new Properties();
+ props.put("application.id", "test-session-app");
+ props.put("bootstrap.servers", "dummy:1234");
Review Comment:
TDD actually does not require setting `application.id` or `bootstrap.servers`
##########
streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreWithHeaders.java:
##########
@@ -133,6 +135,44 @@ public ValueTimestampHeaders<V> putIfAbsent(final K key,
return currentValue;
}
+ @Override
+ public ValueTimestampHeaders<V> delete(final K key) {
Review Comment:
We also need to override `put`, `putIfAbsent`, and `putAll` which can also
be used to delete via `value == null`.
##########
streams/src/test/java/org/apache/kafka/streams/integration/SessionStoreWithHeadersIntegrationTest.java:
##########
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.integration;
+
+import org.apache.kafka.common.header.Header;
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.TestInputTopic;
+import org.apache.kafka.streams.TestOutputTopic;
+import org.apache.kafka.streams.TopologyTestDriver;
+import org.apache.kafka.streams.kstream.Consumed;
+import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.kstream.internals.SessionWindow;
+import org.apache.kafka.streams.processor.api.ContextualProcessor;
+import org.apache.kafka.streams.processor.api.ProcessorContext;
+import org.apache.kafka.streams.processor.api.Record;
+import org.apache.kafka.streams.state.AggregationWithHeaders;
+import org.apache.kafka.streams.state.SessionStoreWithHeaders;
+import org.apache.kafka.streams.state.Stores;
+import org.apache.kafka.test.TestUtils;
+
+import org.junit.jupiter.api.Test;
+
+import java.nio.charset.StandardCharsets;
+import java.time.Duration;
+import java.util.Properties;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+
+/**
+ * Integration test to verify that key serializers can modify headers as a
side-effect,
+ * and that this side-effect makes it into the changelog topic for session
stores.
+ *
+ * This test verifies the core assumption of the headers-aware state store
implementation:
+ * when we create a temporary context with new headers and serialize the key,
the key
+ * serializer will add metadata to those headers, and those headers
+ * will be used when logging the change to the changelog topic.
+ */
+public class SessionStoreWithHeadersIntegrationTest {
+
+ private static final String STORE_NAME = "test-session-store";
+ private static final String INPUT_TOPIC = "input";
+
+ /**
+ * Custom serializer that adds a header during serialization as a
side-effect.
+ * This simulates real-world serializers that add metadata to headers.
+ */
+ private static class HeaderAddingSerializer implements Serializer<String> {
+ @Override
+ public byte[] serialize(final String topic, final String data) {
+ return data == null ? null : data.getBytes(StandardCharsets.UTF_8);
+ }
+
+ @Override
+ public byte[] serialize(final String topic, final Headers headers,
final String data) {
+ headers.add("serializer-metadata",
"session-test-value".getBytes(StandardCharsets.UTF_8));
+ return serialize(topic, data);
+ }
+ }
+
+ private static class HeaderAddingSerde implements Serde<String> {
+ @Override
+ public Serializer<String> serializer() {
+ return new HeaderAddingSerializer();
+ }
+
+ @Override
+ public Deserializer<String> deserializer() {
+ return Serdes.String().deserializer();
+ }
+ }
+
+ /**
+ * Processor that puts and removes from a session store with headers.
+ */
+ private static class SessionStoreProcessor extends
ContextualProcessor<String, String, Void, Void> {
+ private SessionStoreWithHeaders<String, String> store;
+
+ @Override
+ public void init(final ProcessorContext<Void, Void> context) {
+ super.init(context);
+ store = context.getStateStore(STORE_NAME);
+ }
+
+ @Override
+ public void process(final Record<String, String> record) {
+ final long timestamp = record.timestamp();
+ final Windowed<String> sessionKey = new Windowed<>(
+ record.key(),
+ new SessionWindow(timestamp, timestamp)
+ );
+
+ if (record.value() == null) {
+ // Delete using remove()
+ store.remove(sessionKey);
+ } else {
+ // Put with headers
+ store.put(
+ sessionKey,
+ AggregationWithHeaders.make(record.value(),
record.headers())
+ );
+ }
+ }
+ }
+
+ @Test
+ public void shouldPropagateSerializerHeaderSideEffectToChangelog() {
+ final StreamsBuilder builder = new StreamsBuilder();
+
+ // Create a session store with headers using our custom serializer
+ builder.addStateStore(
+ Stores.sessionStoreBuilderWithHeaders(
+ Stores.persistentSessionStore(
+ STORE_NAME,
+ Duration.ofMillis(10000L)
+ ),
+ new HeaderAddingSerde(), // Custom key serializer that adds
headers
+ Serdes.String()
+ )
+ );
+
+ // Add a processor that uses the store
+ builder.stream(INPUT_TOPIC, Consumed.with(Serdes.String(),
Serdes.String()))
+ .process(SessionStoreProcessor::new, STORE_NAME);
+
+ final Properties props = new Properties();
+ props.put("application.id", "test-session-app");
+ props.put("bootstrap.servers", "dummy:1234");
+ props.put("state.dir", TestUtils.tempDirectory().getAbsolutePath());
Review Comment:
If we use in-memory store, we can also remove this one.
##########
streams/src/test/java/org/apache/kafka/streams/integration/SessionStoreWithHeadersIntegrationTest.java:
##########
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.integration;
+
+import org.apache.kafka.common.header.Header;
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.TestInputTopic;
+import org.apache.kafka.streams.TestOutputTopic;
+import org.apache.kafka.streams.TopologyTestDriver;
+import org.apache.kafka.streams.kstream.Consumed;
+import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.kstream.internals.SessionWindow;
+import org.apache.kafka.streams.processor.api.ContextualProcessor;
+import org.apache.kafka.streams.processor.api.ProcessorContext;
+import org.apache.kafka.streams.processor.api.Record;
+import org.apache.kafka.streams.state.AggregationWithHeaders;
+import org.apache.kafka.streams.state.SessionStoreWithHeaders;
+import org.apache.kafka.streams.state.Stores;
+import org.apache.kafka.test.TestUtils;
+
+import org.junit.jupiter.api.Test;
+
+import java.nio.charset.StandardCharsets;
+import java.time.Duration;
+import java.util.Properties;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+
+/**
+ * Integration test to verify that key serializers can modify headers as a
side-effect,
+ * and that this side-effect makes it into the changelog topic for session
stores.
+ *
+ * This test verifies the core assumption of the headers-aware state store
implementation:
+ * when we create a temporary context with new headers and serialize the key,
the key
+ * serializer will add metadata to those headers, and those headers
+ * will be used when logging the change to the changelog topic.
+ */
+public class SessionStoreWithHeadersIntegrationTest {
+
+ private static final String STORE_NAME = "test-session-store";
+ private static final String INPUT_TOPIC = "input";
+
+ /**
+ * Custom serializer that adds a header during serialization as a
side-effect.
+ * This simulates real-world serializers that add metadata to headers.
+ */
+ private static class HeaderAddingSerializer implements Serializer<String> {
+ @Override
+ public byte[] serialize(final String topic, final String data) {
+ return data == null ? null : data.getBytes(StandardCharsets.UTF_8);
+ }
+
+ @Override
+ public byte[] serialize(final String topic, final Headers headers,
final String data) {
+ headers.add("serializer-metadata",
"session-test-value".getBytes(StandardCharsets.UTF_8));
+ return serialize(topic, data);
+ }
+ }
+
+ private static class HeaderAddingSerde implements Serde<String> {
+ @Override
+ public Serializer<String> serializer() {
+ return new HeaderAddingSerializer();
+ }
+
+ @Override
+ public Deserializer<String> deserializer() {
+ return Serdes.String().deserializer();
+ }
+ }
+
+ /**
+ * Processor that puts and removes from a session store with headers.
+ */
+ private static class SessionStoreProcessor extends
ContextualProcessor<String, String, Void, Void> {
+ private SessionStoreWithHeaders<String, String> store;
+
+ @Override
+ public void init(final ProcessorContext<Void, Void> context) {
+ super.init(context);
+ store = context.getStateStore(STORE_NAME);
+ }
+
+ @Override
+ public void process(final Record<String, String> record) {
+ final long timestamp = record.timestamp();
+ final Windowed<String> sessionKey = new Windowed<>(
+ record.key(),
+ new SessionWindow(timestamp, timestamp)
+ );
+
+ if (record.value() == null) {
Review Comment:
Can we do this differently, to be able to test both `remove` and `put(key,
null)`?
##########
streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingTimestampedKeyValueBytesStoreWithHeaders.java:
##########
@@ -97,4 +97,11 @@ public void putAll(final List<KeyValue<Bytes, byte[]>>
entries) {
);
}
}
+
+ @Override
+ public byte[] delete(final Bytes key) {
+ final byte[] deletedValue = wrapped().delete(key);
+ log(key, null, internalContext.recordContext().timestamp(),
internalContext.recordContext().headers());
Review Comment:
Yes, it will return the "temp headers" we created and set inside the metered
store.
##########
streams/src/test/java/org/apache/kafka/streams/integration/SessionStoreWithHeadersIntegrationTest.java:
##########
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.integration;
+
+import org.apache.kafka.common.header.Header;
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.TestInputTopic;
+import org.apache.kafka.streams.TestOutputTopic;
+import org.apache.kafka.streams.TopologyTestDriver;
+import org.apache.kafka.streams.kstream.Consumed;
+import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.kstream.internals.SessionWindow;
+import org.apache.kafka.streams.processor.api.ContextualProcessor;
+import org.apache.kafka.streams.processor.api.ProcessorContext;
+import org.apache.kafka.streams.processor.api.Record;
+import org.apache.kafka.streams.state.AggregationWithHeaders;
+import org.apache.kafka.streams.state.SessionStoreWithHeaders;
+import org.apache.kafka.streams.state.Stores;
+import org.apache.kafka.test.TestUtils;
+
+import org.junit.jupiter.api.Test;
+
+import java.nio.charset.StandardCharsets;
+import java.time.Duration;
+import java.util.Properties;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+
+/**
+ * Integration test to verify that key serializers can modify headers as a
side-effect,
+ * and that this side-effect makes it into the changelog topic for session
stores.
+ *
+ * This test verifies the core assumption of the headers-aware state store
implementation:
+ * when we create a temporary context with new headers and serialize the key,
the key
+ * serializer will add metadata to those headers, and those headers
+ * will be used when logging the change to the changelog topic.
+ */
+public class SessionStoreWithHeadersIntegrationTest {
+
+ private static final String STORE_NAME = "test-session-store";
+ private static final String INPUT_TOPIC = "input";
+
+ /**
+ * Custom serializer that adds a header during serialization as a
side-effect.
+ * This simulates real-world serializers that add metadata to headers.
+ */
+ private static class HeaderAddingSerializer implements Serializer<String> {
+ @Override
+ public byte[] serialize(final String topic, final String data) {
+ return data == null ? null : data.getBytes(StandardCharsets.UTF_8);
+ }
+
+ @Override
+ public byte[] serialize(final String topic, final Headers headers,
final String data) {
+ headers.add("serializer-metadata",
"session-test-value".getBytes(StandardCharsets.UTF_8));
+ return serialize(topic, data);
+ }
+ }
+
+ private static class HeaderAddingSerde implements Serde<String> {
+ @Override
+ public Serializer<String> serializer() {
+ return new HeaderAddingSerializer();
+ }
+
+ @Override
+ public Deserializer<String> deserializer() {
+ return Serdes.String().deserializer();
+ }
+ }
+
+ /**
+ * Processor that puts and removes from a session store with headers.
+ */
+ private static class SessionStoreProcessor extends
ContextualProcessor<String, String, Void, Void> {
+ private SessionStoreWithHeaders<String, String> store;
+
+ @Override
+ public void init(final ProcessorContext<Void, Void> context) {
+ super.init(context);
+ store = context.getStateStore(STORE_NAME);
+ }
+
+ @Override
+ public void process(final Record<String, String> record) {
+ final long timestamp = record.timestamp();
+ final Windowed<String> sessionKey = new Windowed<>(
+ record.key(),
+ new SessionWindow(timestamp, timestamp)
+ );
+
+ if (record.value() == null) {
+ // Delete using remove()
+ store.remove(sessionKey);
+ } else {
+ // Put with headers
+ store.put(
+ sessionKey,
+ AggregationWithHeaders.make(record.value(),
record.headers())
+ );
+ }
+ }
+ }
+
+ @Test
+ public void shouldPropagateSerializerHeaderSideEffectToChangelog() {
+ final StreamsBuilder builder = new StreamsBuilder();
+
+ // Create a session store with headers using our custom serializer
+ builder.addStateStore(
+ Stores.sessionStoreBuilderWithHeaders(
+ Stores.persistentSessionStore(
+ STORE_NAME,
+ Duration.ofMillis(10000L)
+ ),
+ new HeaderAddingSerde(), // Custom key serializer that adds
headers
+ Serdes.String()
+ )
+ );
+
+ // Add a processor that uses the store
+ builder.stream(INPUT_TOPIC, Consumed.with(Serdes.String(),
Serdes.String()))
+ .process(SessionStoreProcessor::new, STORE_NAME);
Review Comment:
I think we should forward the record to a result topic, to allow us to verify
that the output record does not contain the header set by the store
key-serializer (of course, we will need to use the regular `StringSerde` on the
result topic).
##########
streams/src/test/java/org/apache/kafka/streams/integration/SessionStoreWithHeadersIntegrationTest.java:
##########
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.integration;
+
+import org.apache.kafka.common.header.Header;
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.TestInputTopic;
+import org.apache.kafka.streams.TestOutputTopic;
+import org.apache.kafka.streams.TopologyTestDriver;
+import org.apache.kafka.streams.kstream.Consumed;
+import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.kstream.internals.SessionWindow;
+import org.apache.kafka.streams.processor.api.ContextualProcessor;
+import org.apache.kafka.streams.processor.api.ProcessorContext;
+import org.apache.kafka.streams.processor.api.Record;
+import org.apache.kafka.streams.state.AggregationWithHeaders;
+import org.apache.kafka.streams.state.SessionStoreWithHeaders;
+import org.apache.kafka.streams.state.Stores;
+import org.apache.kafka.test.TestUtils;
+
+import org.junit.jupiter.api.Test;
+
+import java.nio.charset.StandardCharsets;
+import java.time.Duration;
+import java.util.Properties;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+
+/**
+ * Integration test to verify that key serializers can modify headers as a
side-effect,
+ * and that this side-effect makes it into the changelog topic for session
stores.
+ *
+ * This test verifies the core assumption of the headers-aware state store
implementation:
+ * when we create a temporary context with new headers and serialize the key,
the key
+ * serializer will add metadata to those headers, and those headers
+ * will be used when logging the change to the changelog topic.
+ */
+public class SessionStoreWithHeadersIntegrationTest {
+
+ private static final String STORE_NAME = "test-session-store";
+ private static final String INPUT_TOPIC = "input";
+
+ /**
+ * Custom serializer that adds a header during serialization as a
side-effect.
+ * This simulates real-world serializers that add metadata to headers.
+ */
+ private static class HeaderAddingSerializer implements Serializer<String> {
+ @Override
+ public byte[] serialize(final String topic, final String data) {
+ return data == null ? null : data.getBytes(StandardCharsets.UTF_8);
+ }
+
+ @Override
+ public byte[] serialize(final String topic, final Headers headers,
final String data) {
+ headers.add("serializer-metadata",
"session-test-value".getBytes(StandardCharsets.UTF_8));
+ return serialize(topic, data);
+ }
+ }
+
+ private static class HeaderAddingSerde implements Serde<String> {
+ @Override
+ public Serializer<String> serializer() {
+ return new HeaderAddingSerializer();
+ }
+
+ @Override
+ public Deserializer<String> deserializer() {
+ return Serdes.String().deserializer();
+ }
+ }
+
+ /**
+ * Processor that puts and removes from a session store with headers.
+ */
+ private static class SessionStoreProcessor extends
ContextualProcessor<String, String, Void, Void> {
+ private SessionStoreWithHeaders<String, String> store;
+
+ @Override
+ public void init(final ProcessorContext<Void, Void> context) {
+ super.init(context);
+ store = context.getStateStore(STORE_NAME);
+ }
+
+ @Override
+ public void process(final Record<String, String> record) {
+ final long timestamp = record.timestamp();
+ final Windowed<String> sessionKey = new Windowed<>(
+ record.key(),
+ new SessionWindow(timestamp, timestamp)
+ );
+
+ if (record.value() == null) {
+ // Delete using remove()
+ store.remove(sessionKey);
+ } else {
+ // Put with headers
+ store.put(
+ sessionKey,
+ AggregationWithHeaders.make(record.value(),
record.headers())
+ );
+ }
+ }
+ }
+
+ @Test
+ public void shouldPropagateSerializerHeaderSideEffectToChangelog() {
+ final StreamsBuilder builder = new StreamsBuilder();
+
+ // Create a session store with headers using our custom serializer
+ builder.addStateStore(
+ Stores.sessionStoreBuilderWithHeaders(
+ Stores.persistentSessionStore(
+ STORE_NAME,
+ Duration.ofMillis(10000L)
+ ),
+ new HeaderAddingSerde(), // Custom key serializer that adds
headers
+ Serdes.String()
+ )
+ );
+
+ // Add a processor that uses the store
+ builder.stream(INPUT_TOPIC, Consumed.with(Serdes.String(),
Serdes.String()))
+ .process(SessionStoreProcessor::new, STORE_NAME);
+
+ final Properties props = new Properties();
+ props.put("application.id", "test-session-app");
+ props.put("bootstrap.servers", "dummy:1234");
+ props.put("state.dir", TestUtils.tempDirectory().getAbsolutePath());
+
+ try (TopologyTestDriver driver = new
TopologyTestDriver(builder.build(), props)) {
+ final TestInputTopic<String, String> inputTopic =
driver.createInputTopic(
+ INPUT_TOPIC,
+ Serdes.String().serializer(),
+ Serdes.String().serializer()
+ );
+
+ final String changelogTopic = "test-session-app-" + STORE_NAME +
"-changelog";
+ final TestOutputTopic<String, String> changelogOutputTopic =
+ driver.createOutputTopic(
+ changelogTopic,
+ Serdes.String().deserializer(),
+ Serdes.String().deserializer()
+ );
+
+ inputTopic.pipeInput("key1", "value1", 1000L);
+
+ // Verify changelog has the put record with header
+ final var putRecord = changelogOutputTopic.readRecord();
+ assertNotNull(putRecord.key());
+ assertEquals("value1", putRecord.value());
+
+ // Verify the serializer added metadata header as side-effect
+ final Header putMetadataHeader =
putRecord.headers().lastHeader("serializer-metadata");
+ assertNotNull(putMetadataHeader, "metadata header should be
present in put record");
+ assertEquals("session-test-value", new
String(putMetadataHeader.value(), StandardCharsets.UTF_8));
+
+ inputTopic.pipeInput("key1", (String) null, 1000L);
Review Comment:
I think we should use "command" values like `"remove"` and `"put(null)"`, or
something like this, to mimic both cases we want to test inside the Processor.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]