mjsax commented on code in PR #21666:
URL: https://github.com/apache/kafka/pull/21666#discussion_r2907020406


##########
streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStoreWithHeadersTest.java:
##########
@@ -131,16 +132,198 @@ public void shouldOpenExistingStoreInRegularMode() 
throws Exception {
     }
 
     @Test
-    public void shouldFailToUpgradeDirectlyFromKeyValueStore() {
+    public void shouldMigrateFromPlainToHeadersAwareColumnFamily() throws 
Exception {
+        prepareKeyValueStoreWithMultipleKeys();
+
+        // Open with RocksDBTimestampedStoreWithHeaders - should detect 
DEFAULT CF and enter upgrade mode
+        try (final LogCaptureAppender appender = 
LogCaptureAppender.createAndRegister(RocksDBTimestampedStoreWithHeaders.class)) 
{
+            rocksDBStore.init(context, rocksDBStore);
+
+            assertTrue(appender.getMessages().contains("Opening store " + 
DB_NAME + " in upgrade mode from plain key value store"));
+        }
+
+        assertEquals(7L, rocksDBStore.approximateNumEntries(), "Expected 7 
entries in DEFAULT CF and 0 in headers-aware CF before migration");
+
+        // get() - tests lazy migration on read
+
+        assertNull(rocksDBStore.get(new Bytes("unknown".getBytes())), 
"Expected null for unknown key");
+        assertEquals(7L, rocksDBStore.approximateNumEntries(), "Expected 7 
entries on DEFAULT CF, 0 in headers-aware CF");
+
+        assertEquals(1 + 0 + 8 + 1, rocksDBStore.get(new 
Bytes("key1".getBytes())).length,
+            "Expected header-aware format: varint(1) + empty headers(0) + 
timestamp(8) + value(1) = 10 bytes");
+        assertEquals(7L, rocksDBStore.approximateNumEntries(), "Expected 6 
entries on DEFAULT CF, 1 in headers-aware CF after migrating key1");
+
+        // put() - tests migration on write
+
+        rocksDBStore.put(new Bytes("key2".getBytes()), 
"headers+timestamp+22".getBytes());
+        assertEquals(7L, rocksDBStore.approximateNumEntries(), "Expected 5 
entries on DEFAULT CF, 2 in headers-aware CF after migrating key2 with put()");
+
+        rocksDBStore.put(new Bytes("key3".getBytes()), null);
+        // count is off by one, due to two delete operations (even if one does 
not delete anything)
+        assertEquals(5L, rocksDBStore.approximateNumEntries(), "Expected 4 
entries on DEFAULT CF, 1 in headers-aware CF after deleting key3 with put()");
+
+        rocksDBStore.put(new Bytes("key8new".getBytes()), 
"headers+timestamp+88888888".getBytes());
+        // one delete on old CF, one put on new CF, but count is off by one 
due to delete on old CF not deleting anything
+        assertEquals(5L, rocksDBStore.approximateNumEntries(), "Expected 4 
entries on DEFAULT CF, 2 in headers-aware CF after adding new key8new with 
put()");
+
+        rocksDBStore.put(new Bytes("key9new".getBytes()), null);
+        // one delete on old CF, one put on new CF, but count is off by two 
due to deletes not deleting anything
+        assertEquals(3L, rocksDBStore.approximateNumEntries(), "Expected 4 
entries on DEFAULT CF, 1 in headers-aware CF after adding new key9new with 
put()");
+
+        // putIfAbsent() - tests migration on conditional write
+
+        assertNull(rocksDBStore.putIfAbsent(new Bytes("key11new".getBytes()), 
"headers+timestamp+11111111111".getBytes()),
+            "Expected null return value for putIfAbsent on non-existing 
key11new, and new key should be added to headers-aware CF");
+        // one delete on old CF, one put on new CF, but count is off by one 
due to delete on old CF not deleting anything
+        assertEquals(3L, rocksDBStore.approximateNumEntries(), "Expected 4 
entries on DEFAULT CF, 2 in headers-aware CF after adding new key11new with 
putIfAbsent()");
+
+        assertEquals(1 + 0 + 8 + 5, rocksDBStore.putIfAbsent(new 
Bytes("key5".getBytes()), null).length,
+            "Expected header-aware format: varint(1) + empty headers(0) + 
timestamp(8) + value(5) = 14 bytes for putIfAbsent with null on existing key5");
+        // one delete on old CF, one put on new CF, due to `get()` migration
+        assertEquals(3L, rocksDBStore.approximateNumEntries(), "Expected 3 
entries on DEFAULT CF, 3 in headers-aware CF after migrating key5 with 
putIfAbsent(null)");
+
+        assertNull(rocksDBStore.putIfAbsent(new Bytes("key12new".getBytes()), 
null));
+        // no delete operation, because key12new is unknown
+        assertEquals(3L, rocksDBStore.approximateNumEntries(), "Expected 3 
entries on DEFAULT CF, 3 in headers-aware CF after putIfAbsent with null on 
non-existing key12new");
+
+        // delete() - tests migration on delete
+
+        assertEquals(1 + 0 + 8 + 6, rocksDBStore.delete(new 
Bytes("key6".getBytes())).length,
+            "Expected header-aware format: varint(1) + empty headers(0) + 
timestamp(8) + value(6) = 15 bytes for delete() on existing key6");
+        // two delete operations; however, only one is counted because the old CF 
count was already zero
+        assertEquals(2L, rocksDBStore.approximateNumEntries(), "Expected 2 
entries on DEFAULT CF, 2 in headers-aware CF after deleting key6 with 
delete()");
+
+        // iterators should not trigger migration (read-only)
+        iteratorsShouldNotMigrateDataFromPlain();
+        assertEquals(2L, rocksDBStore.approximateNumEntries());
+
+        rocksDBStore.close();
+
+        // Verify the final state of both column families
+        verifyPlainUpgradeColumnFamilies();
+    }
+
+    @Test
+    public void shouldUpgradeFromPlainKeyValueStore() throws Exception {
         // Prepare a plain key-value store (with data in default column family)
         prepareKeyValueStore();
 
-        // Try to open with RocksDBTimestampedStoreWithHeaders - should throw 
exception
-        final ProcessorStateException exception = 
assertThrows(ProcessorStateException.class,
-            () -> rocksDBStore.init(context, rocksDBStore));
+        // Open with RocksDBTimestampedStoreWithHeaders - should detect 
DEFAULT CF and enter upgrade mode
+        try (final LogCaptureAppender appender = 
LogCaptureAppender.createAndRegister(RocksDBTimestampedStoreWithHeaders.class)) 
{
+            rocksDBStore.init(context, rocksDBStore);
 
-        assertTrue(exception.getMessage().contains("Cannot upgrade directly 
from key-value store to headers-aware store"));
-        assertTrue(exception.getMessage().contains("Please first upgrade to 
RocksDBTimestampedStore"));
+            assertTrue(appender.getMessages().contains("Opening store " + 
DB_NAME + " in upgrade mode from plain key value store"));
+        }
+
+        // Verify we can read the migrated data
+        assertEquals(1 + 0 + 8 + "value1".getBytes().length, 
rocksDBStore.get(new Bytes("key1".getBytes())).length,
+            "Expected header-aware format: varint(1) + empty headers(0) + 
timestamp(8) + value");
+        assertEquals(1 + 0 + 8 + "value2".getBytes().length, 
rocksDBStore.get(new Bytes("key2".getBytes())).length,
+            "Expected header-aware format: varint(1) + empty headers(0) + 
timestamp(8) + value");
+
+        rocksDBStore.close();
+
+        // Verify column family structure
+        verifyPlainStoreUpgrade();
+    }
+
+    private void verifyPlainStoreUpgrade() throws Exception {
+        final DBOptions dbOptions = new DBOptions();
+        final ColumnFamilyOptions columnFamilyOptions = new 
ColumnFamilyOptions();
+
+        final List<ColumnFamilyDescriptor> columnFamilyDescriptors = asList(
+            new ColumnFamilyDescriptor(RocksDB.DEFAULT_COLUMN_FAMILY, 
columnFamilyOptions),
+            new 
ColumnFamilyDescriptor("keyValueWithTimestampAndHeaders".getBytes(StandardCharsets.UTF_8),
 columnFamilyOptions));
+
+        final List<ColumnFamilyHandle> columnFamilies = new 
ArrayList<>(columnFamilyDescriptors.size());
+        RocksDB db = null;
+        ColumnFamilyHandle defaultCF = null;
+        ColumnFamilyHandle headersCF = null;
+        try {
+            db = RocksDB.open(
+                dbOptions,
+                new File(new File(context.stateDir(), "rocksdb"), 
DB_NAME).getAbsolutePath(),
+                columnFamilyDescriptors,
+                columnFamilies);
+
+            defaultCF = columnFamilies.get(0);
+            headersCF = columnFamilies.get(1);
+
+            // After get() is called, data is migrated and deleted from 
DEFAULT CF
+            // DEFAULT CF should be empty for migrated keys
+            assertNull(db.get(defaultCF, "key1".getBytes()), "Expected key1 to 
be deleted from DEFAULT CF after migration");
+            assertNull(db.get(defaultCF, "key2".getBytes()), "Expected key2 to 
be deleted from DEFAULT CF after migration");
+
+            // Headers CF should have the migrated data
+            assertEquals(1 + 0 + 8 + "value1".getBytes().length, 
db.get(headersCF, "key1".getBytes()).length);
+            assertEquals(1 + 0 + 8 + "value2".getBytes().length, 
db.get(headersCF, "key2".getBytes()).length);
+        } finally {
+            if (defaultCF != null) {
+                defaultCF.close();
+            }
+            if (headersCF != null) {
+                headersCF.close();
+            }
+            if (db != null) {
+                db.close();
+            }
+            dbOptions.close();
+            columnFamilyOptions.close();
+        }
+    }
+
+    @Test
+    public void shouldFailWhenBothPlainAndTimestampedDataExist() throws 
Exception {
+        // This is an invalid state - we can't have both DEFAULT CF with data 
AND LEGACY_TIMESTAMPED CF
+        // First create a plain store
+        final RocksDBStore kvStore = new RocksDBStore(DB_NAME, METRICS_SCOPE);
+        kvStore.init(context, kvStore);
+        kvStore.put(new Bytes("plainKey".getBytes()), "plainValue".getBytes());
+        kvStore.close();
+
+        // Now manually add timestamped CF with data (simulating corrupted 
state)
+        final DBOptions dbOptions = new DBOptions();
+        final ColumnFamilyOptions columnFamilyOptions = new 
ColumnFamilyOptions();
+
+        final List<ColumnFamilyDescriptor> columnFamilyDescriptors = List.of(
+            new ColumnFamilyDescriptor(RocksDB.DEFAULT_COLUMN_FAMILY, 
columnFamilyOptions));
+
+        final List<ColumnFamilyHandle> columnFamilies = new ArrayList<>();
+        RocksDB db = null;
+        ColumnFamilyHandle timestampedCF = null;
+        try {
+            db = RocksDB.open(
+                dbOptions,
+                new File(new File(context.stateDir(), "rocksdb"), 
DB_NAME).getAbsolutePath(),
+                columnFamilyDescriptors,
+                columnFamilies);
+
+            // Create the timestamped CF and add data to it
+            timestampedCF = db.createColumnFamily(
+                new 
ColumnFamilyDescriptor(TIMESTAMPED_VALUES_COLUMN_FAMILY_NAME, 
columnFamilyOptions));
+            db.put(timestampedCF, "timestampedKey".getBytes(), 
wrapTimestampedValue("1".getBytes()));
+        } finally {
+            if (timestampedCF != null) {
+                timestampedCF.close();
+            }
+            for (final ColumnFamilyHandle cf : columnFamilies) {
+                cf.close();
+            }
+            if (db != null) {
+                db.close();
+            }
+            dbOptions.close();
+            columnFamilyOptions.close();
+        }
+
+        // Now try to open with headers store - should fail
+        final ProcessorStateException exception = assertThrows(
+            ProcessorStateException.class,
+            () -> rocksDBStore.init(context, rocksDBStore)

Review Comment:
   Ah. My bad. Mixed up variable names. All good.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to