mjsax commented on code in PR #21666:
URL: https://github.com/apache/kafka/pull/21666#discussion_r2907020406
##########
streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStoreWithHeadersTest.java:
##########
@@ -131,16 +132,198 @@ public void shouldOpenExistingStoreInRegularMode()
throws Exception {
}
@Test
- public void shouldFailToUpgradeDirectlyFromKeyValueStore() {
+ public void shouldMigrateFromPlainToHeadersAwareColumnFamily() throws
Exception {
+ prepareKeyValueStoreWithMultipleKeys();
+
+ // Open with RocksDBTimestampedStoreWithHeaders - should detect
DEFAULT CF and enter upgrade mode
+ try (final LogCaptureAppender appender =
LogCaptureAppender.createAndRegister(RocksDBTimestampedStoreWithHeaders.class))
{
+ rocksDBStore.init(context, rocksDBStore);
+
+ assertTrue(appender.getMessages().contains("Opening store " +
DB_NAME + " in upgrade mode from plain key value store"));
+ }
+
+ assertEquals(7L, rocksDBStore.approximateNumEntries(), "Expected 7
entries in DEFAULT CF and 0 in headers-aware CF before migration");
+
+ // get() - tests lazy migration on read
+
+ assertNull(rocksDBStore.get(new Bytes("unknown".getBytes())),
"Expected null for unknown key");
+ assertEquals(7L, rocksDBStore.approximateNumEntries(), "Expected 7
entries on DEFAULT CF, 0 in headers-aware CF");
+
+ assertEquals(1 + 0 + 8 + 1, rocksDBStore.get(new
Bytes("key1".getBytes())).length,
+ "Expected header-aware format: varint(1) + empty headers(0) +
timestamp(8) + value(1) = 10 bytes");
+ assertEquals(7L, rocksDBStore.approximateNumEntries(), "Expected 6
entries on DEFAULT CF, 1 in headers-aware CF after migrating key1");
+
+ // put() - tests migration on write
+
+ rocksDBStore.put(new Bytes("key2".getBytes()),
"headers+timestamp+22".getBytes());
+ assertEquals(7L, rocksDBStore.approximateNumEntries(), "Expected 5
entries on DEFAULT CF, 2 in headers-aware CF after migrating key2 with put()");
+
+ rocksDBStore.put(new Bytes("key3".getBytes()), null);
+        // count is off by one due to two delete operations (even if one does
not delete anything)
+ assertEquals(5L, rocksDBStore.approximateNumEntries(), "Expected 4
entries on DEFAULT CF, 1 in headers-aware CF after deleting key3 with put()");
+
+ rocksDBStore.put(new Bytes("key8new".getBytes()),
"headers+timestamp+88888888".getBytes());
+ // one delete on old CF, one put on new CF, but count is off by one
due to delete on old CF not deleting anything
+ assertEquals(5L, rocksDBStore.approximateNumEntries(), "Expected 4
entries on DEFAULT CF, 2 in headers-aware CF after adding new key8new with
put()");
+
+ rocksDBStore.put(new Bytes("key9new".getBytes()), null);
+ // one delete on old CF, one put on new CF, but count is off by two
due to deletes not deleting anything
+ assertEquals(3L, rocksDBStore.approximateNumEntries(), "Expected 4
entries on DEFAULT CF, 1 in headers-aware CF after adding new key9new with
put()");
+
+ // putIfAbsent() - tests migration on conditional write
+
+ assertNull(rocksDBStore.putIfAbsent(new Bytes("key11new".getBytes()),
"headers+timestamp+11111111111".getBytes()),
+ "Expected null return value for putIfAbsent on non-existing
key11new, and new key should be added to headers-aware CF");
+ // one delete on old CF, one put on new CF, but count is off by one
due to delete on old CF not deleting anything
+ assertEquals(3L, rocksDBStore.approximateNumEntries(), "Expected 4
entries on DEFAULT CF, 2 in headers-aware CF after adding new key11new with
putIfAbsent()");
+
+ assertEquals(1 + 0 + 8 + 5, rocksDBStore.putIfAbsent(new
Bytes("key5".getBytes()), null).length,
+ "Expected header-aware format: varint(1) + empty headers(0) +
timestamp(8) + value(5) = 14 bytes for putIfAbsent with null on existing key5");
+ // one delete on old CF, one put on new CF, due to `get()` migration
+ assertEquals(3L, rocksDBStore.approximateNumEntries(), "Expected 3
entries on DEFAULT CF, 3 in headers-aware CF after migrating key5 with
putIfAbsent(null)");
+
+ assertNull(rocksDBStore.putIfAbsent(new Bytes("key12new".getBytes()),
null));
+ // no delete operation, because key12new is unknown
+ assertEquals(3L, rocksDBStore.approximateNumEntries(), "Expected 3
entries on DEFAULT CF, 3 in headers-aware CF after putIfAbsent with null on
non-existing key12new");
+
+ // delete() - tests migration on delete
+
+ assertEquals(1 + 0 + 8 + 6, rocksDBStore.delete(new
Bytes("key6".getBytes())).length,
+ "Expected header-aware format: varint(1) + empty headers(0) +
timestamp(8) + value(6) = 15 bytes for delete() on existing key6");
+        // two delete operations; however, only one is counted because the old
CF count was already zero
+ assertEquals(2L, rocksDBStore.approximateNumEntries(), "Expected 2
entries on DEFAULT CF, 2 in headers-aware CF after deleting key6 with
delete()");
+
+ // iterators should not trigger migration (read-only)
+ iteratorsShouldNotMigrateDataFromPlain();
+ assertEquals(2L, rocksDBStore.approximateNumEntries());
+
+ rocksDBStore.close();
+
+ // Verify the final state of both column families
+ verifyPlainUpgradeColumnFamilies();
+ }
+
+ @Test
+ public void shouldUpgradeFromPlainKeyValueStore() throws Exception {
// Prepare a plain key-value store (with data in default column family)
prepareKeyValueStore();
- // Try to open with RocksDBTimestampedStoreWithHeaders - should throw
exception
- final ProcessorStateException exception =
assertThrows(ProcessorStateException.class,
- () -> rocksDBStore.init(context, rocksDBStore));
+ // Open with RocksDBTimestampedStoreWithHeaders - should detect
DEFAULT CF and enter upgrade mode
+ try (final LogCaptureAppender appender =
LogCaptureAppender.createAndRegister(RocksDBTimestampedStoreWithHeaders.class))
{
+ rocksDBStore.init(context, rocksDBStore);
- assertTrue(exception.getMessage().contains("Cannot upgrade directly
from key-value store to headers-aware store"));
- assertTrue(exception.getMessage().contains("Please first upgrade to
RocksDBTimestampedStore"));
+ assertTrue(appender.getMessages().contains("Opening store " +
DB_NAME + " in upgrade mode from plain key value store"));
+ }
+
+ // Verify we can read the migrated data
+ assertEquals(1 + 0 + 8 + "value1".getBytes().length,
rocksDBStore.get(new Bytes("key1".getBytes())).length,
+ "Expected header-aware format: varint(1) + empty headers(0) +
timestamp(8) + value");
+ assertEquals(1 + 0 + 8 + "value2".getBytes().length,
rocksDBStore.get(new Bytes("key2".getBytes())).length,
+ "Expected header-aware format: varint(1) + empty headers(0) +
timestamp(8) + value");
+
+ rocksDBStore.close();
+
+ // Verify column family structure
+ verifyPlainStoreUpgrade();
+ }
+
+ private void verifyPlainStoreUpgrade() throws Exception {
+ final DBOptions dbOptions = new DBOptions();
+ final ColumnFamilyOptions columnFamilyOptions = new
ColumnFamilyOptions();
+
+ final List<ColumnFamilyDescriptor> columnFamilyDescriptors = asList(
+ new ColumnFamilyDescriptor(RocksDB.DEFAULT_COLUMN_FAMILY,
columnFamilyOptions),
+ new
ColumnFamilyDescriptor("keyValueWithTimestampAndHeaders".getBytes(StandardCharsets.UTF_8),
columnFamilyOptions));
+
+ final List<ColumnFamilyHandle> columnFamilies = new
ArrayList<>(columnFamilyDescriptors.size());
+ RocksDB db = null;
+ ColumnFamilyHandle defaultCF = null;
+ ColumnFamilyHandle headersCF = null;
+ try {
+ db = RocksDB.open(
+ dbOptions,
+ new File(new File(context.stateDir(), "rocksdb"),
DB_NAME).getAbsolutePath(),
+ columnFamilyDescriptors,
+ columnFamilies);
+
+ defaultCF = columnFamilies.get(0);
+ headersCF = columnFamilies.get(1);
+
+ // After get() is called, data is migrated and deleted from
DEFAULT CF
+ // DEFAULT CF should be empty for migrated keys
+ assertNull(db.get(defaultCF, "key1".getBytes()), "Expected key1 to
be deleted from DEFAULT CF after migration");
+ assertNull(db.get(defaultCF, "key2".getBytes()), "Expected key2 to
be deleted from DEFAULT CF after migration");
+
+ // Headers CF should have the migrated data
+ assertEquals(1 + 0 + 8 + "value1".getBytes().length,
db.get(headersCF, "key1".getBytes()).length);
+ assertEquals(1 + 0 + 8 + "value2".getBytes().length,
db.get(headersCF, "key2".getBytes()).length);
+ } finally {
+ if (defaultCF != null) {
+ defaultCF.close();
+ }
+ if (headersCF != null) {
+ headersCF.close();
+ }
+ if (db != null) {
+ db.close();
+ }
+ dbOptions.close();
+ columnFamilyOptions.close();
+ }
+ }
+
+ @Test
+ public void shouldFailWhenBothPlainAndTimestampedDataExist() throws
Exception {
+ // This is an invalid state - we can't have both DEFAULT CF with data
AND LEGACY_TIMESTAMPED CF
+ // First create a plain store
+ final RocksDBStore kvStore = new RocksDBStore(DB_NAME, METRICS_SCOPE);
+ kvStore.init(context, kvStore);
+ kvStore.put(new Bytes("plainKey".getBytes()), "plainValue".getBytes());
+ kvStore.close();
+
+ // Now manually add timestamped CF with data (simulating corrupted
state)
+ final DBOptions dbOptions = new DBOptions();
+ final ColumnFamilyOptions columnFamilyOptions = new
ColumnFamilyOptions();
+
+ final List<ColumnFamilyDescriptor> columnFamilyDescriptors = List.of(
+ new ColumnFamilyDescriptor(RocksDB.DEFAULT_COLUMN_FAMILY,
columnFamilyOptions));
+
+ final List<ColumnFamilyHandle> columnFamilies = new ArrayList<>();
+ RocksDB db = null;
+ ColumnFamilyHandle timestampedCF = null;
+ try {
+ db = RocksDB.open(
+ dbOptions,
+ new File(new File(context.stateDir(), "rocksdb"),
DB_NAME).getAbsolutePath(),
+ columnFamilyDescriptors,
+ columnFamilies);
+
+ // Create the timestamped CF and add data to it
+ timestampedCF = db.createColumnFamily(
+ new
ColumnFamilyDescriptor(TIMESTAMPED_VALUES_COLUMN_FAMILY_NAME,
columnFamilyOptions));
+ db.put(timestampedCF, "timestampedKey".getBytes(),
wrapTimestampedValue("1".getBytes()));
+ } finally {
+ if (timestampedCF != null) {
+ timestampedCF.close();
+ }
+ for (final ColumnFamilyHandle cf : columnFamilies) {
+ cf.close();
+ }
+ if (db != null) {
+ db.close();
+ }
+ dbOptions.close();
+ columnFamilyOptions.close();
+ }
+
+ // Now try to open with headers store - should fail
+ final ProcessorStateException exception = assertThrows(
+ ProcessorStateException.class,
+ () -> rocksDBStore.init(context, rocksDBStore)
Review Comment:
Ah. My bad. Mixed up variable names. All good.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]