bbejeck commented on code in PR #21748:
URL: https://github.com/apache/kafka/pull/21748#discussion_r2933049388
##########
streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/HeadersStoreUpgradeIntegrationTest.java:
##########
@@ -1599,4 +1264,440 @@ private void setupAndPopulateKeyValueStoreWithHeaders(final Properties props) th
kafkaStreams.close();
}
+
+ // ==================== Session Store Tests ====================
+
+ @Test
+    public void shouldMigratePersistentSessionStoreToSessionStoreWithHeadersUsingPapi() throws Exception {
+ shouldMigrateSessionStoreToSessionStoreWithHeaders(true);
+ }
+
+ @Test
+    public void shouldMigrateInMemorySessionStoreToSessionStoreWithHeadersUsingPapi() throws Exception {
+ shouldMigrateSessionStoreToSessionStoreWithHeaders(false);
+ }
+
+    private void shouldMigrateSessionStoreToSessionStoreWithHeaders(final boolean isPersistent) throws Exception {
+ // Phase 1: Run with plain SessionStore
+ final StreamsBuilder oldBuilder = new StreamsBuilder();
+ oldBuilder.addStateStore(
+ Stores.sessionStoreBuilder(
+                isPersistent ? Stores.persistentSessionStore(SESSION_STORE_NAME, Duration.ofMillis(RETENTION_MS)) :
+                    Stores.inMemorySessionStore(SESSION_STORE_NAME, Duration.ofMillis(RETENTION_MS)),
+ Serdes.String(),
+ Serdes.String()))
+            .stream(inputStream, Consumed.with(Serdes.String(), Serdes.String()))
+ .process(SessionProcessor::new, SESSION_STORE_NAME);
+
+ final Properties props = props();
+ kafkaStreams = new KafkaStreams(oldBuilder.build(), props);
+ IntegrationTestUtils.startApplicationAndWaitUntilRunning(kafkaStreams);
+
+ final long baseTime = CLUSTER.time.milliseconds();
+ processSessionKeyValueAndVerify("key1", "value1", baseTime + 100);
+ processSessionKeyValueAndVerify("key2", "value2", baseTime + 200);
+ processSessionKeyValueAndVerify("key3", "value3", baseTime + 300);
+
+ kafkaStreams.close(Duration.ofSeconds(5L));
+ kafkaStreams = null;
+
+        // Phase 2: Restart with SessionStoreWithHeaders (headers-aware supplier)
+ final StreamsBuilder newBuilder = new StreamsBuilder();
+        final AtomicReference<SessionWithHeadersProcessor> processorRef = new AtomicReference<>();
+ newBuilder.addStateStore(
+ Stores.sessionStoreBuilderWithHeaders(
+                isPersistent ? Stores.persistentSessionStoreWithHeaders(SESSION_STORE_NAME, Duration.ofMillis(RETENTION_MS)) :
+                    Stores.inMemorySessionStore(SESSION_STORE_NAME, Duration.ofMillis(RETENTION_MS)),
+ Serdes.String(),
+ Serdes.String()))
+            .stream(inputStream, Consumed.with(Serdes.String(), Serdes.String()))
+ .process(() -> {
+                final SessionWithHeadersProcessor processor = new SessionWithHeadersProcessor();
+                processorRef.set(processor);
+                return processor;
+ }, SESSION_STORE_NAME);
+
+ kafkaStreams = new KafkaStreams(newBuilder.build(), props);
+ IntegrationTestUtils.startApplicationAndWaitUntilRunning(kafkaStreams);
+
+ // Verify legacy data can be read with empty headers
+ verifySessionValueWithEmptyHeaders("key1", "value1", baseTime + 100,
processorRef);
+ verifySessionValueWithEmptyHeaders("key2", "value2", baseTime + 200,
processorRef);
+ verifySessionValueWithEmptyHeaders("key3", "value3", baseTime + 300,
processorRef);
+
+ // Process new records with headers
+ final Headers headers = new RecordHeaders();
+ headers.add("source", "migration-test".getBytes());
+
+ processSessionKeyValueWithHeadersAndVerify("key4", "value4", baseTime
+ 400, headers, headers, processorRef);
+ processSessionKeyValueWithHeadersAndVerify("key5", "value5", baseTime
+ 500, headers, headers, processorRef);
+
+ kafkaStreams.close();
+ }
+
+ @Test
+    public void shouldProxySessionStoreToSessionStoreWithHeaders() throws Exception {
+ // Phase 1: Run with plain SessionStore
+ final StreamsBuilder oldBuilder = new StreamsBuilder();
+ oldBuilder.addStateStore(
+ Stores.sessionStoreBuilder(
+                Stores.persistentSessionStore(SESSION_STORE_NAME, Duration.ofMillis(RETENTION_MS)),
+ Serdes.String(),
+ Serdes.String()))
+            .stream(inputStream, Consumed.with(Serdes.String(), Serdes.String()))
+ .process(SessionProcessor::new, SESSION_STORE_NAME);
+
+ final Properties props = props();
+ kafkaStreams = new KafkaStreams(oldBuilder.build(), props);
+ kafkaStreams.start();
+
+ final long baseTime = CLUSTER.time.milliseconds();
+ processSessionKeyValueAndVerify("key1", "value1", baseTime + 100);
+ processSessionKeyValueAndVerify("key2", "value2", baseTime + 200);
+ processSessionKeyValueAndVerify("key3", "value3", baseTime + 300);
+
+ kafkaStreams.close();
+ kafkaStreams = null;
+
+        // Phase 2: Restart with headers-aware builder but non-headers supplier (proxy/adapter mode)
+ final StreamsBuilder newBuilder = new StreamsBuilder();
+        final AtomicReference<SessionWithHeadersProcessor> processorRef = new AtomicReference<>();
+ newBuilder.addStateStore(
+ Stores.sessionStoreBuilderWithHeaders(
+                Stores.persistentSessionStore(SESSION_STORE_NAME, Duration.ofMillis(RETENTION_MS)), // non-headers supplier!
+ Serdes.String(),
+ Serdes.String()))
+            .stream(inputStream, Consumed.with(Serdes.String(), Serdes.String()))
+ .process(() -> {
+                final SessionWithHeadersProcessor processor = new SessionWithHeadersProcessor();
+                processorRef.set(processor);
+                return processor;
+ }, SESSION_STORE_NAME);
+
+ kafkaStreams = new KafkaStreams(newBuilder.build(), props);
+ IntegrationTestUtils.startApplicationAndWaitUntilRunning(kafkaStreams);
+
+ // Verify legacy data can be read with empty headers
+ verifySessionValueWithEmptyHeaders("key1", "value1", baseTime + 100,
processorRef);
+ verifySessionValueWithEmptyHeaders("key2", "value2", baseTime + 200,
processorRef);
+ verifySessionValueWithEmptyHeaders("key3", "value3", baseTime + 300,
processorRef);
+
+        // In proxy mode, headers are stripped when writing to the non-headers store,
+        // so we expect empty headers when reading back.
+ final RecordHeaders headers = new RecordHeaders();
+ headers.add("source", "proxy-test".getBytes());
+ final Headers expectedHeaders = new RecordHeaders();
+
+ processSessionKeyValueWithHeadersAndVerify("key4", "value4", baseTime
+ 400, headers, expectedHeaders, processorRef);
+ processSessionKeyValueWithHeadersAndVerify("key5", "value5", baseTime
+ 500, headers, expectedHeaders, processorRef);
+
+ kafkaStreams.close();
+ }
+
+ @Test
+    public void shouldFailDowngradeFromSessionStoreWithHeadersToSessionStore() throws Exception {
+ final Properties props = props();
+ setupAndPopulateSessionStoreWithHeaders(props);
+ kafkaStreams = null;
+
+ // Attempt to downgrade to plain session store
+ final StreamsBuilder downgradedBuilder = new StreamsBuilder();
+ downgradedBuilder.addStateStore(
+ Stores.sessionStoreBuilder(
+                Stores.persistentSessionStore(SESSION_STORE_NAME, Duration.ofMillis(RETENTION_MS)),
+ Serdes.String(),
+ Serdes.String()))
+            .stream(inputStream, Consumed.with(Serdes.String(), Serdes.String()))
+ .process(SessionProcessor::new, SESSION_STORE_NAME);
+
+ kafkaStreams = new KafkaStreams(downgradedBuilder.build(), props);
+
+ boolean exceptionThrown = false;
+ try {
+ kafkaStreams.start();
+ } catch (final Exception e) {
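+            // The store failure may surface wrapped in another exception, so walk the cause chain.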
+ Throwable cause = e;
+ while (cause != null) {
+ if (cause instanceof ProcessorStateException &&
+ cause.getMessage() != null &&
+ cause.getMessage().contains("incompatible settings")) {
+ exceptionThrown = true;
+ break;
+ }
+ cause = cause.getCause();
+ }
+
+ if (!exceptionThrown) {
+                throw new AssertionError("Expected ProcessorStateException about incompatible settings, but got: " + e.getMessage(), e);
+ }
+ } finally {
+ kafkaStreams.close(Duration.ofSeconds(30L));
+ }
+
+ if (!exceptionThrown) {
+            throw new AssertionError("Expected ProcessorStateException to be thrown when attempting to downgrade from headers-aware to plain session store");
+ }
+ }
+
+ @Test
+    public void shouldSuccessfullyDowngradeFromSessionStoreWithHeadersToSessionStoreAfterCleanup() throws Exception {
+ final Properties props = props();
+ setupAndPopulateSessionStoreWithHeaders(props);
+
+ kafkaStreams.cleanUp(); // Delete local state
+ kafkaStreams = null;
+
+ final StreamsBuilder downgradedBuilder = new StreamsBuilder();
+ downgradedBuilder.addStateStore(
+ Stores.sessionStoreBuilder(
+                Stores.persistentSessionStore(SESSION_STORE_NAME, Duration.ofMillis(RETENTION_MS)),
+ Serdes.String(),
+ Serdes.String()))
+            .stream(inputStream, Consumed.with(Serdes.String(), Serdes.String()))
+ .process(SessionProcessor::new, SESSION_STORE_NAME);
+
+ kafkaStreams = new KafkaStreams(downgradedBuilder.build(), props);
+ kafkaStreams.start();
+
+ final long newTime = CLUSTER.time.milliseconds();
+ processSessionKeyValueAndVerify("key3", "value3", newTime + 300);
+ processSessionKeyValueAndVerify("key4", "value4", newTime + 400);
+
+ kafkaStreams.close();
+ }
+
+ // ==================== Session Store Helper Methods ====================
+
+ private void processSessionKeyValueAndVerify(final String key,
+ final String value,
+                                                 final long timestamp) throws Exception {
+ IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(
+ inputStream,
+ singletonList(KeyValue.pair(key, value)),
+ TestUtils.producerConfig(CLUSTER.bootstrapServers(),
+ StringSerializer.class,
+ StringSerializer.class),
+ timestamp,
+ false);
+
+ TestUtils.waitForCondition(() -> {
+ try {
+ final ReadOnlySessionStore<String, String> store =
+                    IntegrationTestUtils.getStore(SESSION_STORE_NAME, kafkaStreams, QueryableStoreTypes.sessionStore());
+
+ if (store == null) {
+ return false;
+ }
+
+            try (final KeyValueIterator<Windowed<String>, String> iterator = store.fetch(key)) {
+ while (iterator.hasNext()) {
+                    final KeyValue<Windowed<String>, String> kv = iterator.next();
+                    if (kv.key.key().equals(key) && kv.value.equals(value)) {
+ return true;
+ }
+ }
+ }
+ return false;
+ } catch (final Exception e) {
+ return false;
+ }
+ }, 60_000L, "Could not verify session value in time.");
+ }
+
+ private void verifySessionValueWithEmptyHeaders(final String key,
+ final String value,
+ final long timestamp,
+                                                    final AtomicReference<SessionWithHeadersProcessor> processorRef) throws Exception {
+ TestUtils.waitForCondition(() -> {
+ try {
+ if (processorRef.get() == null) {
+ return false;
+ }
+                final ReadOnlySessionStore<String, AggregationWithHeaders<String>> store = processorRef.get().store();
Review Comment:
The session store integration tests access state stores directly from within
the processor rather than through `IntegrationTestUtils.getStore()` / IQ. This is
necessary because IQ for `SessionStoreWithHeaders` goes through
`ReadOnlySessionStoreFacade` → `SessionStoreIteratorFacade`, which strips
the `AggregationWithHeaders` wrapper and returns only the raw aggregation
value. Since these tests need to verify that headers are correctly preserved
(or empty after migration), direct store access is required to inspect the full
`AggregationWithHeaders` object.
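
For readers following along, here is a minimal sketch of the direct-access
pattern (the `Processor` generics and the `init()` body are my assumptions;
only the `store()` accessor is visible in the diff above, and
`AggregationWithHeaders` / `SESSION_STORE_NAME` come from this PR's test code):

```java
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.state.SessionStore;

public class SessionWithHeadersProcessor implements Processor<String, String, Void, Void> {
    private SessionStore<String, AggregationWithHeaders<String>> store;

    @Override
    public void init(final ProcessorContext<Void, Void> context) {
        // Capture the store handle when the task is initialized.
        store = context.getStateStore(SESSION_STORE_NAME);
    }

    @Override
    public void process(final Record<String, String> record) {
        // ... write the record's value (and headers) into the session store ...
    }

    // Tests read through this handle instead of IQ, so the
    // AggregationWithHeaders wrapper is never stripped by a facade.
    public SessionStore<String, AggregationWithHeaders<String>> store() {
        return store;
    }
}
```

A test can then call `processorRef.get().store().fetch(key)` and assert on the
full wrapper, headers included, which is what `verifySessionValueWithEmptyHeaders`
does above.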
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]