mjsax commented on code in PR #21666:
URL: https://github.com/apache/kafka/pull/21666#discussion_r2902521952
##########
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStoreWithHeaders.java:
##########
@@ -100,55 +132,43 @@ private void openInUpgradeMode(final DBOptions dbOptions,
new
ColumnFamilyDescriptor(TIMESTAMPED_VALUES_WITH_HEADERS_CF_NAME,
columnFamilyOptions)
);
- verifyAndCloseEmptyDefaultColumnFamily(columnFamilies.get(0));
+ // verify and close empty Default ColumnFamily
Review Comment:
Where do we close default-CF on the happy path?
##########
streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/HeadersStoreUpgradeIntegrationTest.java:
##########
@@ -233,6 +234,128 @@ public void
shouldProxyTimestampedKeyValueStoreToTimestampedKeyValueStoreWithHea
kafkaStreams.close();
}
+ @Test
+ public void
shouldMigrateInMemoryPlainKeyValueStoreToTimestampedKeyValueStoreWithHeadersUsingPapi()
throws Exception {
+
shouldMigratePlainKeyValueStoreToTimestampedKeyValueStoreWithHeadersUsingPapi(false);
+ }
+
+ @Test
+ public void
shouldMigratePersistentPlainKeyValueStoreToTimestampedKeyValueStoreWithHeadersUsingPapi()
throws Exception {
+
shouldMigratePlainKeyValueStoreToTimestampedKeyValueStoreWithHeadersUsingPapi(true);
+ }
+
+ private void
shouldMigratePlainKeyValueStoreToTimestampedKeyValueStoreWithHeadersUsingPapi(final
boolean persistentStore) throws Exception {
+ final StreamsBuilder streamsBuilderForOldStore = new StreamsBuilder();
+
+ streamsBuilderForOldStore.addStateStore(
+ Stores.keyValueStoreBuilder(
+ persistentStore ?
Stores.persistentKeyValueStore(STORE_NAME) :
Stores.inMemoryKeyValueStore(STORE_NAME),
+ Serdes.String(),
+ Serdes.String()))
+ .stream(inputStream, Consumed.with(Serdes.String(),
Serdes.String()))
+ .process(KeyValueProcessor::new, STORE_NAME);
+
+ final Properties props = props();
+ kafkaStreams = new KafkaStreams(streamsBuilderForOldStore.build(),
props);
+ kafkaStreams.start();
+
+ processKeyValueAndVerifyValue("key1", "value1");
+ processKeyValueAndVerifyValue("key2", "value2");
+ processKeyValueAndVerifyValue("key3", "value3");
+
+ kafkaStreams.close();
+ kafkaStreams = null;
+
+ final StreamsBuilder streamsBuilderForNewStore = new StreamsBuilder();
+
+ streamsBuilderForNewStore.addStateStore(
+ Stores.timestampedKeyValueStoreBuilderWithHeaders(
+ persistentStore ?
Stores.persistentTimestampedKeyValueStoreWithHeaders(STORE_NAME) :
Stores.inMemoryKeyValueStore(STORE_NAME),
+ Serdes.String(),
+ Serdes.String()))
+ .stream(inputStream, Consumed.with(Serdes.String(),
Serdes.String()))
+ .process(TimestampedKeyValueWithHeadersProcessor::new, STORE_NAME);
+
+ kafkaStreams = new KafkaStreams(streamsBuilderForNewStore.build(),
props);
+ kafkaStreams.start();
+
+ if (persistentStore) {
+ // Verify legacy data can be read with empty headers and timestamp
= -1
+ verifyLegacyValuesWithEmptyHeaders("key1", "value1", -1);
+ verifyLegacyValuesWithEmptyHeaders("key2", "value2", -1);
+ verifyLegacyValuesWithEmptyHeaders("key3", "value3", -1);
+ } else {
+ // Verify legacy data can be read with empty headers.
+ // When data is read from the changelog, the timestamp is set to
record.timestamp.
+ verifyLegacyValuesWithEmptyHeaders("key1", "value1");
Review Comment:
We could still verify which ts was used on-write, using
`CLUSTER.time.milliseconds()` -- compare
`TimestampedStoreUpgradeIntegrationTest#shouldMigrateKeyValueStoreToTimestampedKeyValueStoreUsingPapi`
for details on how to do this.
##########
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStoreWithHeaders.java:
##########
@@ -71,27 +71,59 @@ public RocksDBTimestampedStoreWithHeaders(final String name,
@Override
void openRocksDB(final DBOptions dbOptions,
final ColumnFamilyOptions columnFamilyOptions) {
- // Check if we're upgrading from RocksDBTimestampedStore (which uses
keyValueWithTimestamp CF)
+ // Check if we're upgrading from RocksDBTimestampedStore or from plain
RocksDBStore
final List<byte[]> existingCFs;
try (final Options options = new Options(dbOptions, new
ColumnFamilyOptions())) {
existingCFs = RocksDB.listColumnFamilies(options,
dbDir.getAbsolutePath());
} catch (final RocksDBException e) {
throw new ProcessorStateException("Error listing column families
for store " + name, e);
}
-
- final boolean upgradingFromTimestampedStore = existingCFs.stream()
+ final boolean hasTimestampedCF = existingCFs.stream()
.anyMatch(cf -> Arrays.equals(cf, LEGACY_TIMESTAMPED_CF_NAME));
- if (upgradingFromTimestampedStore) {
- openInUpgradeMode(dbOptions, columnFamilyOptions);
+ if (hasTimestampedCF) {
+ // Upgrading from timestamped store - use 2 CFs:
LEGACY_TIMESTAMPED + HEADERS
+ openFromTimestampedStore(dbOptions, columnFamilyOptions); // here
check that default has no data
Review Comment:
```suggestion
openFromTimestampedStore(dbOptions, columnFamilyOptions); //
needs to check that default-CF has no data
```
##########
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStoreWithHeaders.java:
##########
@@ -71,27 +71,59 @@ public RocksDBTimestampedStoreWithHeaders(final String name,
@Override
void openRocksDB(final DBOptions dbOptions,
final ColumnFamilyOptions columnFamilyOptions) {
- // Check if we're upgrading from RocksDBTimestampedStore (which uses
keyValueWithTimestamp CF)
+ // Check if we're upgrading from RocksDBTimestampedStore or from plain
RocksDBStore
final List<byte[]> existingCFs;
try (final Options options = new Options(dbOptions, new
ColumnFamilyOptions())) {
existingCFs = RocksDB.listColumnFamilies(options,
dbDir.getAbsolutePath());
} catch (final RocksDBException e) {
throw new ProcessorStateException("Error listing column families
for store " + name, e);
}
-
- final boolean upgradingFromTimestampedStore = existingCFs.stream()
+ final boolean hasTimestampedCF = existingCFs.stream()
.anyMatch(cf -> Arrays.equals(cf, LEGACY_TIMESTAMPED_CF_NAME));
- if (upgradingFromTimestampedStore) {
- openInUpgradeMode(dbOptions, columnFamilyOptions);
+ if (hasTimestampedCF) {
+ // Upgrading from timestamped store - use 2 CFs:
LEGACY_TIMESTAMPED + HEADERS
+ openFromTimestampedStore(dbOptions, columnFamilyOptions); // here
check that default has no data
} else {
- openInRegularMode(dbOptions, columnFamilyOptions);
+ openFromDefaultStore(dbOptions, columnFamilyOptions);
+ }
+
+ }
+
+ private void openFromDefaultStore(final DBOptions dbOptions,
+ final ColumnFamilyOptions
columnFamilyOptions) {
Review Comment:
nit: indentation
##########
streams/src/main/java/org/apache/kafka/streams/state/internals/PlainToHeadersStoreAdapter.java:
##########
@@ -0,0 +1,239 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.common.utils.ByteUtils;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.kstream.Materialized;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.StateStoreContext;
+import org.apache.kafka.streams.query.KeyQuery;
+import org.apache.kafka.streams.query.Position;
+import org.apache.kafka.streams.query.PositionBound;
+import org.apache.kafka.streams.query.Query;
+import org.apache.kafka.streams.query.QueryConfig;
+import org.apache.kafka.streams.query.QueryResult;
+import org.apache.kafka.streams.query.RangeQuery;
+import org.apache.kafka.streams.query.internals.InternalQueryResultUtil;
+import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.TimestampedBytesStore;
+import org.apache.kafka.streams.state.TimestampedKeyValueStoreWithHeaders;
+
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.Map;
+
+import static
org.apache.kafka.streams.state.HeadersBytesStore.convertFromPlainToHeaderFormat;
+
+/**
+ * This class is used to ensure backward compatibility at DSL level between
+ * {@link TimestampedKeyValueStoreWithHeaders} and plain {@link KeyValueStore}.
+ * <p>
+ * If a user provides a supplier for plain {@code KeyValueStore} (without
timestamp or headers) via
+ * {@link Materialized#as(KeyValueBytesStoreSupplier)} when building
+ * a {@code TimestampedKeyValueStoreWithHeaders}, this adapter is used to
translate between
+ * the plain {@code byte[]} format and the timestamped-with-headers {@code
byte[]} format.
+ *
+ * @see PlainToHeadersIteratorAdapter
+ */
+@SuppressWarnings("unchecked")
+public class PlainToHeadersStoreAdapter implements KeyValueStore<Bytes,
byte[]> {
+ final KeyValueStore<Bytes, byte[]> store;
+
+ PlainToHeadersStoreAdapter(final KeyValueStore<Bytes, byte[]> store) {
+ if (!store.persistent()) {
+ throw new IllegalArgumentException("Provided store must be a
persistent store, but it is not.");
+ }
+ if (store instanceof TimestampedBytesStore) {
+ throw new IllegalArgumentException("Provided store must be a plain
(non-timestamped) key value store, but it is timestamped.");
+ }
+ this.store = store;
+ }
+
+ /**
+ * Extract raw plain value from serialized ValueTimestampHeaders.
+ * This strips both the headers and timestamp portions.
+ *
+ * Format conversion:
+ * Input: [headersSize(varint)][headers][timestamp(8)][value]
+ * Output: [value]
+ */
+ static byte[] rawPlainValue(final byte[] rawValueTimestampHeaders) {
Review Comment:
Should we follow https://github.com/apache/kafka/pull/21626 and add a
`UtilsTest` class in this PR -- to avoid piling up more stuff we need to
refactor?
##########
streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStoreWithHeadersTest.java:
##########
@@ -559,6 +852,60 @@ private void verifyInHeadersAwareMode() {
}
}
+ @Test
+ public void shouldTransitionFromPlainUpgradeModeToRegularMode() throws
Exception {
+ // Prepare plain store
+ prepareKeyValueStore();
+
+ // Open in upgrade mode
+ rocksDBStore.init(context, rocksDBStore);
+
+ // Migrate all data by reading it
+ rocksDBStore.get(new Bytes("key1".getBytes()));
+ rocksDBStore.get(new Bytes("key2".getBytes()));
+
+ rocksDBStore.close();
+
+ // Clear the default CF manually
Review Comment:
Why do we need this? The two `get()` calls from above should have migrated both
rows already?
##########
streams/src/test/java/org/apache/kafka/streams/state/HeadersBytesStoreTest.java:
##########
@@ -64,4 +64,69 @@ public void shouldConvertEmptyValueToHeaderFormat() {
assertEquals(0, headersSize, "Empty headers should have headersSize =
0");
assertEquals(0, buffer.remaining(), "No payload bytes for empty
value");
}
+
+ @Test
+ public void shouldReturnNullWhenConvertingNullPlainValue() {
+ final byte[] result =
HeadersBytesStore.convertFromPlainToHeaderFormat(null);
+ assertNull(result);
+ }
+
+ @Test
+ public void shouldConvertPlainValueToHeaderFormatWithTimestamp() {
+ final byte[] plainValue = "test-value".getBytes();
+
+ final byte[] converted =
HeadersBytesStore.convertFromPlainToHeaderFormat(plainValue);
+
+ assertNotNull(converted);
+ // Expected format: [0x00 (1 byte)][timestamp=-1 (8 bytes)][value]
+ assertEquals(1 + 8 + plainValue.length, converted.length);
+
+ // Verify empty headers marker
+ assertEquals(0x00, converted[0], "First byte for empty header should
be 0x00");
+
+ // Verify timestamp = -1 (all 0xFF bytes)
+ for (int i = 1; i <= 8; i++) {
+ assertEquals((byte) 0xFF, converted[i], "Timestamp byte " + (i -
1) + " should be 0xFF for -1");
+ }
+
+ // Verify payload
+ final byte[] actualPayload = Arrays.copyOfRange(converted, 9,
converted.length);
+ assertArrayEquals(plainValue, actualPayload);
+ }
+
+ @Test
+ public void shouldConvertEmptyPlainValueToHeaderFormat() {
+ final byte[] emptyValue = new byte[0];
+
+ final byte[] converted =
HeadersBytesStore.convertFromPlainToHeaderFormat(emptyValue);
+
+ assertNotNull(converted);
+ // Expected format: [0x00 (1 byte)][timestamp=-1 (8 bytes)]
+ assertEquals(9, converted.length, "Converted empty value should have
headers + timestamp");
+
+ final ByteBuffer buffer = ByteBuffer.wrap(converted);
+ final int headersSize = ByteUtils.readVarint(buffer);
+ assertEquals(0, headersSize, "Empty headers should have headersSize =
0");
+
+ // Verify timestamp = -1
+ final long timestamp = buffer.getLong();
+ assertEquals(-1L, timestamp, "Timestamp should be -1 for plain value
upgrade");
+ }
+
+ @Test
+ public void shouldConvertPlainValueWithCorrectByteOrder() {
Review Comment:
Not sure what this test adds, compared to
`shouldConvertPlainValueToHeaderFormatWithTimestamp`?
##########
streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStoreWithHeadersTest.java:
##########
@@ -131,16 +132,198 @@ public void shouldOpenExistingStoreInRegularMode()
throws Exception {
}
@Test
- public void shouldFailToUpgradeDirectlyFromKeyValueStore() {
+ public void shouldMigrateFromPlainToHeadersAwareColumnFamily() throws
Exception {
+ prepareKeyValueStoreWithMultipleKeys();
+
+ // Open with RocksDBTimestampedStoreWithHeaders - should detect
DEFAULT CF and enter upgrade mode
+ try (final LogCaptureAppender appender =
LogCaptureAppender.createAndRegister(RocksDBTimestampedStoreWithHeaders.class))
{
+ rocksDBStore.init(context, rocksDBStore);
+
+ assertTrue(appender.getMessages().contains("Opening store " +
DB_NAME + " in upgrade mode from plain key value store"));
+ }
+
+ assertEquals(7L, rocksDBStore.approximateNumEntries(), "Expected 7
entries in DEFAULT CF and 0 in headers-aware CF before migration");
+
+ // get() - tests lazy migration on read
+
+ assertNull(rocksDBStore.get(new Bytes("unknown".getBytes())),
"Expected null for unknown key");
+ assertEquals(7L, rocksDBStore.approximateNumEntries(), "Expected 7
entries on DEFAULT CF, 0 in headers-aware CF");
+
+ assertEquals(1 + 0 + 8 + 1, rocksDBStore.get(new
Bytes("key1".getBytes())).length,
+ "Expected header-aware format: varint(1) + empty headers(0) +
timestamp(8) + value(1) = 10 bytes");
+ assertEquals(7L, rocksDBStore.approximateNumEntries(), "Expected 6
entries on DEFAULT CF, 1 in headers-aware CF after migrating key1");
+
+ // put() - tests migration on write
+
+ rocksDBStore.put(new Bytes("key2".getBytes()),
"headers+timestamp+22".getBytes());
+ assertEquals(7L, rocksDBStore.approximateNumEntries(), "Expected 5
entries on DEFAULT CF, 2 in headers-aware CF after migrating key2 with put()");
+
+ rocksDBStore.put(new Bytes("key3".getBytes()), null);
+ // count is off by one, due to two delete operations (even if one does
not delete anything)
+ assertEquals(5L, rocksDBStore.approximateNumEntries(), "Expected 4
entries on DEFAULT CF, 1 in headers-aware CF after deleting key3 with put()");
+
+ rocksDBStore.put(new Bytes("key8new".getBytes()),
"headers+timestamp+88888888".getBytes());
+ // one delete on old CF, one put on new CF, but count is off by one
due to delete on old CF not deleting anything
+ assertEquals(5L, rocksDBStore.approximateNumEntries(), "Expected 4
entries on DEFAULT CF, 2 in headers-aware CF after adding new key8new with
put()");
+
+ rocksDBStore.put(new Bytes("key9new".getBytes()), null);
+ // one delete on old CF, one put on new CF, but count is off by two
due to deletes not deleting anything
+ assertEquals(3L, rocksDBStore.approximateNumEntries(), "Expected 4
entries on DEFAULT CF, 1 in headers-aware CF after adding new key9new with
put()");
+
+ // putIfAbsent() - tests migration on conditional write
+
+ assertNull(rocksDBStore.putIfAbsent(new Bytes("key11new".getBytes()),
"headers+timestamp+11111111111".getBytes()),
+ "Expected null return value for putIfAbsent on non-existing
key11new, and new key should be added to headers-aware CF");
+ // one delete on old CF, one put on new CF, but count is off by one
due to delete on old CF not deleting anything
+ assertEquals(3L, rocksDBStore.approximateNumEntries(), "Expected 4
entries on DEFAULT CF, 2 in headers-aware CF after adding new key11new with
putIfAbsent()");
+
+ assertEquals(1 + 0 + 8 + 5, rocksDBStore.putIfAbsent(new
Bytes("key5".getBytes()), null).length,
+ "Expected header-aware format: varint(1) + empty headers(0) +
timestamp(8) + value(5) = 14 bytes for putIfAbsent with null on existing key5");
+ // one delete on old CF, one put on new CF, due to `get()` migration
+ assertEquals(3L, rocksDBStore.approximateNumEntries(), "Expected 3
entries on DEFAULT CF, 3 in headers-aware CF after migrating key5 with
putIfAbsent(null)");
+
+ assertNull(rocksDBStore.putIfAbsent(new Bytes("key12new".getBytes()),
null));
+ // no delete operation, because key12new is unknown
+ assertEquals(3L, rocksDBStore.approximateNumEntries(), "Expected 3
entries on DEFAULT CF, 3 in headers-aware CF after putIfAbsent with null on
non-existing key12new");
+
+ // delete() - tests migration on delete
+
+ assertEquals(1 + 0 + 8 + 6, rocksDBStore.delete(new
Bytes("key6".getBytes())).length,
+ "Expected header-aware format: varint(1) + empty headers(0) +
timestamp(8) + value(6) = 15 bytes for delete() on existing key6");
+ // two delete operation, however, only one is counted because old CF
count was zero before already
+ assertEquals(2L, rocksDBStore.approximateNumEntries(), "Expected 2
entries on DEFAULT CF, 2 in headers-aware CF after deleting key6 with
delete()");
+
+ // iterators should not trigger migration (read-only)
+ iteratorsShouldNotMigrateDataFromPlain();
+ assertEquals(2L, rocksDBStore.approximateNumEntries());
+
+ rocksDBStore.close();
+
+ // Verify the final state of both column families
+ verifyPlainUpgradeColumnFamilies();
+ }
+
+ @Test
+ public void shouldUpgradeFromPlainKeyValueStore() throws Exception {
// Prepare a plain key-value store (with data in default column family)
prepareKeyValueStore();
- // Try to open with RocksDBTimestampedStoreWithHeaders - should throw
exception
- final ProcessorStateException exception =
assertThrows(ProcessorStateException.class,
- () -> rocksDBStore.init(context, rocksDBStore));
+ // Open with RocksDBTimestampedStoreWithHeaders - should detect
DEFAULT CF and enter upgrade mode
+ try (final LogCaptureAppender appender =
LogCaptureAppender.createAndRegister(RocksDBTimestampedStoreWithHeaders.class))
{
+ rocksDBStore.init(context, rocksDBStore);
- assertTrue(exception.getMessage().contains("Cannot upgrade directly
from key-value store to headers-aware store"));
- assertTrue(exception.getMessage().contains("Please first upgrade to
RocksDBTimestampedStore"));
+ assertTrue(appender.getMessages().contains("Opening store " +
DB_NAME + " in upgrade mode from plain key value store"));
+ }
+
+ // Verify we can read the migrated data
+ assertEquals(1 + 0 + 8 + "value1".getBytes().length,
rocksDBStore.get(new Bytes("key1".getBytes())).length,
+ "Expected header-aware format: varint(1) + empty headers(0) +
timestamp(8) + value");
+ assertEquals(1 + 0 + 8 + "value2".getBytes().length,
rocksDBStore.get(new Bytes("key2".getBytes())).length,
+ "Expected header-aware format: varint(1) + empty headers(0) +
timestamp(8) + value");
+
+ rocksDBStore.close();
+
+ // Verify column family structure
+ verifyPlainStoreUpgrade();
+ }
+
+ private void verifyPlainStoreUpgrade() throws Exception {
+ final DBOptions dbOptions = new DBOptions();
+ final ColumnFamilyOptions columnFamilyOptions = new
ColumnFamilyOptions();
+
+ final List<ColumnFamilyDescriptor> columnFamilyDescriptors = asList(
+ new ColumnFamilyDescriptor(RocksDB.DEFAULT_COLUMN_FAMILY,
columnFamilyOptions),
+ new
ColumnFamilyDescriptor("keyValueWithTimestampAndHeaders".getBytes(StandardCharsets.UTF_8),
columnFamilyOptions));
+
+ final List<ColumnFamilyHandle> columnFamilies = new
ArrayList<>(columnFamilyDescriptors.size());
+ RocksDB db = null;
+ ColumnFamilyHandle defaultCF = null;
+ ColumnFamilyHandle headersCF = null;
+ try {
+ db = RocksDB.open(
+ dbOptions,
+ new File(new File(context.stateDir(), "rocksdb"),
DB_NAME).getAbsolutePath(),
+ columnFamilyDescriptors,
+ columnFamilies);
+
+ defaultCF = columnFamilies.get(0);
+ headersCF = columnFamilies.get(1);
+
+ // After get() is called, data is migrated and deleted from
DEFAULT CF
+ // DEFAULT CF should be empty for migrated keys
+ assertNull(db.get(defaultCF, "key1".getBytes()), "Expected key1 to
be deleted from DEFAULT CF after migration");
+ assertNull(db.get(defaultCF, "key2".getBytes()), "Expected key2 to
be deleted from DEFAULT CF after migration");
+
+ // Headers CF should have the migrated data
+ assertEquals(1 + 0 + 8 + "value1".getBytes().length,
db.get(headersCF, "key1".getBytes()).length);
+ assertEquals(1 + 0 + 8 + "value2".getBytes().length,
db.get(headersCF, "key2".getBytes()).length);
+ } finally {
+ if (defaultCF != null) {
+ defaultCF.close();
+ }
+ if (headersCF != null) {
+ headersCF.close();
+ }
+ if (db != null) {
+ db.close();
+ }
+ dbOptions.close();
+ columnFamilyOptions.close();
+ }
+ }
+
+ @Test
+ public void shouldFailWhenBothPlainAndTimestampedDataExist() throws
Exception {
+ // This is an invalid state - we can't have both DEFAULT CF with data
AND LEGACY_TIMESTAMPED CF
+ // First create a plain store
+ final RocksDBStore kvStore = new RocksDBStore(DB_NAME, METRICS_SCOPE);
+ kvStore.init(context, kvStore);
+ kvStore.put(new Bytes("plainKey".getBytes()), "plainValue".getBytes());
+ kvStore.close();
+
+ // Now manually add timestamped CF with data (simulating corrupted
state)
+ final DBOptions dbOptions = new DBOptions();
+ final ColumnFamilyOptions columnFamilyOptions = new
ColumnFamilyOptions();
+
+ final List<ColumnFamilyDescriptor> columnFamilyDescriptors = List.of(
+ new ColumnFamilyDescriptor(RocksDB.DEFAULT_COLUMN_FAMILY,
columnFamilyOptions));
+
+ final List<ColumnFamilyHandle> columnFamilies = new ArrayList<>();
+ RocksDB db = null;
+ ColumnFamilyHandle timestampedCF = null;
+ try {
+ db = RocksDB.open(
+ dbOptions,
+ new File(new File(context.stateDir(), "rocksdb"),
DB_NAME).getAbsolutePath(),
+ columnFamilyDescriptors,
+ columnFamilies);
+
+ // Create the timestamped CF and add data to it
+ timestampedCF = db.createColumnFamily(
+ new
ColumnFamilyDescriptor(TIMESTAMPED_VALUES_COLUMN_FAMILY_NAME,
columnFamilyOptions));
+ db.put(timestampedCF, "timestampedKey".getBytes(),
wrapTimestampedValue("1".getBytes()));
+ } finally {
+ if (timestampedCF != null) {
+ timestampedCF.close();
+ }
+ for (final ColumnFamilyHandle cf : columnFamilies) {
+ cf.close();
+ }
+ if (db != null) {
+ db.close();
+ }
+ dbOptions.close();
+ columnFamilyOptions.close();
+ }
+
+ // Now try to open with headers store - should fail
+ final ProcessorStateException exception = assertThrows(
+ ProcessorStateException.class,
+ () -> rocksDBStore.init(context, rocksDBStore)
Review Comment:
I think we are getting the wrong error here? We try to re-open a _plain_
store, which also fails, but that's not the error we want to trigger? We want
to create a `RocksDBTimestampedStoreWithHeaders` and verify that it fails to
open, right?
##########
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStoreWithHeaders.java:
##########
@@ -100,55 +132,43 @@ private void openInUpgradeMode(final DBOptions dbOptions,
new
ColumnFamilyDescriptor(TIMESTAMPED_VALUES_WITH_HEADERS_CF_NAME,
columnFamilyOptions)
);
- verifyAndCloseEmptyDefaultColumnFamily(columnFamilies.get(0));
+ // verify and close empty Default ColumnFamily
+ try (final RocksIterator defaultIter =
db.newIterator(columnFamilies.get(0))) {
+ defaultIter.seekToFirst();
+ if (defaultIter.isValid()) {
+ throw new ProcessorStateException(
Review Comment:
Do we need to close all three `columnFamilies` handles before we throw? Not
sure TBH.
##########
streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/HeadersStoreUpgradeIntegrationTest.java:
##########
@@ -233,6 +234,128 @@ public void
shouldProxyTimestampedKeyValueStoreToTimestampedKeyValueStoreWithHea
kafkaStreams.close();
}
+ @Test
+ public void
shouldMigrateInMemoryPlainKeyValueStoreToTimestampedKeyValueStoreWithHeadersUsingPapi()
throws Exception {
+
shouldMigratePlainKeyValueStoreToTimestampedKeyValueStoreWithHeadersUsingPapi(false);
+ }
+
+ @Test
+ public void
shouldMigratePersistentPlainKeyValueStoreToTimestampedKeyValueStoreWithHeadersUsingPapi()
throws Exception {
+
shouldMigratePlainKeyValueStoreToTimestampedKeyValueStoreWithHeadersUsingPapi(true);
+ }
+
+ private void
shouldMigratePlainKeyValueStoreToTimestampedKeyValueStoreWithHeadersUsingPapi(final
boolean persistentStore) throws Exception {
+ final StreamsBuilder streamsBuilderForOldStore = new StreamsBuilder();
+
+ streamsBuilderForOldStore.addStateStore(
+ Stores.keyValueStoreBuilder(
+ persistentStore ?
Stores.persistentKeyValueStore(STORE_NAME) :
Stores.inMemoryKeyValueStore(STORE_NAME),
+ Serdes.String(),
+ Serdes.String()))
+ .stream(inputStream, Consumed.with(Serdes.String(),
Serdes.String()))
+ .process(KeyValueProcessor::new, STORE_NAME);
+
+ final Properties props = props();
+ kafkaStreams = new KafkaStreams(streamsBuilderForOldStore.build(),
props);
+ kafkaStreams.start();
+
+ processKeyValueAndVerifyValue("key1", "value1");
+ processKeyValueAndVerifyValue("key2", "value2");
+ processKeyValueAndVerifyValue("key3", "value3");
+
+ kafkaStreams.close();
+ kafkaStreams = null;
+
+ final StreamsBuilder streamsBuilderForNewStore = new StreamsBuilder();
+
+ streamsBuilderForNewStore.addStateStore(
+ Stores.timestampedKeyValueStoreBuilderWithHeaders(
+ persistentStore ?
Stores.persistentTimestampedKeyValueStoreWithHeaders(STORE_NAME) :
Stores.inMemoryKeyValueStore(STORE_NAME),
+ Serdes.String(),
+ Serdes.String()))
+ .stream(inputStream, Consumed.with(Serdes.String(),
Serdes.String()))
+ .process(TimestampedKeyValueWithHeadersProcessor::new, STORE_NAME);
+
+ kafkaStreams = new KafkaStreams(streamsBuilderForNewStore.build(),
props);
+ kafkaStreams.start();
+
+ if (persistentStore) {
+ // Verify legacy data can be read with empty headers and timestamp
= -1
+ verifyLegacyValuesWithEmptyHeaders("key1", "value1", -1);
+ verifyLegacyValuesWithEmptyHeaders("key2", "value2", -1);
+ verifyLegacyValuesWithEmptyHeaders("key3", "value3", -1);
+ } else {
Review Comment:
That's ok to accept the record ts (instead of `-1`) -- we do the same thing
for plain -> ts-format.
The "problem" is that we use the converter when reading from the changelog,
and for this case, we just don't know if the ts set on the record is "correct"
or not, as the changelog format is the same for all three cases.
(And it's also not possible to set `-1` as TS to begin with -- if we set
`-1` on a `ProducerRecord` the producer would override it...).
##########
streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStoreWithHeadersTest.java:
##########
@@ -131,16 +132,198 @@ public void shouldOpenExistingStoreInRegularMode()
throws Exception {
}
@Test
- public void shouldFailToUpgradeDirectlyFromKeyValueStore() {
+ public void shouldMigrateFromPlainToHeadersAwareColumnFamily() throws
Exception {
+ prepareKeyValueStoreWithMultipleKeys();
+
+ // Open with RocksDBTimestampedStoreWithHeaders - should detect
DEFAULT CF and enter upgrade mode
+ try (final LogCaptureAppender appender =
LogCaptureAppender.createAndRegister(RocksDBTimestampedStoreWithHeaders.class))
{
+ rocksDBStore.init(context, rocksDBStore);
+
+ assertTrue(appender.getMessages().contains("Opening store " +
DB_NAME + " in upgrade mode from plain key value store"));
+ }
+
+ assertEquals(7L, rocksDBStore.approximateNumEntries(), "Expected 7
entries in DEFAULT CF and 0 in headers-aware CF before migration");
+
+ // get() - tests lazy migration on read
+
+ assertNull(rocksDBStore.get(new Bytes("unknown".getBytes())),
"Expected null for unknown key");
+ assertEquals(7L, rocksDBStore.approximateNumEntries(), "Expected 7
entries on DEFAULT CF, 0 in headers-aware CF");
+
+ assertEquals(1 + 0 + 8 + 1, rocksDBStore.get(new
Bytes("key1".getBytes())).length,
+ "Expected header-aware format: varint(1) + empty headers(0) +
timestamp(8) + value(1) = 10 bytes");
+ assertEquals(7L, rocksDBStore.approximateNumEntries(), "Expected 6
entries on DEFAULT CF, 1 in headers-aware CF after migrating key1");
+
+ // put() - tests migration on write
+
+ rocksDBStore.put(new Bytes("key2".getBytes()),
"headers+timestamp+22".getBytes());
+ assertEquals(7L, rocksDBStore.approximateNumEntries(), "Expected 5
entries on DEFAULT CF, 2 in headers-aware CF after migrating key2 with put()");
+
+ rocksDBStore.put(new Bytes("key3".getBytes()), null);
+ // count is off by one, due to two delete operations (even if one does
not delete anything)
+ assertEquals(5L, rocksDBStore.approximateNumEntries(), "Expected 4
entries on DEFAULT CF, 1 in headers-aware CF after deleting key3 with put()");
+
+ rocksDBStore.put(new Bytes("key8new".getBytes()),
"headers+timestamp+88888888".getBytes());
+ // one delete on old CF, one put on new CF, but count is off by one
due to delete on old CF not deleting anything
+ assertEquals(5L, rocksDBStore.approximateNumEntries(), "Expected 4
entries on DEFAULT CF, 2 in headers-aware CF after adding new key8new with
put()");
+
+ rocksDBStore.put(new Bytes("key9new".getBytes()), null);
+ // one delete on old CF, one put on new CF, but count is off by two
due to deletes not deleting anything
+ assertEquals(3L, rocksDBStore.approximateNumEntries(), "Expected 4
entries on DEFAULT CF, 1 in headers-aware CF after adding new key9new with
put()");
+
+ // putIfAbsent() - tests migration on conditional write
+
+ assertNull(rocksDBStore.putIfAbsent(new Bytes("key11new".getBytes()),
"headers+timestamp+11111111111".getBytes()),
+ "Expected null return value for putIfAbsent on non-existing
key11new, and new key should be added to headers-aware CF");
+ // one delete on old CF, one put on new CF, but count is off by one
due to delete on old CF not deleting anything
+ assertEquals(3L, rocksDBStore.approximateNumEntries(), "Expected 4
entries on DEFAULT CF, 2 in headers-aware CF after adding new key11new with
putIfAbsent()");
+
+ assertEquals(1 + 0 + 8 + 5, rocksDBStore.putIfAbsent(new
Bytes("key5".getBytes()), null).length,
+ "Expected header-aware format: varint(1) + empty headers(0) +
timestamp(8) + value(5) = 14 bytes for putIfAbsent with null on existing key5");
+ // one delete on old CF, one put on new CF, due to `get()` migration
+ assertEquals(3L, rocksDBStore.approximateNumEntries(), "Expected 3
entries on DEFAULT CF, 3 in headers-aware CF after migrating key5 with
putIfAbsent(null)");
+
+ assertNull(rocksDBStore.putIfAbsent(new Bytes("key12new".getBytes()),
null));
+ // no delete operation, because key12new is unknown
+ assertEquals(3L, rocksDBStore.approximateNumEntries(), "Expected 3
entries on DEFAULT CF, 3 in headers-aware CF after putIfAbsent with null on
non-existing key12new");
+
+ // delete() - tests migration on delete
+
+ assertEquals(1 + 0 + 8 + 6, rocksDBStore.delete(new
Bytes("key6".getBytes())).length,
+ "Expected header-aware format: varint(1) + empty headers(0) +
timestamp(8) + value(6) = 15 bytes for delete() on existing key6");
+ // two delete operation, however, only one is counted because old CF
count was zero before already
+ assertEquals(2L, rocksDBStore.approximateNumEntries(), "Expected 2
entries on DEFAULT CF, 2 in headers-aware CF after deleting key6 with
delete()");
+
+ // iterators should not trigger migration (read-only)
+ iteratorsShouldNotMigrateDataFromPlain();
+ assertEquals(2L, rocksDBStore.approximateNumEntries());
+
+ rocksDBStore.close();
+
+ // Verify the final state of both column families
+ verifyPlainUpgradeColumnFamilies();
+ }
+
+ @Test
+ public void shouldUpgradeFromPlainKeyValueStore() throws Exception {
// Prepare a plain key-value store (with data in default column family)
prepareKeyValueStore();
- // Try to open with RocksDBTimestampedStoreWithHeaders - should throw
exception
- final ProcessorStateException exception =
assertThrows(ProcessorStateException.class,
- () -> rocksDBStore.init(context, rocksDBStore));
+ // Open with RocksDBTimestampedStoreWithHeaders - should detect
DEFAULT CF and enter upgrade mode
+ try (final LogCaptureAppender appender =
LogCaptureAppender.createAndRegister(RocksDBTimestampedStoreWithHeaders.class))
{
+ rocksDBStore.init(context, rocksDBStore);
- assertTrue(exception.getMessage().contains("Cannot upgrade directly
from key-value store to headers-aware store"));
- assertTrue(exception.getMessage().contains("Please first upgrade to
RocksDBTimestampedStore"));
+ assertTrue(appender.getMessages().contains("Opening store " +
DB_NAME + " in upgrade mode from plain key value store"));
+ }
+
+ // Verify we can read the migrated data
+ assertEquals(1 + 0 + 8 + "value1".getBytes().length,
rocksDBStore.get(new Bytes("key1".getBytes())).length,
+ "Expected header-aware format: varint(1) + empty headers(0) +
timestamp(8) + value");
+ assertEquals(1 + 0 + 8 + "value2".getBytes().length,
rocksDBStore.get(new Bytes("key2".getBytes())).length,
+ "Expected header-aware format: varint(1) + empty headers(0) +
timestamp(8) + value");
+
+ rocksDBStore.close();
+
+ // Verify column family structure
+ verifyPlainStoreUpgrade();
+ }
+
+ private void verifyPlainStoreUpgrade() throws Exception {
+ final DBOptions dbOptions = new DBOptions();
+ final ColumnFamilyOptions columnFamilyOptions = new
ColumnFamilyOptions();
+
+ final List<ColumnFamilyDescriptor> columnFamilyDescriptors = asList(
+ new ColumnFamilyDescriptor(RocksDB.DEFAULT_COLUMN_FAMILY,
columnFamilyOptions),
+ new
ColumnFamilyDescriptor("keyValueWithTimestampAndHeaders".getBytes(StandardCharsets.UTF_8),
columnFamilyOptions));
+
+ final List<ColumnFamilyHandle> columnFamilies = new
ArrayList<>(columnFamilyDescriptors.size());
+ RocksDB db = null;
+ ColumnFamilyHandle defaultCF = null;
+ ColumnFamilyHandle headersCF = null;
+ try {
+ db = RocksDB.open(
+ dbOptions,
+ new File(new File(context.stateDir(), "rocksdb"),
DB_NAME).getAbsolutePath(),
+ columnFamilyDescriptors,
+ columnFamilies);
+
+ defaultCF = columnFamilies.get(0);
+ headersCF = columnFamilies.get(1);
+
+ // After get() is called, data is migrated and deleted from
DEFAULT CF
+ // DEFAULT CF should be empty for migrated keys
+ assertNull(db.get(defaultCF, "key1".getBytes()), "Expected key1 to
be deleted from DEFAULT CF after migration");
+ assertNull(db.get(defaultCF, "key2".getBytes()), "Expected key2 to
be deleted from DEFAULT CF after migration");
+
+ // Headers CF should have the migrated data
+ assertEquals(1 + 0 + 8 + "value1".getBytes().length,
db.get(headersCF, "key1".getBytes()).length);
+ assertEquals(1 + 0 + 8 + "value2".getBytes().length,
db.get(headersCF, "key2".getBytes()).length);
+ } finally {
+ if (defaultCF != null) {
+ defaultCF.close();
+ }
+ if (headersCF != null) {
+ headersCF.close();
+ }
+ if (db != null) {
+ db.close();
+ }
+ dbOptions.close();
+ columnFamilyOptions.close();
+ }
+ }
+
+ @Test
+ public void shouldFailWhenBothPlainAndTimestampedDataExist() throws
Exception {
+ // This is an invalid state - we can't have both DEFAULT CF with data
AND LEGACY_TIMESTAMPED CF
+ // First create a plain store
+ final RocksDBStore kvStore = new RocksDBStore(DB_NAME, METRICS_SCOPE);
+ kvStore.init(context, kvStore);
+ kvStore.put(new Bytes("plainKey".getBytes()), "plainValue".getBytes());
+ kvStore.close();
+
+ // Now manually add timestamped CF with data (simulating corrupted
state)
Review Comment:
Could we simplify this code by putting two keys into the plain store, then
creating a ts-store next, migrating one of the two keys, and finally opening it
as a header-store?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]