bbejeck commented on code in PR #21748:
URL: https://github.com/apache/kafka/pull/21748#discussion_r2933056234
##########
streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/HeadersStoreUpgradeIntegrationTest.java:
##########
@@ -1599,4 +1264,440 @@ private void
setupAndPopulateKeyValueStoreWithHeaders(final Properties props) th
kafkaStreams.close();
}
+
+ // ==================== Session Store Tests ====================
+
+ @Test
+ public void
shouldMigratePersistentSessionStoreToSessionStoreWithHeadersUsingPapi() throws
Exception {
+ shouldMigrateSessionStoreToSessionStoreWithHeaders(true);
+ }
+
+ @Test
+ public void
shouldMigrateInMemorySessionStoreToSessionStoreWithHeadersUsingPapi() throws
Exception {
+ shouldMigrateSessionStoreToSessionStoreWithHeaders(false);
+ }
+
+ private void shouldMigrateSessionStoreToSessionStoreWithHeaders(final
boolean isPersistent) throws Exception {
+ // Phase 1: Run with plain SessionStore
+ final StreamsBuilder oldBuilder = new StreamsBuilder();
+ oldBuilder.addStateStore(
+ Stores.sessionStoreBuilder(
+ isPersistent ?
Stores.persistentSessionStore(SESSION_STORE_NAME,
Duration.ofMillis(RETENTION_MS)) :
+ Stores.inMemorySessionStore(SESSION_STORE_NAME,
Duration.ofMillis(RETENTION_MS)),
+ Serdes.String(),
+ Serdes.String()))
+ .stream(inputStream, Consumed.with(Serdes.String(),
Serdes.String()))
+ .process(SessionProcessor::new, SESSION_STORE_NAME);
+
+ final Properties props = props();
+ kafkaStreams = new KafkaStreams(oldBuilder.build(), props);
+ IntegrationTestUtils.startApplicationAndWaitUntilRunning(kafkaStreams);
+
+ final long baseTime = CLUSTER.time.milliseconds();
+ processSessionKeyValueAndVerify("key1", "value1", baseTime + 100);
+ processSessionKeyValueAndVerify("key2", "value2", baseTime + 200);
+ processSessionKeyValueAndVerify("key3", "value3", baseTime + 300);
+
+ kafkaStreams.close(Duration.ofSeconds(5L));
+ kafkaStreams = null;
+
+ // Phase 2: Restart with SessionStoreWithHeaders (headers-aware
supplier)
+ final StreamsBuilder newBuilder = new StreamsBuilder();
+ final AtomicReference<SessionWithHeadersProcessor> processorRef = new
AtomicReference<>();
+ newBuilder.addStateStore(
+ Stores.sessionStoreBuilderWithHeaders(
+ isPersistent ?
Stores.persistentSessionStoreWithHeaders(SESSION_STORE_NAME,
Duration.ofMillis(RETENTION_MS)) :
+ Stores.inMemorySessionStore(SESSION_STORE_NAME,
Duration.ofMillis(RETENTION_MS)),
+ Serdes.String(),
+ Serdes.String()))
+ .stream(inputStream, Consumed.with(Serdes.String(),
Serdes.String()))
+ .process(() -> {
+ final SessionWithHeadersProcessor sessionStore = new
SessionWithHeadersProcessor();
+ processorRef.set(sessionStore);
+ return sessionStore;
+ }, SESSION_STORE_NAME);
+
+ kafkaStreams = new KafkaStreams(newBuilder.build(), props);
+ IntegrationTestUtils.startApplicationAndWaitUntilRunning(kafkaStreams);
+
+ // Verify legacy data can be read with empty headers
+ verifySessionValueWithEmptyHeaders("key1", "value1", baseTime + 100,
processorRef);
+ verifySessionValueWithEmptyHeaders("key2", "value2", baseTime + 200,
processorRef);
+ verifySessionValueWithEmptyHeaders("key3", "value3", baseTime + 300,
processorRef);
+
+ // Process new records with headers
+ final Headers headers = new RecordHeaders();
+ headers.add("source", "migration-test".getBytes());
+
+ processSessionKeyValueWithHeadersAndVerify("key4", "value4", baseTime
+ 400, headers, headers, processorRef);
+ processSessionKeyValueWithHeadersAndVerify("key5", "value5", baseTime
+ 500, headers, headers, processorRef);
+
+ kafkaStreams.close();
+ }
+
+ @Test
+ public void shouldProxySessionStoreToSessionStoreWithHeaders() throws
Exception {
+ // Phase 1: Run with plain SessionStore
+ final StreamsBuilder oldBuilder = new StreamsBuilder();
+ oldBuilder.addStateStore(
+ Stores.sessionStoreBuilder(
+ Stores.persistentSessionStore(SESSION_STORE_NAME,
Duration.ofMillis(RETENTION_MS)),
+ Serdes.String(),
+ Serdes.String()))
+ .stream(inputStream, Consumed.with(Serdes.String(),
Serdes.String()))
+ .process(SessionProcessor::new, SESSION_STORE_NAME);
+
+ final Properties props = props();
+ kafkaStreams = new KafkaStreams(oldBuilder.build(), props);
+ kafkaStreams.start();
+
+ final long baseTime = CLUSTER.time.milliseconds();
+ processSessionKeyValueAndVerify("key1", "value1", baseTime + 100);
+ processSessionKeyValueAndVerify("key2", "value2", baseTime + 200);
+ processSessionKeyValueAndVerify("key3", "value3", baseTime + 300);
+
+ kafkaStreams.close();
+ kafkaStreams = null;
+
+ // Phase 2: Restart with headers-aware builder but non-headers
supplier (proxy/adapter mode)
+ final StreamsBuilder newBuilder = new StreamsBuilder();
+ final AtomicReference<SessionWithHeadersProcessor> processorRef = new
AtomicReference<>();
+ newBuilder.addStateStore(
+ Stores.sessionStoreBuilderWithHeaders(
+ Stores.persistentSessionStore(SESSION_STORE_NAME,
Duration.ofMillis(RETENTION_MS)), // non-headers supplier!
+ Serdes.String(),
+ Serdes.String()))
+ .stream(inputStream, Consumed.with(Serdes.String(),
Serdes.String()))
+ .process(() -> {
Review Comment:
`AtomicReference `is used to capture the processor instance created by the
Kafka Streams runtime thread so the test thread can access the store directly.
This is needed because IQ goes through `ReadOnlySessionStoreFacade` which strips
`AggregationWithHeaders`, preventing header verification.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]