frankvicky commented on code in PR #21446:
URL: https://github.com/apache/kafka/pull/21446#discussion_r2790449297


##########
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStoreWithHeaders.java:
##########
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.streams.state.HeadersBytesStore;
+import org.apache.kafka.streams.state.internals.metrics.RocksDBMetricsRecorder;
+
+import org.rocksdb.ColumnFamilyDescriptor;
+import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.ColumnFamilyOptions;
+import org.rocksdb.DBOptions;
+import org.rocksdb.RocksDB;
+import org.rocksdb.RocksIterator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+
+/**
+ * A persistent key-(value-timestamp-headers) store based on RocksDB.
+ */
+public class RocksDBTimestampedStoreWithHeaders extends RocksDBStore implements HeadersBytesStore {
+
+    private static final Logger log = LoggerFactory.getLogger(RocksDBTimestampedStoreWithHeaders.class);
+
+    // Legacy column family name - must match RocksDBTimestampedStore.TIMESTAMPED_VALUES_COLUMN_FAMILY_NAME
+    private static final byte[] LEGACY_TIMESTAMPED_CF_NAME =
+        "keyValueWithTimestamp".getBytes(StandardCharsets.UTF_8);
+
+    // New column family for header-aware timestamped values.
+    private static final byte[] TIMESTAMPED_VALUES_WITH_HEADERS_CF_NAME =
+        "keyValueWithTimestampAndHeaders".getBytes(StandardCharsets.UTF_8);
+
+    public RocksDBTimestampedStoreWithHeaders(final String name,
+                                              final String metricsScope) {
+        super(name, metricsScope);
+    }
+
+    RocksDBTimestampedStoreWithHeaders(final String name,
+                                       final String parentDir,
+                                       final RocksDBMetricsRecorder metricsRecorder) {
+        super(name, parentDir, metricsRecorder);
+    }
+
+    @Override
+    void openRocksDB(final DBOptions dbOptions,
+                     final ColumnFamilyOptions columnFamilyOptions) {
+        // Check if we're upgrading from RocksDBTimestampedStore (which uses keyValueWithTimestamp CF)
+        final List<byte[]> existingCFs;
+        try {
+            final org.rocksdb.Options options = new org.rocksdb.Options(dbOptions, new ColumnFamilyOptions());

Review Comment:
   Could we replace the fully qualified name with an import?
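   For example, something along these lines (a rough sketch, assuming the import is added at the top of the file):

       import org.rocksdb.Options;

       // construct Options from the existing DBOptions without the fully qualified name
       final Options options = new Options(dbOptions, new ColumnFamilyOptions());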



##########
streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStoreWithHeadersTest.java:
##########
@@ -0,0 +1,590 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.common.utils.LogCaptureAppender;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.state.KeyValueIterator;
+
+import org.hamcrest.core.IsNull;
+import org.junit.jupiter.api.Test;
+import org.rocksdb.ColumnFamilyDescriptor;
+import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.ColumnFamilyOptions;
+import org.rocksdb.DBOptions;
+import org.rocksdb.RocksDB;
+
+import java.io.File;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.List;
+
+import static java.util.Arrays.asList;
+import static org.hamcrest.CoreMatchers.hasItem;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+
+public class RocksDBTimestampedStoreWithHeadersTest extends RocksDBStoreTest {
+
+    private final Serializer<String> stringSerializer = new StringSerializer();
+
+    RocksDBStore getRocksDBStore() {
+        return new RocksDBTimestampedStoreWithHeaders(DB_NAME, METRICS_SCOPE);
+    }
+
+    @Test
+    public void shouldOpenNewStoreInRegularMode() {
+        try (final LogCaptureAppender appender = LogCaptureAppender.createAndRegister(RocksDBTimestampedStoreWithHeaders.class)) {
+            rocksDBStore.init(context, rocksDBStore);
+
+            assertThat(appender.getMessages(), hasItem("Opening store " + DB_NAME + " in regular headers-aware mode"));

Review Comment:
   I'm wondering whether we should keep using Hamcrest in tests.
   AFAIK, Hamcrest is not actively maintained; maybe we should use JUnit assertions instead?
   cc. @mjsax @bbejeck
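   If we do switch, the Hamcrest hasItem assertion above could become a plain JUnit check, e.g. (rough sketch):

       // JUnit 5 equivalent of assertThat(appender.getMessages(), hasItem(...))
       assertTrue(
           appender.getMessages().contains("Opening store " + DB_NAME + " in regular headers-aware mode"),
           "expected the store to log that it opened in regular headers-aware mode");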



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStoreWithHeaders.java:
##########
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.streams.state.HeadersBytesStore;
+import org.apache.kafka.streams.state.internals.metrics.RocksDBMetricsRecorder;
+
+import org.rocksdb.ColumnFamilyDescriptor;
+import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.ColumnFamilyOptions;
+import org.rocksdb.DBOptions;
+import org.rocksdb.RocksDB;
+import org.rocksdb.RocksIterator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+
+/**
+ * A persistent key-(value-timestamp-headers) store based on RocksDB.
+ */
+public class RocksDBTimestampedStoreWithHeaders extends RocksDBStore implements HeadersBytesStore {
+
+    private static final Logger log = LoggerFactory.getLogger(RocksDBTimestampedStoreWithHeaders.class);
+
+    // Legacy column family name - must match RocksDBTimestampedStore.TIMESTAMPED_VALUES_COLUMN_FAMILY_NAME
+    private static final byte[] LEGACY_TIMESTAMPED_CF_NAME =
+        "keyValueWithTimestamp".getBytes(StandardCharsets.UTF_8);
+
+    // New column family for header-aware timestamped values.
+    private static final byte[] TIMESTAMPED_VALUES_WITH_HEADERS_CF_NAME =
+        "keyValueWithTimestampAndHeaders".getBytes(StandardCharsets.UTF_8);
+
+    public RocksDBTimestampedStoreWithHeaders(final String name,
+                                              final String metricsScope) {
+        super(name, metricsScope);
+    }
+
+    RocksDBTimestampedStoreWithHeaders(final String name,
+                                       final String parentDir,
+                                       final RocksDBMetricsRecorder metricsRecorder) {
+        super(name, parentDir, metricsRecorder);
+    }
+
+    @Override
+    void openRocksDB(final DBOptions dbOptions,
+                     final ColumnFamilyOptions columnFamilyOptions) {
+        // Check if we're upgrading from RocksDBTimestampedStore (which uses keyValueWithTimestamp CF)
+        final List<byte[]> existingCFs;
+        try {
+            final org.rocksdb.Options options = new org.rocksdb.Options(dbOptions, new ColumnFamilyOptions());
+            existingCFs = RocksDB.listColumnFamilies(options, dbDir.getAbsolutePath());
+            options.close();
+        } catch (final org.rocksdb.RocksDBException e) {
+            throw new org.apache.kafka.streams.errors.ProcessorStateException("Error listing column families for store " + name, e);
+        }
+
+        final boolean upgradingFromTimestampedStore = existingCFs.stream()
+            .anyMatch(cf -> java.util.Arrays.equals(cf, LEGACY_TIMESTAMPED_CF_NAME));

Review Comment:
   ditto
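   For example, with java.util.Arrays imported (rough sketch):

       import java.util.Arrays;

       // compare the raw column family names without the fully qualified call
       final boolean upgradingFromTimestampedStore = existingCFs.stream()
           .anyMatch(cf -> Arrays.equals(cf, LEGACY_TIMESTAMPED_CF_NAME));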



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStoreWithHeaders.java:
##########
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.streams.state.HeadersBytesStore;
+import org.apache.kafka.streams.state.internals.metrics.RocksDBMetricsRecorder;
+
+import org.rocksdb.ColumnFamilyDescriptor;
+import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.ColumnFamilyOptions;
+import org.rocksdb.DBOptions;
+import org.rocksdb.RocksDB;
+import org.rocksdb.RocksIterator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+
+/**
+ * A persistent key-(value-timestamp-headers) store based on RocksDB.
+ */
+public class RocksDBTimestampedStoreWithHeaders extends RocksDBStore implements HeadersBytesStore {
+
+    private static final Logger log = LoggerFactory.getLogger(RocksDBTimestampedStoreWithHeaders.class);
+
+    // Legacy column family name - must match RocksDBTimestampedStore.TIMESTAMPED_VALUES_COLUMN_FAMILY_NAME
+    private static final byte[] LEGACY_TIMESTAMPED_CF_NAME =
+        "keyValueWithTimestamp".getBytes(StandardCharsets.UTF_8);
+
+    // New column family for header-aware timestamped values.
+    private static final byte[] TIMESTAMPED_VALUES_WITH_HEADERS_CF_NAME =
+        "keyValueWithTimestampAndHeaders".getBytes(StandardCharsets.UTF_8);
+
+    public RocksDBTimestampedStoreWithHeaders(final String name,
+                                              final String metricsScope) {
+        super(name, metricsScope);
+    }
+
+    RocksDBTimestampedStoreWithHeaders(final String name,
+                                       final String parentDir,
+                                       final RocksDBMetricsRecorder metricsRecorder) {
+        super(name, parentDir, metricsRecorder);
+    }
+
+    @Override
+    void openRocksDB(final DBOptions dbOptions,
+                     final ColumnFamilyOptions columnFamilyOptions) {
+        // Check if we're upgrading from RocksDBTimestampedStore (which uses keyValueWithTimestamp CF)
+        final List<byte[]> existingCFs;
+        try {
+            final org.rocksdb.Options options = new org.rocksdb.Options(dbOptions, new ColumnFamilyOptions());
+            existingCFs = RocksDB.listColumnFamilies(options, dbDir.getAbsolutePath());
+            options.close();
+        } catch (final org.rocksdb.RocksDBException e) {
+            throw new org.apache.kafka.streams.errors.ProcessorStateException("Error listing column families for store " + name, e);

Review Comment:
   ditto
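   For example, with RocksDBException and ProcessorStateException imported (rough sketch):

       import org.rocksdb.RocksDBException;
       import org.apache.kafka.streams.errors.ProcessorStateException;

       // same behavior as above, just without the fully qualified names
       } catch (final RocksDBException e) {
           throw new ProcessorStateException("Error listing column families for store " + name, e);
       }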



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
