mjsax commented on code in PR #21666:
URL: https://github.com/apache/kafka/pull/21666#discussion_r2902521952


##########
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStoreWithHeaders.java:
##########
@@ -100,55 +132,43 @@ private void openInUpgradeMode(final DBOptions dbOptions,
             new 
ColumnFamilyDescriptor(TIMESTAMPED_VALUES_WITH_HEADERS_CF_NAME, 
columnFamilyOptions)
         );
 
-        verifyAndCloseEmptyDefaultColumnFamily(columnFamilies.get(0));
+        // verify and close empty Default ColumnFamily

Review Comment:
   Where do we close default-CF on the happy path?



##########
streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/HeadersStoreUpgradeIntegrationTest.java:
##########
@@ -233,6 +234,128 @@ public void 
shouldProxyTimestampedKeyValueStoreToTimestampedKeyValueStoreWithHea
         kafkaStreams.close();
     }
 
+    @Test
+    public void 
shouldMigrateInMemoryPlainKeyValueStoreToTimestampedKeyValueStoreWithHeadersUsingPapi()
 throws Exception {
+        
shouldMigratePlainKeyValueStoreToTimestampedKeyValueStoreWithHeadersUsingPapi(false);
+    }
+
+    @Test
+    public void 
shouldMigratePersistentPlainKeyValueStoreToTimestampedKeyValueStoreWithHeadersUsingPapi()
 throws Exception {
+        
shouldMigratePlainKeyValueStoreToTimestampedKeyValueStoreWithHeadersUsingPapi(true);
+    }
+
+    private void 
shouldMigratePlainKeyValueStoreToTimestampedKeyValueStoreWithHeadersUsingPapi(final
 boolean persistentStore) throws Exception {
+        final StreamsBuilder streamsBuilderForOldStore = new StreamsBuilder();
+
+        streamsBuilderForOldStore.addStateStore(
+                Stores.keyValueStoreBuilder(
+                    persistentStore ? 
Stores.persistentKeyValueStore(STORE_NAME) : 
Stores.inMemoryKeyValueStore(STORE_NAME),
+                    Serdes.String(),
+                    Serdes.String()))
+            .stream(inputStream, Consumed.with(Serdes.String(), 
Serdes.String()))
+            .process(KeyValueProcessor::new, STORE_NAME);
+
+        final Properties props = props();
+        kafkaStreams = new KafkaStreams(streamsBuilderForOldStore.build(), 
props);
+        kafkaStreams.start();
+
+        processKeyValueAndVerifyValue("key1", "value1");
+        processKeyValueAndVerifyValue("key2", "value2");
+        processKeyValueAndVerifyValue("key3", "value3");
+
+        kafkaStreams.close();
+        kafkaStreams = null;
+
+        final StreamsBuilder streamsBuilderForNewStore = new StreamsBuilder();
+
+        streamsBuilderForNewStore.addStateStore(
+                Stores.timestampedKeyValueStoreBuilderWithHeaders(
+                    persistentStore ? 
Stores.persistentTimestampedKeyValueStoreWithHeaders(STORE_NAME) : 
Stores.inMemoryKeyValueStore(STORE_NAME),
+                    Serdes.String(),
+                    Serdes.String()))
+            .stream(inputStream, Consumed.with(Serdes.String(), 
Serdes.String()))
+            .process(TimestampedKeyValueWithHeadersProcessor::new, STORE_NAME);
+
+        kafkaStreams = new KafkaStreams(streamsBuilderForNewStore.build(), 
props);
+        kafkaStreams.start();
+
+        if (persistentStore) {
+            // Verify legacy data can be read with empty headers and timestamp 
= -1
+            verifyLegacyValuesWithEmptyHeaders("key1", "value1", -1);
+            verifyLegacyValuesWithEmptyHeaders("key2", "value2", -1);
+            verifyLegacyValuesWithEmptyHeaders("key3", "value3", -1);
+        } else {
+            // Verify legacy data can be read with empty headers.
+            // When data is read from the changelog, the timestamp is set to 
record.timestamp.
+            verifyLegacyValuesWithEmptyHeaders("key1", "value1");

Review Comment:
   We could still verify which ts was used on-write, using 
`CLUSTER.time.milliseconds()` -- compare 
`TimestampedStoreUpgradeIntegrationTest#shouldMigrateKeyValueStoreToTimestampedKeyValueStoreUsingPapi`
 for details on how to do this.



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStoreWithHeaders.java:
##########
@@ -71,27 +71,59 @@ public RocksDBTimestampedStoreWithHeaders(final String name,
     @Override
     void openRocksDB(final DBOptions dbOptions,
                      final ColumnFamilyOptions columnFamilyOptions) {
-        // Check if we're upgrading from RocksDBTimestampedStore (which uses 
keyValueWithTimestamp CF)
+        // Check if we're upgrading from RocksDBTimestampedStore or from plain 
RocksDBStore
         final List<byte[]> existingCFs;
         try (final Options options = new Options(dbOptions, new 
ColumnFamilyOptions())) {
             existingCFs = RocksDB.listColumnFamilies(options, 
dbDir.getAbsolutePath());
         } catch (final RocksDBException e) {
             throw new ProcessorStateException("Error listing column families 
for store " + name, e);
         }
 
-
-        final boolean upgradingFromTimestampedStore = existingCFs.stream()
+        final boolean hasTimestampedCF = existingCFs.stream()
             .anyMatch(cf -> Arrays.equals(cf, LEGACY_TIMESTAMPED_CF_NAME));
 
-        if (upgradingFromTimestampedStore) {
-            openInUpgradeMode(dbOptions, columnFamilyOptions);
+        if (hasTimestampedCF) {
+            // Upgrading from timestamped store - use 2 CFs: 
LEGACY_TIMESTAMPED + HEADERS
+            openFromTimestampedStore(dbOptions, columnFamilyOptions); // here  
check that default has no data

Review Comment:
   ```suggestion
               openFromTimestampedStore(dbOptions, columnFamilyOptions); // 
needs to check that default-CF has no data
   ```



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStoreWithHeaders.java:
##########
@@ -71,27 +71,59 @@ public RocksDBTimestampedStoreWithHeaders(final String name,
     @Override
     void openRocksDB(final DBOptions dbOptions,
                      final ColumnFamilyOptions columnFamilyOptions) {
-        // Check if we're upgrading from RocksDBTimestampedStore (which uses 
keyValueWithTimestamp CF)
+        // Check if we're upgrading from RocksDBTimestampedStore or from plain 
RocksDBStore
         final List<byte[]> existingCFs;
         try (final Options options = new Options(dbOptions, new 
ColumnFamilyOptions())) {
             existingCFs = RocksDB.listColumnFamilies(options, 
dbDir.getAbsolutePath());
         } catch (final RocksDBException e) {
             throw new ProcessorStateException("Error listing column families 
for store " + name, e);
         }
 
-
-        final boolean upgradingFromTimestampedStore = existingCFs.stream()
+        final boolean hasTimestampedCF = existingCFs.stream()
             .anyMatch(cf -> Arrays.equals(cf, LEGACY_TIMESTAMPED_CF_NAME));
 
-        if (upgradingFromTimestampedStore) {
-            openInUpgradeMode(dbOptions, columnFamilyOptions);
+        if (hasTimestampedCF) {
+            // Upgrading from timestamped store - use 2 CFs: 
LEGACY_TIMESTAMPED + HEADERS
+            openFromTimestampedStore(dbOptions, columnFamilyOptions); // here  
check that default has no data
         } else {
-            openInRegularMode(dbOptions, columnFamilyOptions);
+            openFromDefaultStore(dbOptions, columnFamilyOptions);
+        }
+
+    }
+
+    private void openFromDefaultStore(final DBOptions dbOptions,
+                                              final ColumnFamilyOptions 
columnFamilyOptions) {

Review Comment:
   nit: indentation



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/PlainToHeadersStoreAdapter.java:
##########
@@ -0,0 +1,239 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.common.utils.ByteUtils;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.kstream.Materialized;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.StateStoreContext;
+import org.apache.kafka.streams.query.KeyQuery;
+import org.apache.kafka.streams.query.Position;
+import org.apache.kafka.streams.query.PositionBound;
+import org.apache.kafka.streams.query.Query;
+import org.apache.kafka.streams.query.QueryConfig;
+import org.apache.kafka.streams.query.QueryResult;
+import org.apache.kafka.streams.query.RangeQuery;
+import org.apache.kafka.streams.query.internals.InternalQueryResultUtil;
+import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.TimestampedBytesStore;
+import org.apache.kafka.streams.state.TimestampedKeyValueStoreWithHeaders;
+
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.Map;
+
+import static 
org.apache.kafka.streams.state.HeadersBytesStore.convertFromPlainToHeaderFormat;
+
+/**
+ * This class is used to ensure backward compatibility at DSL level between
+ * {@link TimestampedKeyValueStoreWithHeaders} and plain {@link KeyValueStore}.
+ * <p>
+ * If a user provides a supplier for plain {@code KeyValueStore} (without 
timestamp or headers) via
+ * {@link Materialized#as(KeyValueBytesStoreSupplier)} when building
+ * a {@code TimestampedKeyValueStoreWithHeaders}, this adapter is used to 
translate between
+ * the plain {@code byte[]} format and the timestamped-with-headers {@code 
byte[]} format.
+ *
+ * @see PlainToHeadersIteratorAdapter
+ */
+@SuppressWarnings("unchecked")
+public class PlainToHeadersStoreAdapter implements KeyValueStore<Bytes, 
byte[]> {
+    final KeyValueStore<Bytes, byte[]> store;
+
+    PlainToHeadersStoreAdapter(final KeyValueStore<Bytes, byte[]> store) {
+        if (!store.persistent()) {
+            throw new IllegalArgumentException("Provided store must be a 
persistent store, but it is not.");
+        }
+        if (store instanceof TimestampedBytesStore) {
+            throw new IllegalArgumentException("Provided store must be a plain 
(non-timestamped) key value store, but it is timestamped.");
+        }
+        this.store = store;
+    }
+
+    /**
+     * Extract raw plain value from serialized ValueTimestampHeaders.
+     * This strips both the headers and timestamp portions.
+     *
+     * Format conversion:
+     * Input:  [headersSize(varint)][headers][timestamp(8)][value]
+     * Output: [value]
+     */
+    static byte[] rawPlainValue(final byte[] rawValueTimestampHeaders) {

Review Comment:
   Should we follow https://github.com/apache/kafka/pull/21626 and add 
`UtilsTest` class in this PR -- to avoid piling up more stuff we need to 
refactor?



##########
streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStoreWithHeadersTest.java:
##########
@@ -559,6 +852,60 @@ private void verifyInHeadersAwareMode() {
         }
     }
 
+    @Test
+    public void shouldTransitionFromPlainUpgradeModeToRegularMode() throws 
Exception {
+        // Prepare plain store
+        prepareKeyValueStore();
+
+        // Open in upgrade mode
+        rocksDBStore.init(context, rocksDBStore);
+
+        // Migrate all data by reading it
+        rocksDBStore.get(new Bytes("key1".getBytes()));
+        rocksDBStore.get(new Bytes("key2".getBytes()));
+
+        rocksDBStore.close();
+
+        // Clear the default CF manually

Review Comment:
   Why do we need this? The two `get()` calls from above should have migrated 
both rows already?



##########
streams/src/test/java/org/apache/kafka/streams/state/HeadersBytesStoreTest.java:
##########
@@ -64,4 +64,69 @@ public void shouldConvertEmptyValueToHeaderFormat() {
         assertEquals(0, headersSize, "Empty headers should have headersSize = 
0");
         assertEquals(0, buffer.remaining(), "No payload bytes for empty 
value");
     }
+
+    @Test
+    public void shouldReturnNullWhenConvertingNullPlainValue() {
+        final byte[] result = 
HeadersBytesStore.convertFromPlainToHeaderFormat(null);
+        assertNull(result);
+    }
+
+    @Test
+    public void shouldConvertPlainValueToHeaderFormatWithTimestamp() {
+        final byte[] plainValue = "test-value".getBytes();
+
+        final byte[] converted = 
HeadersBytesStore.convertFromPlainToHeaderFormat(plainValue);
+
+        assertNotNull(converted);
+        // Expected format: [0x00 (1 byte)][timestamp=-1 (8 bytes)][value]
+        assertEquals(1 + 8 + plainValue.length, converted.length);
+
+        // Verify empty headers marker
+        assertEquals(0x00, converted[0], "First byte for empty header should 
be 0x00");
+
+        // Verify timestamp = -1 (all 0xFF bytes)
+        for (int i = 1; i <= 8; i++) {
+            assertEquals((byte) 0xFF, converted[i], "Timestamp byte " + (i - 
1) + " should be 0xFF for -1");
+        }
+
+        // Verify payload
+        final byte[] actualPayload = Arrays.copyOfRange(converted, 9, 
converted.length);
+        assertArrayEquals(plainValue, actualPayload);
+    }
+
+    @Test
+    public void shouldConvertEmptyPlainValueToHeaderFormat() {
+        final byte[] emptyValue = new byte[0];
+
+        final byte[] converted = 
HeadersBytesStore.convertFromPlainToHeaderFormat(emptyValue);
+
+        assertNotNull(converted);
+        // Expected format: [0x00 (1 byte)][timestamp=-1 (8 bytes)]
+        assertEquals(9, converted.length, "Converted empty value should have 
headers + timestamp");
+
+        final ByteBuffer buffer = ByteBuffer.wrap(converted);
+        final int headersSize = ByteUtils.readVarint(buffer);
+        assertEquals(0, headersSize, "Empty headers should have headersSize = 
0");
+
+        // Verify timestamp = -1
+        final long timestamp = buffer.getLong();
+        assertEquals(-1L, timestamp, "Timestamp should be -1 for plain value 
upgrade");
+    }
+
+    @Test
+    public void shouldConvertPlainValueWithCorrectByteOrder() {

Review Comment:
   Not sure what this test adds, compared to 
`shouldConvertPlainValueToHeaderFormatWithTimestamp` ?



##########
streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStoreWithHeadersTest.java:
##########
@@ -131,16 +132,198 @@ public void shouldOpenExistingStoreInRegularMode() 
throws Exception {
     }
 
     @Test
-    public void shouldFailToUpgradeDirectlyFromKeyValueStore() {
+    public void shouldMigrateFromPlainToHeadersAwareColumnFamily() throws 
Exception {
+        prepareKeyValueStoreWithMultipleKeys();
+
+        // Open with RocksDBTimestampedStoreWithHeaders - should detect 
DEFAULT CF and enter upgrade mode
+        try (final LogCaptureAppender appender = 
LogCaptureAppender.createAndRegister(RocksDBTimestampedStoreWithHeaders.class)) 
{
+            rocksDBStore.init(context, rocksDBStore);
+
+            assertTrue(appender.getMessages().contains("Opening store " + 
DB_NAME + " in upgrade mode from plain key value store"));
+        }
+
+        assertEquals(7L, rocksDBStore.approximateNumEntries(), "Expected 7 
entries in DEFAULT CF and 0 in headers-aware CF before migration");
+
+        // get() - tests lazy migration on read
+
+        assertNull(rocksDBStore.get(new Bytes("unknown".getBytes())), 
"Expected null for unknown key");
+        assertEquals(7L, rocksDBStore.approximateNumEntries(), "Expected 7 
entries on DEFAULT CF, 0 in headers-aware CF");
+
+        assertEquals(1 + 0 + 8 + 1, rocksDBStore.get(new 
Bytes("key1".getBytes())).length,
+            "Expected header-aware format: varint(1) + empty headers(0) + 
timestamp(8) + value(1) = 10 bytes");
+        assertEquals(7L, rocksDBStore.approximateNumEntries(), "Expected 6 
entries on DEFAULT CF, 1 in headers-aware CF after migrating key1");
+
+        // put() - tests migration on write
+
+        rocksDBStore.put(new Bytes("key2".getBytes()), 
"headers+timestamp+22".getBytes());
+        assertEquals(7L, rocksDBStore.approximateNumEntries(), "Expected 5 
entries on DEFAULT CF, 2 in headers-aware CF after migrating key2 with put()");
+
+        rocksDBStore.put(new Bytes("key3".getBytes()), null);
+        // count is off by one, due to two delete operations (even if one does 
not delete anything)
+        assertEquals(5L, rocksDBStore.approximateNumEntries(), "Expected 4 
entries on DEFAULT CF, 1 in headers-aware CF after deleting key3 with put()");
+
+        rocksDBStore.put(new Bytes("key8new".getBytes()), 
"headers+timestamp+88888888".getBytes());
+        // one delete on old CF, one put on new CF, but count is off by one 
due to delete on old CF not deleting anything
+        assertEquals(5L, rocksDBStore.approximateNumEntries(), "Expected 4 
entries on DEFAULT CF, 2 in headers-aware CF after adding new key8new with 
put()");
+
+        rocksDBStore.put(new Bytes("key9new".getBytes()), null);
+        // one delete on old CF, one put on new CF, but count is off by two 
due to deletes not deleting anything
+        assertEquals(3L, rocksDBStore.approximateNumEntries(), "Expected 4 
entries on DEFAULT CF, 1 in headers-aware CF after adding new key9new with 
put()");
+
+        // putIfAbsent() - tests migration on conditional write
+
+        assertNull(rocksDBStore.putIfAbsent(new Bytes("key11new".getBytes()), 
"headers+timestamp+11111111111".getBytes()),
+            "Expected null return value for putIfAbsent on non-existing 
key11new, and new key should be added to headers-aware CF");
+        // one delete on old CF, one put on new CF, but count is off by one 
due to delete on old CF not deleting anything
+        assertEquals(3L, rocksDBStore.approximateNumEntries(), "Expected 4 
entries on DEFAULT CF, 2 in headers-aware CF after adding new key11new with 
putIfAbsent()");
+
+        assertEquals(1 + 0 + 8 + 5, rocksDBStore.putIfAbsent(new 
Bytes("key5".getBytes()), null).length,
+            "Expected header-aware format: varint(1) + empty headers(0) + 
timestamp(8) + value(5) = 14 bytes for putIfAbsent with null on existing key5");
+        // one delete on old CF, one put on new CF, due to `get()` migration
+        assertEquals(3L, rocksDBStore.approximateNumEntries(), "Expected 3 
entries on DEFAULT CF, 3 in headers-aware CF after migrating key5 with 
putIfAbsent(null)");
+
+        assertNull(rocksDBStore.putIfAbsent(new Bytes("key12new".getBytes()), 
null));
+        // no delete operation, because key12new is unknown
+        assertEquals(3L, rocksDBStore.approximateNumEntries(), "Expected 3 
entries on DEFAULT CF, 3 in headers-aware CF after putIfAbsent with null on 
non-existing key12new");
+
+        // delete() - tests migration on delete
+
+        assertEquals(1 + 0 + 8 + 6, rocksDBStore.delete(new 
Bytes("key6".getBytes())).length,
+            "Expected header-aware format: varint(1) + empty headers(0) + 
timestamp(8) + value(6) = 15 bytes for delete() on existing key6");
+        // two delete operation, however, only one is counted because old CF 
count was zero before already
+        assertEquals(2L, rocksDBStore.approximateNumEntries(), "Expected 2 
entries on DEFAULT CF, 2 in headers-aware CF after deleting key6 with 
delete()");
+
+        // iterators should not trigger migration (read-only)
+        iteratorsShouldNotMigrateDataFromPlain();
+        assertEquals(2L, rocksDBStore.approximateNumEntries());
+
+        rocksDBStore.close();
+
+        // Verify the final state of both column families
+        verifyPlainUpgradeColumnFamilies();
+    }
+
+    @Test
+    public void shouldUpgradeFromPlainKeyValueStore() throws Exception {
         // Prepare a plain key-value store (with data in default column family)
         prepareKeyValueStore();
 
-        // Try to open with RocksDBTimestampedStoreWithHeaders - should throw 
exception
-        final ProcessorStateException exception = 
assertThrows(ProcessorStateException.class,
-            () -> rocksDBStore.init(context, rocksDBStore));
+        // Open with RocksDBTimestampedStoreWithHeaders - should detect 
DEFAULT CF and enter upgrade mode
+        try (final LogCaptureAppender appender = 
LogCaptureAppender.createAndRegister(RocksDBTimestampedStoreWithHeaders.class)) 
{
+            rocksDBStore.init(context, rocksDBStore);
 
-        assertTrue(exception.getMessage().contains("Cannot upgrade directly 
from key-value store to headers-aware store"));
-        assertTrue(exception.getMessage().contains("Please first upgrade to 
RocksDBTimestampedStore"));
+            assertTrue(appender.getMessages().contains("Opening store " + 
DB_NAME + " in upgrade mode from plain key value store"));
+        }
+
+        // Verify we can read the migrated data
+        assertEquals(1 + 0 + 8 + "value1".getBytes().length, 
rocksDBStore.get(new Bytes("key1".getBytes())).length,
+            "Expected header-aware format: varint(1) + empty headers(0) + 
timestamp(8) + value");
+        assertEquals(1 + 0 + 8 + "value2".getBytes().length, 
rocksDBStore.get(new Bytes("key2".getBytes())).length,
+            "Expected header-aware format: varint(1) + empty headers(0) + 
timestamp(8) + value");
+
+        rocksDBStore.close();
+
+        // Verify column family structure
+        verifyPlainStoreUpgrade();
+    }
+
+    private void verifyPlainStoreUpgrade() throws Exception {
+        final DBOptions dbOptions = new DBOptions();
+        final ColumnFamilyOptions columnFamilyOptions = new 
ColumnFamilyOptions();
+
+        final List<ColumnFamilyDescriptor> columnFamilyDescriptors = asList(
+            new ColumnFamilyDescriptor(RocksDB.DEFAULT_COLUMN_FAMILY, 
columnFamilyOptions),
+            new 
ColumnFamilyDescriptor("keyValueWithTimestampAndHeaders".getBytes(StandardCharsets.UTF_8),
 columnFamilyOptions));
+
+        final List<ColumnFamilyHandle> columnFamilies = new 
ArrayList<>(columnFamilyDescriptors.size());
+        RocksDB db = null;
+        ColumnFamilyHandle defaultCF = null;
+        ColumnFamilyHandle headersCF = null;
+        try {
+            db = RocksDB.open(
+                dbOptions,
+                new File(new File(context.stateDir(), "rocksdb"), 
DB_NAME).getAbsolutePath(),
+                columnFamilyDescriptors,
+                columnFamilies);
+
+            defaultCF = columnFamilies.get(0);
+            headersCF = columnFamilies.get(1);
+
+            // After get() is called, data is migrated and deleted from 
DEFAULT CF
+            // DEFAULT CF should be empty for migrated keys
+            assertNull(db.get(defaultCF, "key1".getBytes()), "Expected key1 to 
be deleted from DEFAULT CF after migration");
+            assertNull(db.get(defaultCF, "key2".getBytes()), "Expected key2 to 
be deleted from DEFAULT CF after migration");
+
+            // Headers CF should have the migrated data
+            assertEquals(1 + 0 + 8 + "value1".getBytes().length, 
db.get(headersCF, "key1".getBytes()).length);
+            assertEquals(1 + 0 + 8 + "value2".getBytes().length, 
db.get(headersCF, "key2".getBytes()).length);
+        } finally {
+            if (defaultCF != null) {
+                defaultCF.close();
+            }
+            if (headersCF != null) {
+                headersCF.close();
+            }
+            if (db != null) {
+                db.close();
+            }
+            dbOptions.close();
+            columnFamilyOptions.close();
+        }
+    }
+
+    @Test
+    public void shouldFailWhenBothPlainAndTimestampedDataExist() throws 
Exception {
+        // This is an invalid state - we can't have both DEFAULT CF with data 
AND LEGACY_TIMESTAMPED CF
+        // First create a plain store
+        final RocksDBStore kvStore = new RocksDBStore(DB_NAME, METRICS_SCOPE);
+        kvStore.init(context, kvStore);
+        kvStore.put(new Bytes("plainKey".getBytes()), "plainValue".getBytes());
+        kvStore.close();
+
+        // Now manually add timestamped CF with data (simulating corrupted 
state)
+        final DBOptions dbOptions = new DBOptions();
+        final ColumnFamilyOptions columnFamilyOptions = new 
ColumnFamilyOptions();
+
+        final List<ColumnFamilyDescriptor> columnFamilyDescriptors = List.of(
+            new ColumnFamilyDescriptor(RocksDB.DEFAULT_COLUMN_FAMILY, 
columnFamilyOptions));
+
+        final List<ColumnFamilyHandle> columnFamilies = new ArrayList<>();
+        RocksDB db = null;
+        ColumnFamilyHandle timestampedCF = null;
+        try {
+            db = RocksDB.open(
+                dbOptions,
+                new File(new File(context.stateDir(), "rocksdb"), 
DB_NAME).getAbsolutePath(),
+                columnFamilyDescriptors,
+                columnFamilies);
+
+            // Create the timestamped CF and add data to it
+            timestampedCF = db.createColumnFamily(
+                new 
ColumnFamilyDescriptor(TIMESTAMPED_VALUES_COLUMN_FAMILY_NAME, 
columnFamilyOptions));
+            db.put(timestampedCF, "timestampedKey".getBytes(), 
wrapTimestampedValue("1".getBytes()));
+        } finally {
+            if (timestampedCF != null) {
+                timestampedCF.close();
+            }
+            for (final ColumnFamilyHandle cf : columnFamilies) {
+                cf.close();
+            }
+            if (db != null) {
+                db.close();
+            }
+            dbOptions.close();
+            columnFamilyOptions.close();
+        }
+
+        // Now try to open with headers store - should fail
+        final ProcessorStateException exception = assertThrows(
+            ProcessorStateException.class,
+            () -> rocksDBStore.init(context, rocksDBStore)

Review Comment:
   I think we are getting the wrong error here? We try to re-open a _plain_ 
store, which also fails, but that's not the error we want to trigger? We want 
to create a `RocksDBTimestampedStoreWithHeaders` and verify that it fails to 
open, right?



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStoreWithHeaders.java:
##########
@@ -100,55 +132,43 @@ private void openInUpgradeMode(final DBOptions dbOptions,
             new 
ColumnFamilyDescriptor(TIMESTAMPED_VALUES_WITH_HEADERS_CF_NAME, 
columnFamilyOptions)
         );
 
-        verifyAndCloseEmptyDefaultColumnFamily(columnFamilies.get(0));
+        // verify and close empty Default ColumnFamily
+        try (final RocksIterator defaultIter = 
db.newIterator(columnFamilies.get(0))) {
+            defaultIter.seekToFirst();
+            if (defaultIter.isValid()) {
+                throw new ProcessorStateException(

Review Comment:
   Do we need to close all three `columnFamilies` handles before we throw? Not 
sure TBH.



##########
streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/HeadersStoreUpgradeIntegrationTest.java:
##########
@@ -233,6 +234,128 @@ public void 
shouldProxyTimestampedKeyValueStoreToTimestampedKeyValueStoreWithHea
         kafkaStreams.close();
     }
 
+    @Test
+    public void 
shouldMigrateInMemoryPlainKeyValueStoreToTimestampedKeyValueStoreWithHeadersUsingPapi()
 throws Exception {
+        
shouldMigratePlainKeyValueStoreToTimestampedKeyValueStoreWithHeadersUsingPapi(false);
+    }
+
+    @Test
+    public void 
shouldMigratePersistentPlainKeyValueStoreToTimestampedKeyValueStoreWithHeadersUsingPapi()
 throws Exception {
+        
shouldMigratePlainKeyValueStoreToTimestampedKeyValueStoreWithHeadersUsingPapi(true);
+    }
+
+    private void 
shouldMigratePlainKeyValueStoreToTimestampedKeyValueStoreWithHeadersUsingPapi(final
 boolean persistentStore) throws Exception {
+        final StreamsBuilder streamsBuilderForOldStore = new StreamsBuilder();
+
+        streamsBuilderForOldStore.addStateStore(
+                Stores.keyValueStoreBuilder(
+                    persistentStore ? 
Stores.persistentKeyValueStore(STORE_NAME) : 
Stores.inMemoryKeyValueStore(STORE_NAME),
+                    Serdes.String(),
+                    Serdes.String()))
+            .stream(inputStream, Consumed.with(Serdes.String(), 
Serdes.String()))
+            .process(KeyValueProcessor::new, STORE_NAME);
+
+        final Properties props = props();
+        kafkaStreams = new KafkaStreams(streamsBuilderForOldStore.build(), 
props);
+        kafkaStreams.start();
+
+        processKeyValueAndVerifyValue("key1", "value1");
+        processKeyValueAndVerifyValue("key2", "value2");
+        processKeyValueAndVerifyValue("key3", "value3");
+
+        kafkaStreams.close();
+        kafkaStreams = null;
+
+        final StreamsBuilder streamsBuilderForNewStore = new StreamsBuilder();
+
+        streamsBuilderForNewStore.addStateStore(
+                Stores.timestampedKeyValueStoreBuilderWithHeaders(
+                    persistentStore ? 
Stores.persistentTimestampedKeyValueStoreWithHeaders(STORE_NAME) : 
Stores.inMemoryKeyValueStore(STORE_NAME),
+                    Serdes.String(),
+                    Serdes.String()))
+            .stream(inputStream, Consumed.with(Serdes.String(), 
Serdes.String()))
+            .process(TimestampedKeyValueWithHeadersProcessor::new, STORE_NAME);
+
+        kafkaStreams = new KafkaStreams(streamsBuilderForNewStore.build(), 
props);
+        kafkaStreams.start();
+
+        if (persistentStore) {
+            // Verify legacy data can be read with empty headers and timestamp 
= -1
+            verifyLegacyValuesWithEmptyHeaders("key1", "value1", -1);
+            verifyLegacyValuesWithEmptyHeaders("key2", "value2", -1);
+            verifyLegacyValuesWithEmptyHeaders("key3", "value3", -1);
+        } else {

Review Comment:
   That's ok to accept the record ts (instead of `-1`) -- we do the same thing 
for plain -> ts-format.
   
   The "problem" is that we use the converter when reading from the changelog, 
and for this case, we just don't know if the set ts on the record is "correct" 
or not, as the changelog format is the same for all three cases.
   
   (And it's also not possible to set `-1` as TS to begin with -- if we set 
`-1` on a `ProducerRecord` the producer would override it...).



##########
streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStoreWithHeadersTest.java:
##########
@@ -131,16 +132,198 @@ public void shouldOpenExistingStoreInRegularMode() 
throws Exception {
     }
 
     @Test
-    public void shouldFailToUpgradeDirectlyFromKeyValueStore() {
+    public void shouldMigrateFromPlainToHeadersAwareColumnFamily() throws 
Exception {
+        prepareKeyValueStoreWithMultipleKeys();
+
+        // Open with RocksDBTimestampedStoreWithHeaders - should detect 
DEFAULT CF and enter upgrade mode
+        try (final LogCaptureAppender appender = 
LogCaptureAppender.createAndRegister(RocksDBTimestampedStoreWithHeaders.class)) 
{
+            rocksDBStore.init(context, rocksDBStore);
+
+            assertTrue(appender.getMessages().contains("Opening store " + 
DB_NAME + " in upgrade mode from plain key value store"));
+        }
+
+        assertEquals(7L, rocksDBStore.approximateNumEntries(), "Expected 7 
entries in DEFAULT CF and 0 in headers-aware CF before migration");
+
+        // get() - tests lazy migration on read
+
+        assertNull(rocksDBStore.get(new Bytes("unknown".getBytes())), 
"Expected null for unknown key");
+        assertEquals(7L, rocksDBStore.approximateNumEntries(), "Expected 7 
entries on DEFAULT CF, 0 in headers-aware CF");
+
+        assertEquals(1 + 0 + 8 + 1, rocksDBStore.get(new 
Bytes("key1".getBytes())).length,
+            "Expected header-aware format: varint(1) + empty headers(0) + 
timestamp(8) + value(1) = 10 bytes");
+        assertEquals(7L, rocksDBStore.approximateNumEntries(), "Expected 6 
entries on DEFAULT CF, 1 in headers-aware CF after migrating key1");
+
+        // put() - tests migration on write
+
+        rocksDBStore.put(new Bytes("key2".getBytes()), 
"headers+timestamp+22".getBytes());
+        assertEquals(7L, rocksDBStore.approximateNumEntries(), "Expected 5 
entries on DEFAULT CF, 2 in headers-aware CF after migrating key2 with put()");
+
+        rocksDBStore.put(new Bytes("key3".getBytes()), null);
+        // count is off by one, due to two delete operations (even if one does 
not delete anything)
+        assertEquals(5L, rocksDBStore.approximateNumEntries(), "Expected 4 
entries on DEFAULT CF, 1 in headers-aware CF after deleting key3 with put()");
+
+        rocksDBStore.put(new Bytes("key8new".getBytes()), 
"headers+timestamp+88888888".getBytes());
+        // one delete on old CF, one put on new CF, but count is off by one 
due to delete on old CF not deleting anything
+        assertEquals(5L, rocksDBStore.approximateNumEntries(), "Expected 4 
entries on DEFAULT CF, 2 in headers-aware CF after adding new key8new with 
put()");
+
+        rocksDBStore.put(new Bytes("key9new".getBytes()), null);
+        // one delete on old CF, one put on new CF, but count is off by two 
due to deletes not deleting anything
+        assertEquals(3L, rocksDBStore.approximateNumEntries(), "Expected 4 
entries on DEFAULT CF, 1 in headers-aware CF after adding new key9new with 
put()");
+
+        // putIfAbsent() - tests migration on conditional write
+
+        assertNull(rocksDBStore.putIfAbsent(new Bytes("key11new".getBytes()), 
"headers+timestamp+11111111111".getBytes()),
+            "Expected null return value for putIfAbsent on non-existing 
key11new, and new key should be added to headers-aware CF");
+        // one delete on old CF, one put on new CF, but count is off by one 
due to delete on old CF not deleting anything
+        assertEquals(3L, rocksDBStore.approximateNumEntries(), "Expected 4 
entries on DEFAULT CF, 2 in headers-aware CF after adding new key11new with 
putIfAbsent()");
+
+        assertEquals(1 + 0 + 8 + 5, rocksDBStore.putIfAbsent(new 
Bytes("key5".getBytes()), null).length,
+            "Expected header-aware format: varint(1) + empty headers(0) + 
timestamp(8) + value(5) = 14 bytes for putIfAbsent with null on existing key5");
+        // one delete on old CF, one put on new CF, due to `get()` migration
+        assertEquals(3L, rocksDBStore.approximateNumEntries(), "Expected 3 
entries on DEFAULT CF, 3 in headers-aware CF after migrating key5 with 
putIfAbsent(null)");
+
+        assertNull(rocksDBStore.putIfAbsent(new Bytes("key12new".getBytes()), 
null));
+        // no delete operation, because key12new is unknown
+        assertEquals(3L, rocksDBStore.approximateNumEntries(), "Expected 3 
entries on DEFAULT CF, 3 in headers-aware CF after putIfAbsent with null on 
non-existing key12new");
+
+        // delete() - tests migration on delete
+
+        assertEquals(1 + 0 + 8 + 6, rocksDBStore.delete(new 
Bytes("key6".getBytes())).length,
+            "Expected header-aware format: varint(1) + empty headers(0) + 
timestamp(8) + value(6) = 15 bytes for delete() on existing key6");
+        // two delete operation, however, only one is counted because old CF 
count was zero before already
+        assertEquals(2L, rocksDBStore.approximateNumEntries(), "Expected 2 
entries on DEFAULT CF, 2 in headers-aware CF after deleting key6 with 
delete()");
+
+        // iterators should not trigger migration (read-only)
+        iteratorsShouldNotMigrateDataFromPlain();
+        assertEquals(2L, rocksDBStore.approximateNumEntries());
+
+        rocksDBStore.close();
+
+        // Verify the final state of both column families
+        verifyPlainUpgradeColumnFamilies();
+    }
+
+    @Test
+    public void shouldUpgradeFromPlainKeyValueStore() throws Exception {
         // Prepare a plain key-value store (with data in default column family)
         prepareKeyValueStore();
 
-        // Try to open with RocksDBTimestampedStoreWithHeaders - should throw 
exception
-        final ProcessorStateException exception = 
assertThrows(ProcessorStateException.class,
-            () -> rocksDBStore.init(context, rocksDBStore));
+        // Open with RocksDBTimestampedStoreWithHeaders - should detect 
DEFAULT CF and enter upgrade mode
+        try (final LogCaptureAppender appender = 
LogCaptureAppender.createAndRegister(RocksDBTimestampedStoreWithHeaders.class)) 
{
+            rocksDBStore.init(context, rocksDBStore);
 
-        assertTrue(exception.getMessage().contains("Cannot upgrade directly 
from key-value store to headers-aware store"));
-        assertTrue(exception.getMessage().contains("Please first upgrade to 
RocksDBTimestampedStore"));
+            assertTrue(appender.getMessages().contains("Opening store " + 
DB_NAME + " in upgrade mode from plain key value store"));
+        }
+
+        // Verify we can read the migrated data
+        assertEquals(1 + 0 + 8 + "value1".getBytes().length, 
rocksDBStore.get(new Bytes("key1".getBytes())).length,
+            "Expected header-aware format: varint(1) + empty headers(0) + 
timestamp(8) + value");
+        assertEquals(1 + 0 + 8 + "value2".getBytes().length, 
rocksDBStore.get(new Bytes("key2".getBytes())).length,
+            "Expected header-aware format: varint(1) + empty headers(0) + 
timestamp(8) + value");
+
+        rocksDBStore.close();
+
+        // Verify column family structure
+        verifyPlainStoreUpgrade();
+    }
+
+    private void verifyPlainStoreUpgrade() throws Exception {
+        final DBOptions dbOptions = new DBOptions();
+        final ColumnFamilyOptions columnFamilyOptions = new 
ColumnFamilyOptions();
+
+        final List<ColumnFamilyDescriptor> columnFamilyDescriptors = asList(
+            new ColumnFamilyDescriptor(RocksDB.DEFAULT_COLUMN_FAMILY, 
columnFamilyOptions),
+            new 
ColumnFamilyDescriptor("keyValueWithTimestampAndHeaders".getBytes(StandardCharsets.UTF_8),
 columnFamilyOptions));
+
+        final List<ColumnFamilyHandle> columnFamilies = new 
ArrayList<>(columnFamilyDescriptors.size());
+        RocksDB db = null;
+        ColumnFamilyHandle defaultCF = null;
+        ColumnFamilyHandle headersCF = null;
+        try {
+            db = RocksDB.open(
+                dbOptions,
+                new File(new File(context.stateDir(), "rocksdb"), 
DB_NAME).getAbsolutePath(),
+                columnFamilyDescriptors,
+                columnFamilies);
+
+            defaultCF = columnFamilies.get(0);
+            headersCF = columnFamilies.get(1);
+
+            // After get() is called, data is migrated and deleted from 
DEFAULT CF
+            // DEFAULT CF should be empty for migrated keys
+            assertNull(db.get(defaultCF, "key1".getBytes()), "Expected key1 to 
be deleted from DEFAULT CF after migration");
+            assertNull(db.get(defaultCF, "key2".getBytes()), "Expected key2 to 
be deleted from DEFAULT CF after migration");
+
+            // Headers CF should have the migrated data
+            assertEquals(1 + 0 + 8 + "value1".getBytes().length, 
db.get(headersCF, "key1".getBytes()).length);
+            assertEquals(1 + 0 + 8 + "value2".getBytes().length, 
db.get(headersCF, "key2".getBytes()).length);
+        } finally {
+            if (defaultCF != null) {
+                defaultCF.close();
+            }
+            if (headersCF != null) {
+                headersCF.close();
+            }
+            if (db != null) {
+                db.close();
+            }
+            dbOptions.close();
+            columnFamilyOptions.close();
+        }
+    }
+
+    @Test
+    public void shouldFailWhenBothPlainAndTimestampedDataExist() throws 
Exception {
+        // This is an invalid state - we can't have both DEFAULT CF with data 
AND LEGACY_TIMESTAMPED CF
+        // First create a plain store
+        final RocksDBStore kvStore = new RocksDBStore(DB_NAME, METRICS_SCOPE);
+        kvStore.init(context, kvStore);
+        kvStore.put(new Bytes("plainKey".getBytes()), "plainValue".getBytes());
+        kvStore.close();
+
+        // Now manually add timestamped CF with data (simulating corrupted 
state)

Review Comment:
   Could we simplify this code by putting two keys into the plain store, then 
creating a ts-store next and migrating one of the two keys, and finally opening 
it as a header-store?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to