sashapolo commented on code in PR #769:
URL: https://github.com/apache/ignite-3/pull/769#discussion_r850539554
##########
modules/core/src/main/java/org/apache/ignite/lang/IgniteLogger.java:
##########
@@ -36,6 +36,16 @@ public static IgniteLogger forClass(Class<?> cls) {
return new IgniteLogger(cls);
}
+ /**
+ * Creates logger for name.
+ *
+ * @param name The name for a logger.
+ * @return Ignite logger.
+ */
+ public static IgniteLogger forName(String name) {
Review Comment:
why is this change needed?
##########
modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/LogStorageProvider.java:
##########
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ignite.raft.jraft.storage;
+
+import org.apache.ignite.raft.jraft.option.RaftOptions;
+
+/** Log storage provider interface. */
+public interface LogStorageProvider {
+ /**
+ * Starts log storage provider.
+ */
+ void start();
+
+ /**
+ * Stops log storage provider.
+ */
+ void shutdown();
Review Comment:
I would prefer extending `AutoCloseable` instead
##########
modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/LogStorageProvider.java:
##########
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ignite.raft.jraft.storage;
+
+import org.apache.ignite.raft.jraft.option.RaftOptions;
+
+/** Log storage provider interface. */
+public interface LogStorageProvider {
+ /**
+ * Starts log storage provider.
+ */
+ void start();
+
+ /**
+ * Stops log storage provider.
Review Comment:
```suggestion
* Stops the log storage provider.
```
##########
modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/impl/DefaultLogStorageProvider.java:
##########
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ignite.raft.jraft.storage.impl;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.rocksdb.RocksDB.DEFAULT_COLUMN_FAMILY;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.ignite.raft.jraft.option.RaftOptions;
+import org.apache.ignite.raft.jraft.storage.LogStorage;
+import org.apache.ignite.raft.jraft.storage.LogStorageProvider;
+import org.apache.ignite.raft.jraft.util.Platform;
+import org.rocksdb.ColumnFamilyDescriptor;
+import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.ColumnFamilyOptions;
+import org.rocksdb.CompactionStyle;
+import org.rocksdb.CompressionType;
+import org.rocksdb.DBOptions;
+import org.rocksdb.Env;
+import org.rocksdb.Priority;
+import org.rocksdb.RocksDB;
+import org.rocksdb.util.SizeUnit;
+
+public class DefaultLogStorageProvider implements LogStorageProvider {
+ /** Database path. */
+ private final Path path;
+
+ /** Database instance shared across log storages. */
+ private RocksDB db;
+
+ /** Database options. */
+ private DBOptions dbOptions;
+
+ /** Configuration column family handle. */
+ private ColumnFamilyHandle confHandle;
+
+ /** Data column family handle. */
+ private ColumnFamilyHandle dataHandle;
+
+ public DefaultLogStorageProvider(Path path) {
+ this.path = path;
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public void start() {
+ if (Files.notExists(path)) {
+ try {
+ Files.createDirectories(path);
+ }
+ catch (IOException e) {
+ throw new IllegalStateException("Failed to create file: " +
this.path, e);
+ }
+ }
+
+ if (Files.exists(path) && !Files.isDirectory(path)) {
Review Comment:
I think all this code and the code above can be replaced by
`Files.createDirectories(path)`, since `createDirectories` already performs all
these checks
##########
modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/impl/DefaultLogStorageProvider.java:
##########
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ignite.raft.jraft.storage.impl;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.rocksdb.RocksDB.DEFAULT_COLUMN_FAMILY;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.ignite.raft.jraft.option.RaftOptions;
+import org.apache.ignite.raft.jraft.storage.LogStorage;
+import org.apache.ignite.raft.jraft.storage.LogStorageProvider;
+import org.apache.ignite.raft.jraft.util.Platform;
+import org.rocksdb.ColumnFamilyDescriptor;
+import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.ColumnFamilyOptions;
+import org.rocksdb.CompactionStyle;
+import org.rocksdb.CompressionType;
+import org.rocksdb.DBOptions;
+import org.rocksdb.Env;
+import org.rocksdb.Priority;
+import org.rocksdb.RocksDB;
+import org.rocksdb.util.SizeUnit;
+
+public class DefaultLogStorageProvider implements LogStorageProvider {
+ /** Database path. */
+ private final Path path;
+
+ /** Database instance shared across log storages. */
+ private RocksDB db;
+
+ /** Database options. */
+ private DBOptions dbOptions;
+
+ /** Configuration column family handle. */
+ private ColumnFamilyHandle confHandle;
+
+ /** Data column family handle. */
+ private ColumnFamilyHandle dataHandle;
+
+ public DefaultLogStorageProvider(Path path) {
+ this.path = path;
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public void start() {
+ if (Files.notExists(path)) {
+ try {
+ Files.createDirectories(path);
+ }
+ catch (IOException e) {
+ throw new IllegalStateException("Failed to create file: " +
this.path, e);
+ }
+ }
+
+ if (Files.exists(path) && !Files.isDirectory(path)) {
+ throw new IllegalStateException("Invalid log path, it's a regular
file: " + this.path);
+ }
+
+ final List<ColumnFamilyHandle> columnFamilyHandles = new ArrayList<>();
+
+ this.dbOptions = createDBOptions();
+
+ final List<ColumnFamilyDescriptor> columnFamilyDescriptors = new
ArrayList<>();
+ final ColumnFamilyOptions cfOption = createColumnFamilyOptions();
+
+ // Column family to store configuration log entry.
+ columnFamilyDescriptors.add(new
ColumnFamilyDescriptor("Configuration".getBytes(UTF_8), cfOption));
+ // Default column family to store user data log entry.
+ columnFamilyDescriptors.add(new
ColumnFamilyDescriptor(DEFAULT_COLUMN_FAMILY, cfOption));
+
+ try {
+ this.db = RocksDB.open(this.dbOptions, this.path.toString(),
columnFamilyDescriptors, columnFamilyHandles);
+
+ // Setup rocks thread pools
+ Env env = db.getEnv();
+
env.setBackgroundThreads(Runtime.getRuntime().availableProcessors(),
Priority.HIGH);
+
env.setBackgroundThreads(Runtime.getRuntime().availableProcessors(),
Priority.LOW);
+
+ assert (columnFamilyHandles.size() == 2);
+ this.confHandle = columnFamilyHandles.get(0);
+ this.dataHandle = columnFamilyHandles.get(1);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public void shutdown() {
+ confHandle.close();
+ dataHandle.close();
+ db.close();
+ dbOptions.close();
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public LogStorage getLogStorage(String groupId, RaftOptions raftOptions) {
+ return new RocksDBSharedLogStorage(db, confHandle, dataHandle,
groupId, raftOptions);
+ }
+
+ /**
+ * @return Default database options.
+ */
+ private static DBOptions createDBOptions() {
+ DBOptions options = new DBOptions();
Review Comment:
can be inlined
##########
modules/raft/src/test/java/org/apache/ignite/raft/jraft/storage/impl/RocksDBSharedLogStorageAdvancedTest.java:
##########
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ignite.raft.jraft.storage.impl;
+
+import java.nio.file.Path;
+import org.apache.ignite.internal.testframework.WorkDirectory;
+import org.apache.ignite.internal.testframework.WorkDirectoryExtension;
+import org.apache.ignite.raft.jraft.conf.ConfigurationManager;
+import org.apache.ignite.raft.jraft.entity.LogEntry;
+import org.apache.ignite.raft.jraft.entity.LogId;
+import org.apache.ignite.raft.jraft.entity.codec.LogEntryCodecFactory;
+import org.apache.ignite.raft.jraft.entity.codec.v1.LogEntryV1CodecFactory;
+import org.apache.ignite.raft.jraft.option.LogStorageOptions;
+import org.apache.ignite.raft.jraft.option.RaftOptions;
+import org.apache.ignite.raft.jraft.storage.LogStorage;
+import org.apache.ignite.raft.jraft.test.TestUtils;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+/**
+ * Tests for implementation specifics of the shared storage.
+ */
+@ExtendWith(WorkDirectoryExtension.class)
+public class RocksDBSharedLogStorageAdvancedTest {
+ @WorkDirectory
+ private Path path;
+
+ private DefaultLogStorageProvider logStorageProvider;
+
+ private ConfigurationManager confManager;
+
+ private LogEntryCodecFactory logEntryCodecFactory;
+
+ @BeforeEach
+ public void setUp() {
+ logStorageProvider = new DefaultLogStorageProvider(this.path);
+
+ logStorageProvider.start();
+
+ this.confManager = new ConfigurationManager();
+ this.logEntryCodecFactory = LogEntryV1CodecFactory.getInstance();
+ }
+
+ @AfterEach
+ public void tearDown() {
+ logStorageProvider.shutdown();
+ }
+
+ @Test
+ public void raftGroupsWithPrefixCollision() {
+ LogStorage abcdStorage = logStorageProvider.getLogStorage("abcd", new
RaftOptions());
+ abcdStorage.init(newLogStorageOptions());
+
+ LogStorage abStorage = logStorageProvider.getLogStorage("ab", new
RaftOptions());
+ abStorage.init(newLogStorageOptions());
+
+ int count = 100;
+
+ for (int i = 0; i < count; i++) {
+ LogEntry abcdEntry = TestUtils.mockEntry(i, i, 1);
+ abcdStorage.appendEntry(abcdEntry);
+
+ LogEntry ab = TestUtils.mockEntry(i, i + 1000, 2);
+ abStorage.appendEntry(ab);
+ }
+
+ assertEquals(0, abcdStorage.getFirstLogIndex());
+ assertEquals(count - 1, abcdStorage.getLastLogIndex());
+
+ assertEquals(0, abStorage.getFirstLogIndex());
+ assertEquals(count - 1, abStorage.getLastLogIndex());
+
+ for (int i = 0; i < count; i++) {
+ LogEntry abcdEntry = abcdStorage.getEntry(i);
+
+ assertEquals(new LogId(i, i), abcdEntry.getId());
+
+ LogEntry abEntry = abStorage.getEntry(i);
+
+ assertEquals(new LogId(i, i + 1000), abEntry.getId());
+ }
+
+ abStorage.reset(1);
+
+ assertEquals(0, abcdStorage.getFirstLogIndex());
+ assertEquals(count - 1, abcdStorage.getLastLogIndex());
+
+ for (int i = 0; i < count; i++) {
+ LogEntry entry = abcdStorage.getEntry(i);
+
+ assertEquals(new LogId(i, i), entry.getId());
+ }
+
+ abcdStorage.shutdown();
+ abStorage.shutdown();
+ }
+
+ @Test
+ public void testIncorrectRaftGroupName() {
+ assertThrows(
+ IllegalArgumentException.class,
+ () -> logStorageProvider.getLogStorage("name" + ((char) 0),
new RaftOptions())
+ );
+
+ assertThrows(
+ IllegalArgumentException.class,
+ () -> logStorageProvider.getLogStorage("name" + ((char) 1),
new RaftOptions())
+ );
+ }
+
+ private LogStorageOptions newLogStorageOptions() {
Review Comment:
Shouldn't this be a field?
##########
modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/LogStorageProvider.java:
##########
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ignite.raft.jraft.storage;
+
+import org.apache.ignite.raft.jraft.option.RaftOptions;
+
+/** Log storage provider interface. */
+public interface LogStorageProvider {
Review Comment:
I think `LogStorageFactory` might be a better name
##########
modules/raft/src/main/java/org/apache/ignite/raft/jraft/option/NodeOptions.java:
##########
@@ -161,7 +157,7 @@ public class NodeOptions extends RpcOptions implements
Copiable<NodeOptions> {
/**
* Custom service factory.
*/
- private JRaftServiceFactory serviceFactory = new
DefaultJRaftServiceFactory();
Review Comment:
why is this change needed?
##########
modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/DefaultJRaftServiceFactory.java:
##########
@@ -35,9 +36,17 @@
* The default factory for JRaft services.
*/
public class DefaultJRaftServiceFactory implements JRaftServiceFactory {
- @Override public LogStorage createLogStorage(final String uri, final
RaftOptions raftOptions) {
- Requires.requireTrue(StringUtils.isNotBlank(uri), "Blank log storage
uri.");
- return new RocksDBLogStorage(uri, raftOptions);
+
+ private final LogStorageProvider logStorageProvider;
+
+ public DefaultJRaftServiceFactory(LogStorageProvider wrapper) {
Review Comment:
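`wrapper`? The parameter is a `LogStorageProvider`, so a name matching the field (`logStorageProvider`) would be clearer.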
##########
modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/LogStorageProvider.java:
##########
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ignite.raft.jraft.storage;
+
+import org.apache.ignite.raft.jraft.option.RaftOptions;
+
+/** Log storage provider interface. */
+public interface LogStorageProvider {
+ /**
+ * Starts log storage provider.
+ */
+ void start();
+
+ /**
+ * Stops log storage provider.
+ */
+ void shutdown();
+
+ /**
+ * Creates log storage.
Review Comment:
```suggestion
* Creates a log storage.
```
##########
modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/LogStorageProvider.java:
##########
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ignite.raft.jraft.storage;
+
+import org.apache.ignite.raft.jraft.option.RaftOptions;
+
+/** Log storage provider interface. */
+public interface LogStorageProvider {
+ /**
+ * Starts log storage provider.
Review Comment:
```suggestion
* Starts the log storage provider.
```
##########
modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeClusterServiceFactory.java:
##########
@@ -56,6 +57,9 @@
* Cluster service factory that uses ScaleCube for messaging and topology
services.
*/
public class ScaleCubeClusterServiceFactory {
+ /** Logger. */
+ private static final IgniteLogger LOG = IgniteLogger.forName("Cluster");
Review Comment:
what's wrong with using the class name here?
##########
modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/impl/DefaultLogStorageProvider.java:
##########
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ignite.raft.jraft.storage.impl;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.rocksdb.RocksDB.DEFAULT_COLUMN_FAMILY;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.ignite.raft.jraft.option.RaftOptions;
+import org.apache.ignite.raft.jraft.storage.LogStorage;
+import org.apache.ignite.raft.jraft.storage.LogStorageProvider;
+import org.apache.ignite.raft.jraft.util.Platform;
+import org.rocksdb.ColumnFamilyDescriptor;
+import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.ColumnFamilyOptions;
+import org.rocksdb.CompactionStyle;
+import org.rocksdb.CompressionType;
+import org.rocksdb.DBOptions;
+import org.rocksdb.Env;
+import org.rocksdb.Priority;
+import org.rocksdb.RocksDB;
+import org.rocksdb.util.SizeUnit;
+
+public class DefaultLogStorageProvider implements LogStorageProvider {
+ /** Database path. */
+ private final Path path;
+
+ /** Database instance shared across log storages. */
+ private RocksDB db;
+
+ /** Database options. */
+ private DBOptions dbOptions;
+
+ /** Configuration column family handle. */
+ private ColumnFamilyHandle confHandle;
+
+ /** Data column family handle. */
+ private ColumnFamilyHandle dataHandle;
+
+ public DefaultLogStorageProvider(Path path) {
+ this.path = path;
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public void start() {
+ if (Files.notExists(path)) {
+ try {
+ Files.createDirectories(path);
+ }
+ catch (IOException e) {
+ throw new IllegalStateException("Failed to create file: " +
this.path, e);
+ }
+ }
+
+ if (Files.exists(path) && !Files.isDirectory(path)) {
+ throw new IllegalStateException("Invalid log path, it's a regular
file: " + this.path);
+ }
+
+ final List<ColumnFamilyHandle> columnFamilyHandles = new ArrayList<>();
+
+ this.dbOptions = createDBOptions();
+
+ final List<ColumnFamilyDescriptor> columnFamilyDescriptors = new
ArrayList<>();
+ final ColumnFamilyOptions cfOption = createColumnFamilyOptions();
+
+ // Column family to store configuration log entry.
+ columnFamilyDescriptors.add(new
ColumnFamilyDescriptor("Configuration".getBytes(UTF_8), cfOption));
+ // Default column family to store user data log entry.
+ columnFamilyDescriptors.add(new
ColumnFamilyDescriptor(DEFAULT_COLUMN_FAMILY, cfOption));
+
+ try {
+ this.db = RocksDB.open(this.dbOptions, this.path.toString(),
columnFamilyDescriptors, columnFamilyHandles);
+
+ // Setup rocks thread pools
+ Env env = db.getEnv();
+
env.setBackgroundThreads(Runtime.getRuntime().availableProcessors(),
Priority.HIGH);
Review Comment:
why is this needed?
##########
modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/LogStorageProvider.java:
##########
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ignite.raft.jraft.storage;
Review Comment:
```suggestion
package org.apache.ignite.raft.jraft.storage;
```
##########
modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java:
##########
@@ -577,7 +577,7 @@ private void doStopNode(@NotNull List<IgniteComponent>
startedComponents) {
try {
componentToStop.stop();
- } catch (Exception e) {
+ } catch (Throwable e) {
Review Comment:
why is this change needed?
##########
modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/impl/RocksDBSharedLogStorage.java:
##########
@@ -0,0 +1,737 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.raft.jraft.storage.impl;
+
+import static java.util.Arrays.copyOfRange;
+
+import java.io.IOException;
+import java.lang.invoke.MethodHandles;
+import java.lang.invoke.VarHandle;
+import java.nio.ByteOrder;
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import org.apache.ignite.lang.IgniteLogger;
+import org.apache.ignite.raft.jraft.conf.Configuration;
+import org.apache.ignite.raft.jraft.conf.ConfigurationEntry;
+import org.apache.ignite.raft.jraft.conf.ConfigurationManager;
+import org.apache.ignite.raft.jraft.entity.EnumOutter;
+import org.apache.ignite.raft.jraft.entity.LogEntry;
+import org.apache.ignite.raft.jraft.entity.LogId;
+import org.apache.ignite.raft.jraft.entity.codec.LogEntryDecoder;
+import org.apache.ignite.raft.jraft.entity.codec.LogEntryEncoder;
+import org.apache.ignite.raft.jraft.option.LogStorageOptions;
+import org.apache.ignite.raft.jraft.option.RaftOptions;
+import org.apache.ignite.raft.jraft.storage.LogStorage;
+import org.apache.ignite.raft.jraft.util.BytesUtil;
+import org.apache.ignite.raft.jraft.util.Describer;
+import org.apache.ignite.raft.jraft.util.ExecutorServiceHelper;
+import org.apache.ignite.raft.jraft.util.Requires;
+import org.apache.ignite.raft.jraft.util.Utils;
+import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.ReadOptions;
+import org.rocksdb.RocksDB;
+import org.rocksdb.RocksDBException;
+import org.rocksdb.RocksIterator;
+import org.rocksdb.Slice;
+import org.rocksdb.WriteBatch;
+import org.rocksdb.WriteOptions;
+
+/**
+ * Log storage that shares rocksdb instance with other log storages.
+ * Stores key with groupId prefix to distinguish them from keys that belongs
to other storages.
+ */
+public class RocksDBSharedLogStorage implements LogStorage, Describer {
+ /** Logger. */
+ private static final IgniteLogger LOG =
IgniteLogger.forClass(RocksDBSharedLogStorage.class);
+
+ static {
+ RocksDB.loadLibrary();
+ }
+
+ /**
+ * An empty write context
+ */
+ private static class EmptyWriteContext implements WriteContext {
+ static EmptyWriteContext INSTANCE = new EmptyWriteContext();
+ }
+
+ /**
+ * VarHandle that gives the access to the elements of a {@code byte[]}
array viewed as if it was a {@code long[]}
+ * array.
+ */
+ private static final VarHandle LONG_ARRAY_HANDLE =
MethodHandles.byteArrayViewVarHandle(
Review Comment:
Why do you need this VarHandle? The original code does not use it
##########
modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/impl/DefaultLogStorageProvider.java:
##########
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ignite.raft.jraft.storage.impl;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.rocksdb.RocksDB.DEFAULT_COLUMN_FAMILY;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.ignite.raft.jraft.option.RaftOptions;
+import org.apache.ignite.raft.jraft.storage.LogStorage;
+import org.apache.ignite.raft.jraft.storage.LogStorageProvider;
+import org.apache.ignite.raft.jraft.util.Platform;
+import org.rocksdb.ColumnFamilyDescriptor;
+import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.ColumnFamilyOptions;
+import org.rocksdb.CompactionStyle;
+import org.rocksdb.CompressionType;
+import org.rocksdb.DBOptions;
+import org.rocksdb.Env;
+import org.rocksdb.Priority;
+import org.rocksdb.RocksDB;
+import org.rocksdb.util.SizeUnit;
+
+public class DefaultLogStorageProvider implements LogStorageProvider {
+ /** Database path. */
+ private final Path path;
+
+ /** Database instance shared across log storages. */
+ private RocksDB db;
+
+ /** Database options. */
+ private DBOptions dbOptions;
+
+ /** Configuration column family handle. */
+ private ColumnFamilyHandle confHandle;
+
+ /** Data column family handle. */
+ private ColumnFamilyHandle dataHandle;
+
+ public DefaultLogStorageProvider(Path path) {
+ this.path = path;
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public void start() {
+ if (Files.notExists(path)) {
+ try {
+ Files.createDirectories(path);
+ }
+ catch (IOException e) {
+ throw new IllegalStateException("Failed to create file: " +
this.path, e);
+ }
+ }
+
+ if (Files.exists(path) && !Files.isDirectory(path)) {
+ throw new IllegalStateException("Invalid log path, it's a regular
file: " + this.path);
+ }
+
+ final List<ColumnFamilyHandle> columnFamilyHandles = new ArrayList<>();
+
+ this.dbOptions = createDBOptions();
+
+ final List<ColumnFamilyDescriptor> columnFamilyDescriptors = new
ArrayList<>();
+ final ColumnFamilyOptions cfOption = createColumnFamilyOptions();
+
+ // Column family to store configuration log entry.
+ columnFamilyDescriptors.add(new
ColumnFamilyDescriptor("Configuration".getBytes(UTF_8), cfOption));
+ // Default column family to store user data log entry.
+ columnFamilyDescriptors.add(new
ColumnFamilyDescriptor(DEFAULT_COLUMN_FAMILY, cfOption));
+
+ try {
+ this.db = RocksDB.open(this.dbOptions, this.path.toString(),
columnFamilyDescriptors, columnFamilyHandles);
+
+ // Setup rocks thread pools
+ Env env = db.getEnv();
+
env.setBackgroundThreads(Runtime.getRuntime().availableProcessors(),
Priority.HIGH);
+
env.setBackgroundThreads(Runtime.getRuntime().availableProcessors(),
Priority.LOW);
+
+ assert (columnFamilyHandles.size() == 2);
+ this.confHandle = columnFamilyHandles.get(0);
+ this.dataHandle = columnFamilyHandles.get(1);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public void shutdown() {
+ confHandle.close();
Review Comment:
you should use `IgniteUtils.closeAll`
##########
modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/impl/RocksDBSharedLogStorage.java:
##########
@@ -0,0 +1,737 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.raft.jraft.storage.impl;
+
+import static java.util.Arrays.copyOfRange;
+
+import java.io.IOException;
+import java.lang.invoke.MethodHandles;
+import java.lang.invoke.VarHandle;
+import java.nio.ByteOrder;
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import org.apache.ignite.lang.IgniteLogger;
+import org.apache.ignite.raft.jraft.conf.Configuration;
+import org.apache.ignite.raft.jraft.conf.ConfigurationEntry;
+import org.apache.ignite.raft.jraft.conf.ConfigurationManager;
+import org.apache.ignite.raft.jraft.entity.EnumOutter;
+import org.apache.ignite.raft.jraft.entity.LogEntry;
+import org.apache.ignite.raft.jraft.entity.LogId;
+import org.apache.ignite.raft.jraft.entity.codec.LogEntryDecoder;
+import org.apache.ignite.raft.jraft.entity.codec.LogEntryEncoder;
+import org.apache.ignite.raft.jraft.option.LogStorageOptions;
+import org.apache.ignite.raft.jraft.option.RaftOptions;
+import org.apache.ignite.raft.jraft.storage.LogStorage;
+import org.apache.ignite.raft.jraft.util.BytesUtil;
+import org.apache.ignite.raft.jraft.util.Describer;
+import org.apache.ignite.raft.jraft.util.ExecutorServiceHelper;
+import org.apache.ignite.raft.jraft.util.Requires;
+import org.apache.ignite.raft.jraft.util.Utils;
+import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.ReadOptions;
+import org.rocksdb.RocksDB;
+import org.rocksdb.RocksDBException;
+import org.rocksdb.RocksIterator;
+import org.rocksdb.Slice;
+import org.rocksdb.WriteBatch;
+import org.rocksdb.WriteOptions;
+
+/**
+ * Log storage that shares rocksdb instance with other log storages.
+ * Stores key with groupId prefix to distinguish them from keys that belongs
to other storages.
+ */
+public class RocksDBSharedLogStorage implements LogStorage, Describer {
+ /** Logger. */
+ private static final IgniteLogger LOG =
IgniteLogger.forClass(RocksDBSharedLogStorage.class);
+
+ static {
+ RocksDB.loadLibrary();
+ }
+
+ /**
+ * An empty write context
+ */
+ private static class EmptyWriteContext implements WriteContext {
+ static EmptyWriteContext INSTANCE = new EmptyWriteContext();
+ }
+
+ /**
+ * VarHandle that gives the access to the elements of a {@code byte[]}
array viewed as if it was a {@code long[]}
+ * array.
+ */
+ private static final VarHandle LONG_ARRAY_HANDLE =
MethodHandles.byteArrayViewVarHandle(
+ long[].class,
+ ByteOrder.BIG_ENDIAN
+ );
+
+ /**
+ * First log index and last log index key in configuration column family.
+ */
+ private static final byte[] FIRST_LOG_IDX_KEY =
Utils.getBytes("meta/firstLogIndex");
+
+ /** Shared db instance. */
+ private final RocksDB db;
+
+ /** Shared configuration column family handle. */
+ private final ColumnFamilyHandle confHandle;
+
+ /** Shared data column family handle. */
+ private final ColumnFamilyHandle dataHandle;
+
+ /** Write options. */
+ private final WriteOptions writeOptions;
+
+ /** Start prefix. */
+ private final byte[] groupStartPrefix;
+
+ /** End prefix. */
+ private final byte[] groupEndPrefix;
+
+ /** Iteration lower bound. */
+ private final Slice lowerBound;
Review Comment:
I would rename them to `groupStartBound` and `groupEndBound` respectively
##########
modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/impl/DefaultLogStorageProvider.java:
##########
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ignite.raft.jraft.storage.impl;
Review Comment:
Should we fix the codestyle in this class even though it's not checked
automatically?
##########
modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/impl/RocksDBSharedLogStorage.java:
##########
@@ -0,0 +1,737 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.raft.jraft.storage.impl;
+
+import static java.util.Arrays.copyOfRange;
+
+import java.io.IOException;
+import java.lang.invoke.MethodHandles;
+import java.lang.invoke.VarHandle;
+import java.nio.ByteOrder;
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import org.apache.ignite.lang.IgniteLogger;
+import org.apache.ignite.raft.jraft.conf.Configuration;
+import org.apache.ignite.raft.jraft.conf.ConfigurationEntry;
+import org.apache.ignite.raft.jraft.conf.ConfigurationManager;
+import org.apache.ignite.raft.jraft.entity.EnumOutter;
+import org.apache.ignite.raft.jraft.entity.LogEntry;
+import org.apache.ignite.raft.jraft.entity.LogId;
+import org.apache.ignite.raft.jraft.entity.codec.LogEntryDecoder;
+import org.apache.ignite.raft.jraft.entity.codec.LogEntryEncoder;
+import org.apache.ignite.raft.jraft.option.LogStorageOptions;
+import org.apache.ignite.raft.jraft.option.RaftOptions;
+import org.apache.ignite.raft.jraft.storage.LogStorage;
+import org.apache.ignite.raft.jraft.util.BytesUtil;
+import org.apache.ignite.raft.jraft.util.Describer;
+import org.apache.ignite.raft.jraft.util.ExecutorServiceHelper;
+import org.apache.ignite.raft.jraft.util.Requires;
+import org.apache.ignite.raft.jraft.util.Utils;
+import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.ReadOptions;
+import org.rocksdb.RocksDB;
+import org.rocksdb.RocksDBException;
+import org.rocksdb.RocksIterator;
+import org.rocksdb.Slice;
+import org.rocksdb.WriteBatch;
+import org.rocksdb.WriteOptions;
+
+/**
+ * Log storage that shares rocksdb instance with other log storages.
+ * Stores key with groupId prefix to distinguish them from keys that belongs
to other storages.
+ */
+public class RocksDBSharedLogStorage implements LogStorage, Describer {
+ /** Logger. */
+ private static final IgniteLogger LOG =
IgniteLogger.forClass(RocksDBSharedLogStorage.class);
+
+ static {
+ RocksDB.loadLibrary();
+ }
+
+ /**
+ * An empty write context
+ */
+ private static class EmptyWriteContext implements WriteContext {
+ static EmptyWriteContext INSTANCE = new EmptyWriteContext();
+ }
+
+ /**
+ * VarHandle that gives the access to the elements of a {@code byte[]}
array viewed as if it was a {@code long[]}
+ * array.
+ */
+ private static final VarHandle LONG_ARRAY_HANDLE =
MethodHandles.byteArrayViewVarHandle(
+ long[].class,
+ ByteOrder.BIG_ENDIAN
+ );
+
+ /**
+ * First log index and last log index key in configuration column family.
+ */
+ private static final byte[] FIRST_LOG_IDX_KEY =
Utils.getBytes("meta/firstLogIndex");
+
+ /** Shared db instance. */
+ private final RocksDB db;
+
+ /** Shared configuration column family handle. */
+ private final ColumnFamilyHandle confHandle;
+
+ /** Shared data column family handle. */
+ private final ColumnFamilyHandle dataHandle;
+
+ /** Write options. */
+ private final WriteOptions writeOptions;
+
+ /** Start prefix. */
+ private final byte[] groupStartPrefix;
+
+ /** End prefix. */
+ private final byte[] groupEndPrefix;
+
+ /** Iteration lower bound. */
+ private final Slice lowerBound;
+
+ /** Iteration upper bound. */
+ private final Slice upperBound;
+
+ /** RW lock. */
+ private final ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
+
+ /** Read lock. */
+ private final Lock readLock = this.readWriteLock.readLock();
+
+ /** Write lock. */
+ private final Lock writeLock = this.readWriteLock.writeLock();
+
+ /** Executor that handles prefix truncation. */
+ private final ExecutorService executor =
Executors.newSingleThreadExecutor();
+
+ /** Log entry encoder. */
+ private LogEntryEncoder logEntryEncoder;
+
+ /** Log entry decoder. */
+ private LogEntryDecoder logEntryDecoder;
+
+ /** First log index. */
+ private volatile long firstLogIndex = 1;
+
+ /** First log index loaded flag. */
+ private volatile boolean hasLoadFirstLogIndex;
+
+ public RocksDBSharedLogStorage(
+ RocksDB db,
+ ColumnFamilyHandle confHandle,
+ ColumnFamilyHandle dataHandle,
+ String groupId,
+ RaftOptions raftOptions
+ ) {
+ Requires.requireTrue(
+ !groupId.contains(Character.toString((char) 0)),
Review Comment:
should we use `indexOf` check instead?
##########
modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/impl/DefaultLogStorageProvider.java:
##########
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ignite.raft.jraft.storage.impl;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.rocksdb.RocksDB.DEFAULT_COLUMN_FAMILY;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.ignite.raft.jraft.option.RaftOptions;
+import org.apache.ignite.raft.jraft.storage.LogStorage;
+import org.apache.ignite.raft.jraft.storage.LogStorageProvider;
+import org.apache.ignite.raft.jraft.util.Platform;
+import org.rocksdb.ColumnFamilyDescriptor;
+import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.ColumnFamilyOptions;
+import org.rocksdb.CompactionStyle;
+import org.rocksdb.CompressionType;
+import org.rocksdb.DBOptions;
+import org.rocksdb.Env;
+import org.rocksdb.Priority;
+import org.rocksdb.RocksDB;
+import org.rocksdb.util.SizeUnit;
+
+public class DefaultLogStorageProvider implements LogStorageProvider {
+ /** Database path. */
+ private final Path path;
+
+ /** Database instance shared across log storages. */
+ private RocksDB db;
+
+ /** Database options. */
+ private DBOptions dbOptions;
+
+ /** Configuration column family handle. */
+ private ColumnFamilyHandle confHandle;
+
+ /** Data column family handle. */
+ private ColumnFamilyHandle dataHandle;
+
+ public DefaultLogStorageProvider(Path path) {
+ this.path = path;
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public void start() {
+ if (Files.notExists(path)) {
+ try {
+ Files.createDirectories(path);
+ }
+ catch (IOException e) {
+ throw new IllegalStateException("Failed to create file: " +
this.path, e);
+ }
+ }
+
+ if (Files.exists(path) && !Files.isDirectory(path)) {
+ throw new IllegalStateException("Invalid log path, it's a regular
file: " + this.path);
+ }
+
+ final List<ColumnFamilyHandle> columnFamilyHandles = new ArrayList<>();
+
+ this.dbOptions = createDBOptions();
+
+ final List<ColumnFamilyDescriptor> columnFamilyDescriptors = new
ArrayList<>();
Review Comment:
I guess you can simply use `List.of`
##########
modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/impl/RocksDBSharedLogStorage.java:
##########
@@ -0,0 +1,737 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.raft.jraft.storage.impl;
+
+import static java.util.Arrays.copyOfRange;
+
+import java.io.IOException;
+import java.lang.invoke.MethodHandles;
+import java.lang.invoke.VarHandle;
+import java.nio.ByteOrder;
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import org.apache.ignite.lang.IgniteLogger;
+import org.apache.ignite.raft.jraft.conf.Configuration;
+import org.apache.ignite.raft.jraft.conf.ConfigurationEntry;
+import org.apache.ignite.raft.jraft.conf.ConfigurationManager;
+import org.apache.ignite.raft.jraft.entity.EnumOutter;
+import org.apache.ignite.raft.jraft.entity.LogEntry;
+import org.apache.ignite.raft.jraft.entity.LogId;
+import org.apache.ignite.raft.jraft.entity.codec.LogEntryDecoder;
+import org.apache.ignite.raft.jraft.entity.codec.LogEntryEncoder;
+import org.apache.ignite.raft.jraft.option.LogStorageOptions;
+import org.apache.ignite.raft.jraft.option.RaftOptions;
+import org.apache.ignite.raft.jraft.storage.LogStorage;
+import org.apache.ignite.raft.jraft.util.BytesUtil;
+import org.apache.ignite.raft.jraft.util.Describer;
+import org.apache.ignite.raft.jraft.util.ExecutorServiceHelper;
+import org.apache.ignite.raft.jraft.util.Requires;
+import org.apache.ignite.raft.jraft.util.Utils;
+import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.ReadOptions;
+import org.rocksdb.RocksDB;
+import org.rocksdb.RocksDBException;
+import org.rocksdb.RocksIterator;
+import org.rocksdb.Slice;
+import org.rocksdb.WriteBatch;
+import org.rocksdb.WriteOptions;
+
+/**
+ * Log storage that shares rocksdb instance with other log storages.
+ * Stores key with groupId prefix to distinguish them from keys that belongs
to other storages.
+ */
+public class RocksDBSharedLogStorage implements LogStorage, Describer {
+ /** Logger. */
+ private static final IgniteLogger LOG =
IgniteLogger.forClass(RocksDBSharedLogStorage.class);
+
+ static {
+ RocksDB.loadLibrary();
+ }
+
+ /**
+ * An empty write context
+ */
+ private static class EmptyWriteContext implements WriteContext {
+ static EmptyWriteContext INSTANCE = new EmptyWriteContext();
+ }
+
+ /**
+ * VarHandle that gives the access to the elements of a {@code byte[]}
array viewed as if it was a {@code long[]}
+ * array.
+ */
+ private static final VarHandle LONG_ARRAY_HANDLE =
MethodHandles.byteArrayViewVarHandle(
+ long[].class,
+ ByteOrder.BIG_ENDIAN
+ );
+
+ /**
+ * First log index and last log index key in configuration column family.
+ */
+ private static final byte[] FIRST_LOG_IDX_KEY =
Utils.getBytes("meta/firstLogIndex");
+
+ /** Shared db instance. */
+ private final RocksDB db;
+
+ /** Shared configuration column family handle. */
+ private final ColumnFamilyHandle confHandle;
+
+ /** Shared data column family handle. */
+ private final ColumnFamilyHandle dataHandle;
+
+ /** Write options. */
+ private final WriteOptions writeOptions;
+
+ /** Start prefix. */
+ private final byte[] groupStartPrefix;
+
+ /** End prefix. */
+ private final byte[] groupEndPrefix;
+
+ /** Iteration lower bound. */
+ private final Slice lowerBound;
+
+ /** Iteration upper bound. */
+ private final Slice upperBound;
+
+ /** RW lock. */
+ private final ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
+
+ /** Read lock. */
+ private final Lock readLock = this.readWriteLock.readLock();
+
+ /** Write lock. */
+ private final Lock writeLock = this.readWriteLock.writeLock();
+
+ /** Executor that handles prefix truncation. */
+ private final ExecutorService executor =
Executors.newSingleThreadExecutor();
+
+ /** Log entry encoder. */
+ private LogEntryEncoder logEntryEncoder;
+
+ /** Log entry decoder. */
+ private LogEntryDecoder logEntryDecoder;
+
+ /** First log index. */
+ private volatile long firstLogIndex = 1;
+
+ /** First log index loaded flag. */
+ private volatile boolean hasLoadFirstLogIndex;
+
+ public RocksDBSharedLogStorage(
+ RocksDB db,
+ ColumnFamilyHandle confHandle,
+ ColumnFamilyHandle dataHandle,
+ String groupId,
+ RaftOptions raftOptions
+ ) {
+ Requires.requireTrue(
+ !groupId.contains(Character.toString((char) 0)),
+ "Raft group id " + groupId + " must not contain char(0)"
+ );
+ Requires.requireTrue(
+ !groupId.contains(Character.toString((char) 1)),
+ "Raft group id " + groupId + " must not contain char(1)"
+ );
+
+ this.db = db;
+ this.confHandle = confHandle;
+ this.dataHandle = dataHandle;
+ this.groupStartPrefix = (groupId + (char)
0).getBytes(StandardCharsets.UTF_8);
+ this.groupEndPrefix = (groupId + (char)
1).getBytes(StandardCharsets.UTF_8);
+ this.lowerBound = new Slice(groupStartPrefix);
+ this.upperBound = new Slice(groupEndPrefix);
+
+ this.writeOptions = new WriteOptions();
+ this.writeOptions.setSync(raftOptions.isSync());
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public boolean init(LogStorageOptions opts) {
+ Requires.requireNonNull(opts.getConfigurationManager(), "Null conf
manager");
+ Requires.requireNonNull(opts.getLogEntryCodecFactory(), "Null log
entry codec factory");
+ this.writeLock.lock();
+ try {
+ this.logEntryDecoder = opts.getLogEntryCodecFactory().decoder();
+ this.logEntryEncoder = opts.getLogEntryCodecFactory().encoder();
+ Requires.requireNonNull(this.logEntryDecoder, "Null log entry
decoder");
+ Requires.requireNonNull(this.logEntryEncoder, "Null log entry
encoder");
+
+ return initAndLoad(opts.getConfigurationManager());
+ } finally {
+ this.writeLock.unlock();
+ }
+ }
+
+ private boolean initAndLoad(ConfigurationManager configurationManager) {
+ this.hasLoadFirstLogIndex = false;
+ this.firstLogIndex = 1;
+ load(configurationManager);
+ return onInitLoaded();
+ }
+
+ private void load(ConfigurationManager confManager) {
+ try (
+ var readOptions = new
ReadOptions().setIterateUpperBound(upperBound).setTotalOrderSeek(true);
Review Comment:
what's `setTotalOrderSeek` for?
##########
modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/impl/RocksDBSharedLogStorage.java:
##########
@@ -0,0 +1,737 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.raft.jraft.storage.impl;
+
+import static java.util.Arrays.copyOfRange;
+
+import java.io.IOException;
+import java.lang.invoke.MethodHandles;
+import java.lang.invoke.VarHandle;
+import java.nio.ByteOrder;
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import org.apache.ignite.lang.IgniteLogger;
+import org.apache.ignite.raft.jraft.conf.Configuration;
+import org.apache.ignite.raft.jraft.conf.ConfigurationEntry;
+import org.apache.ignite.raft.jraft.conf.ConfigurationManager;
+import org.apache.ignite.raft.jraft.entity.EnumOutter;
+import org.apache.ignite.raft.jraft.entity.LogEntry;
+import org.apache.ignite.raft.jraft.entity.LogId;
+import org.apache.ignite.raft.jraft.entity.codec.LogEntryDecoder;
+import org.apache.ignite.raft.jraft.entity.codec.LogEntryEncoder;
+import org.apache.ignite.raft.jraft.option.LogStorageOptions;
+import org.apache.ignite.raft.jraft.option.RaftOptions;
+import org.apache.ignite.raft.jraft.storage.LogStorage;
+import org.apache.ignite.raft.jraft.util.BytesUtil;
+import org.apache.ignite.raft.jraft.util.Describer;
+import org.apache.ignite.raft.jraft.util.ExecutorServiceHelper;
+import org.apache.ignite.raft.jraft.util.Requires;
+import org.apache.ignite.raft.jraft.util.Utils;
+import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.ReadOptions;
+import org.rocksdb.RocksDB;
+import org.rocksdb.RocksDBException;
+import org.rocksdb.RocksIterator;
+import org.rocksdb.Slice;
+import org.rocksdb.WriteBatch;
+import org.rocksdb.WriteOptions;
+
+/**
+ * Log storage that shares rocksdb instance with other log storages.
+ * Stores key with groupId prefix to distinguish them from keys that belongs
to other storages.
+ */
+public class RocksDBSharedLogStorage implements LogStorage, Describer {
+ /** Logger. */
+ private static final IgniteLogger LOG =
IgniteLogger.forClass(RocksDBSharedLogStorage.class);
+
+ static {
+ RocksDB.loadLibrary();
+ }
+
+ /**
+ * An empty write context
+ */
+ private static class EmptyWriteContext implements WriteContext {
+ static EmptyWriteContext INSTANCE = new EmptyWriteContext();
+ }
+
+ /**
+ * VarHandle that gives the access to the elements of a {@code byte[]}
array viewed as if it was a {@code long[]}
+ * array.
+ */
+ private static final VarHandle LONG_ARRAY_HANDLE =
MethodHandles.byteArrayViewVarHandle(
+ long[].class,
+ ByteOrder.BIG_ENDIAN
+ );
+
+ /**
+ * First log index and last log index key in configuration column family.
+ */
+ private static final byte[] FIRST_LOG_IDX_KEY =
Utils.getBytes("meta/firstLogIndex");
+
+ /** Shared db instance. */
+ private final RocksDB db;
+
+ /** Shared configuration column family handle. */
+ private final ColumnFamilyHandle confHandle;
+
+ /** Shared data column family handle. */
+ private final ColumnFamilyHandle dataHandle;
+
+ /** Write options. */
+ private final WriteOptions writeOptions;
+
+ /** Start prefix. */
+ private final byte[] groupStartPrefix;
+
+ /** End prefix. */
+ private final byte[] groupEndPrefix;
+
+ /** Iteration lower bound. */
+ private final Slice lowerBound;
+
+ /** Iteration upper bound. */
+ private final Slice upperBound;
+
+ /** RW lock. */
+ private final ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
+
+ /** Read lock. */
+ private final Lock readLock = this.readWriteLock.readLock();
+
+ /** Write lock. */
+ private final Lock writeLock = this.readWriteLock.writeLock();
+
+ /** Executor that handles prefix truncation. */
+ private final ExecutorService executor =
Executors.newSingleThreadExecutor();
+
+ /** Log entry encoder. */
+ private LogEntryEncoder logEntryEncoder;
+
+ /** Log entry decoder. */
+ private LogEntryDecoder logEntryDecoder;
+
+ /** First log index. */
+ private volatile long firstLogIndex = 1;
+
+ /** First log index loaded flag. */
+ private volatile boolean hasLoadFirstLogIndex;
+
+ public RocksDBSharedLogStorage(
+ RocksDB db,
+ ColumnFamilyHandle confHandle,
+ ColumnFamilyHandle dataHandle,
+ String groupId,
+ RaftOptions raftOptions
+ ) {
+ Requires.requireTrue(
+ !groupId.contains(Character.toString((char) 0)),
+ "Raft group id " + groupId + " must not contain char(0)"
+ );
+ Requires.requireTrue(
+ !groupId.contains(Character.toString((char) 1)),
+ "Raft group id " + groupId + " must not contain char(1)"
+ );
+
+ this.db = db;
+ this.confHandle = confHandle;
+ this.dataHandle = dataHandle;
+ this.groupStartPrefix = (groupId + (char)
0).getBytes(StandardCharsets.UTF_8);
+ this.groupEndPrefix = (groupId + (char)
1).getBytes(StandardCharsets.UTF_8);
+ this.lowerBound = new Slice(groupStartPrefix);
+ this.upperBound = new Slice(groupEndPrefix);
+
+ this.writeOptions = new WriteOptions();
+ this.writeOptions.setSync(raftOptions.isSync());
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public boolean init(LogStorageOptions opts) {
+ Requires.requireNonNull(opts.getConfigurationManager(), "Null conf
manager");
+ Requires.requireNonNull(opts.getLogEntryCodecFactory(), "Null log
entry codec factory");
+ this.writeLock.lock();
+ try {
+ this.logEntryDecoder = opts.getLogEntryCodecFactory().decoder();
+ this.logEntryEncoder = opts.getLogEntryCodecFactory().encoder();
+ Requires.requireNonNull(this.logEntryDecoder, "Null log entry
decoder");
+ Requires.requireNonNull(this.logEntryEncoder, "Null log entry
encoder");
+
+ return initAndLoad(opts.getConfigurationManager());
+ } finally {
+ this.writeLock.unlock();
+ }
+ }
+
+ private boolean initAndLoad(ConfigurationManager configurationManager) {
+ this.hasLoadFirstLogIndex = false;
+ this.firstLogIndex = 1;
+ load(configurationManager);
+ return onInitLoaded();
+ }
+
+ private void load(ConfigurationManager confManager) {
+ try (
+ var readOptions = new
ReadOptions().setIterateUpperBound(upperBound).setTotalOrderSeek(true);
+ RocksIterator it = this.db.newIterator(this.confHandle,
readOptions)
+ ) {
+ it.seek(groupStartPrefix);
+ while (it.isValid()) {
+ byte[] keyWithPrefix = it.key();
+ byte[] ks = getKey(keyWithPrefix);
+ byte[] bs = it.value();
+
+ // LogEntry index
+ if (ks.length == 8) {
+ LogEntry entry = this.logEntryDecoder.decode(bs);
+ if (entry != null) {
+ if (entry.getType() ==
EnumOutter.EntryType.ENTRY_TYPE_CONFIGURATION) {
+ ConfigurationEntry confEntry = new
ConfigurationEntry();
+ confEntry.setId(new
LogId(entry.getId().getIndex(), entry.getId().getTerm()));
+ confEntry.setConf(new
Configuration(entry.getPeers(), entry.getLearners()));
+ if (entry.getOldPeers() != null) {
+ confEntry.setOldConf(new
Configuration(entry.getOldPeers(), entry.getOldLearners()));
+ }
+ if (confManager != null) {
+ confManager.add(confEntry);
+ }
+ }
+ } else {
+ LOG.warn(
+ "Fail to decode conf entry at index {}, the
log data is: {}.",
+ ((long) LONG_ARRAY_HANDLE.get(ks, 0)),
+ BytesUtil.toHex(bs)
+ );
+ }
+ } else {
+ if (Arrays.equals(FIRST_LOG_IDX_KEY, ks)) {
+ setFirstLogIndex((long) LONG_ARRAY_HANDLE.get(bs, 0));
+ truncatePrefixInBackground(0L, this.firstLogIndex);
+ } else {
+ LOG.warn("Unknown entry in configuration storage
key={}, value={}.", BytesUtil.toHex(ks),
+ BytesUtil.toHex(bs));
+ }
+ }
+ it.next();
+ }
+ }
+ }
+
+ private byte[] getKey(byte[] ks) {
+ return copyOfRange(ks, groupStartPrefix.length, ks.length);
+ }
+
+ private void setFirstLogIndex(long index) {
+ this.firstLogIndex = index;
+ this.hasLoadFirstLogIndex = true;
+ }
+
+ /**
+ * Save the first log index into conf column family.
+ */
+ private boolean saveFirstLogIndex(long firstLogIndex) {
+ this.readLock.lock();
+ try {
+ byte[] vs = new byte[8];
+ LONG_ARRAY_HANDLE.set(vs, 0, firstLogIndex);
+ this.db.put(this.confHandle, this.writeOptions,
createKey(FIRST_LOG_IDX_KEY), vs);
+ return true;
+ } catch (RocksDBException e) {
+ LOG.error("Fail to save first log index {}.", e, firstLogIndex);
+ return false;
+ } finally {
+ this.readLock.unlock();
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public void shutdown() {
+ this.writeLock.lock();
+
+ try {
+ onShutdown();
+ }
+ finally {
+ this.writeLock.unlock();
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public long getFirstLogIndex() {
+ this.readLock.lock();
+
+ try {
+ if (this.hasLoadFirstLogIndex) {
+ return this.firstLogIndex;
+ }
+
+ try (
+ var readOptions = new
ReadOptions().setIterateUpperBound(upperBound).setTotalOrderSeek(true);
+ RocksIterator it = this.db.newIterator(this.dataHandle,
readOptions)
+ ) {
+ it.seek(groupStartPrefix);
+
+ if (it.isValid()) {
+ byte[] key = getKey(it.key());
+ long ret = (long) LONG_ARRAY_HANDLE.get(key, 0);
+ saveFirstLogIndex(ret);
+ setFirstLogIndex(ret);
+ return ret;
+ }
+
+ return 1L;
+ }
+ } finally {
+ this.readLock.unlock();
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public long getLastLogIndex() {
+ this.readLock.lock();
+
+ try (
+ var readOptions = new ReadOptions()
+ .setIterateUpperBound(upperBound)
+ .setIterateLowerBound(lowerBound)
+ .setTotalOrderSeek(true);
+ RocksIterator it = this.db.newIterator(this.dataHandle,
readOptions)
+ ) {
+ it.seekToLast();
+
+ if (it.isValid()) {
+ byte[] key = getKey(it.key());
+ return (long) LONG_ARRAY_HANDLE.get(key, 0);
+ }
+
+ return 0L;
+ } finally {
+ this.readLock.unlock();
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public LogEntry getEntry(long index) {
+ this.readLock.lock();
+ try {
+ if (this.hasLoadFirstLogIndex && index < this.firstLogIndex) {
+ return null;
+ }
+
+ byte[] keyBytes = createKey(index);
+ byte[] bs = getValueFromRocksDb(keyBytes);
+
+ if (bs != null) {
+ LogEntry entry = this.logEntryDecoder.decode(bs);
+ if (entry != null) {
+ return entry;
+ } else {
+ LOG.error("Bad log entry format for index={}, the log data
is: {}.", index, BytesUtil.toHex(bs));
+ // invalid data remove? TODO
https://issues.apache.org/jira/browse/IGNITE-14832
+ return null;
+ }
+ }
+ } catch (RocksDBException e) {
+ LOG.error("Fail to get log entry at index {}.", e, index);
+ } finally {
+ this.readLock.unlock();
+ }
+ return null;
+ }
+
+ protected byte[] getValueFromRocksDb(byte[] keyBytes) throws
RocksDBException {
+ return this.db.get(this.dataHandle, keyBytes);
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public long getTerm(long index) {
+ LogEntry entry = getEntry(index);
+ if (entry != null) {
+ return entry.getId().getTerm();
+ }
+ return 0;
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public boolean appendEntry(LogEntry entry) {
+ if (entry.getType() == EnumOutter.EntryType.ENTRY_TYPE_CONFIGURATION) {
+ return executeBatch(batch -> addConfBatch(entry, batch));
+ } else {
+ this.readLock.lock();
+ try {
+ if (this.db == null) {
+ LOG.warn("DB not initialized or destroyed.");
+ return false;
+ }
+ WriteContext writeCtx = newWriteContext();
+ long logIndex = entry.getId().getIndex();
+ byte[] valueBytes = this.logEntryEncoder.encode(entry);
+ byte[] newValueBytes = onDataAppend(logIndex, valueBytes,
writeCtx);
+ writeCtx.startJob();
+ this.db.put(this.dataHandle, this.writeOptions,
createKey(logIndex), newValueBytes);
+ writeCtx.joinAll();
+ if (newValueBytes != valueBytes) {
+ doSync();
+ }
+ return true;
+ } catch (RocksDBException | IOException e) {
+ LOG.error("Fail to append entry.", e);
+ return false;
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ return false;
+ } finally {
+ this.readLock.unlock();
+ }
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public int appendEntries(List<LogEntry> entries) {
+ if (entries == null || entries.isEmpty()) {
+ return 0;
+ }
+
+ int entriesCount = entries.size();
+
+ boolean ret = executeBatch(batch -> {
+ WriteContext writeCtx = newWriteContext();
+
+ for (LogEntry entry : entries) {
+ if (entry.getType() ==
EnumOutter.EntryType.ENTRY_TYPE_CONFIGURATION) {
+ addConfBatch(entry, batch);
+ } else {
+ writeCtx.startJob();
+ addDataBatch(entry, batch, writeCtx);
+ }
+ }
+
+ writeCtx.joinAll();
+ doSync();
+ });
+
+ if (ret) {
+ return entriesCount;
+ } else {
+ return 0;
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public boolean truncateSuffix(long lastIndexKept) {
+ this.readLock.lock();
+ try {
+ try {
+ onTruncateSuffix(lastIndexKept);
+ } finally {
+ this.db.deleteRange(this.dataHandle, this.writeOptions,
createKey(lastIndexKept + 1),
+ createKey(getLastLogIndex() + 1));
+ this.db.deleteRange(this.confHandle, this.writeOptions,
createKey(lastIndexKept + 1),
+ createKey(getLastLogIndex() + 1));
+ }
+ return true;
+ } catch (RocksDBException | IOException e) {
+ LOG.error("Fail to truncateSuffix {}.", e, lastIndexKept);
+ } finally {
+ this.readLock.unlock();
+ }
+ return false;
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public boolean reset(long nextLogIndex) {
+ if (nextLogIndex <= 0) {
+ throw new IllegalArgumentException("Invalid next log index.");
+ }
+ this.writeLock.lock();
+
+ LogEntry entry = getEntry(nextLogIndex);
+ try {
+ db.deleteRange(dataHandle, groupStartPrefix, groupEndPrefix);
+ db.deleteRange(confHandle, groupStartPrefix, groupEndPrefix);
+
+ onReset(nextLogIndex);
+
+ if (initAndLoad(null)) {
+ if (entry == null) {
+ entry = new LogEntry();
+ entry.setType(EnumOutter.EntryType.ENTRY_TYPE_NO_OP);
+ entry.setId(new LogId(nextLogIndex, 0));
+ LOG.warn("Entry not found for nextLogIndex {} when
reset.", nextLogIndex);
+ }
+ return appendEntry(entry);
+ } else {
+ return false;
+ }
+ } catch (RocksDBException e) {
+ LOG.error("Fail to reset next log index.", e);
+ return false;
+ } finally {
+ this.writeLock.unlock();
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public boolean truncatePrefix(long firstIndexKept) {
+ this.readLock.lock();
+ try {
+ long startIndex = getFirstLogIndex();
+ boolean ret = saveFirstLogIndex(firstIndexKept);
+
+ if (ret) {
+ setFirstLogIndex(firstIndexKept);
+ }
+
+ truncatePrefixInBackground(startIndex, firstIndexKept);
+
+ return ret;
+ } finally {
+ this.readLock.unlock();
+ }
+ }
+
+ private void addConfBatch(LogEntry entry, WriteBatch batch) throws
RocksDBException {
+ byte[] ks = createKey(entry.getId().getIndex());
+ byte[] content = this.logEntryEncoder.encode(entry);
+ batch.put(this.dataHandle, ks, content);
+ batch.put(this.confHandle, ks, content);
+ }
+
+ /**
+ * Execute write batch template.
+ *
+ * @param template write batch template
+ */
+ private boolean executeBatch(WriteBatchTemplate template) {
+ this.readLock.lock();
+ if (this.db == null) {
+ LOG.warn("DB not initialized or destroyed.");
+ this.readLock.unlock();
+ return false;
+ }
+ try (WriteBatch batch = new WriteBatch()) {
+ template.execute(batch);
+ this.db.write(this.writeOptions, batch);
+ } catch (RocksDBException e) {
+ LOG.error("Execute batch failed with rocksdb exception.", e);
+ return false;
+ } catch (IOException e) {
+ LOG.error("Execute batch failed with io exception.", e);
+ return false;
+ } catch (InterruptedException e) {
+ LOG.error("Execute batch failed with interrupt.", e);
+ Thread.currentThread().interrupt();
+ return false;
+ } finally {
+ this.readLock.unlock();
+ }
+ return true;
+ }
+
+ private void addDataBatch(LogEntry entry, WriteBatch batch,
+ WriteContext ctx) throws RocksDBException, IOException,
InterruptedException {
+ long logIndex = entry.getId().getIndex();
+ byte[] content = this.logEntryEncoder.encode(entry);
+ batch.put(this.dataHandle, createKey(logIndex), onDataAppend(logIndex,
content, ctx));
+ }
+
+ private void truncatePrefixInBackground(long startIndex, long
firstIndexKept) {
+ // delete logs in background.
+ Utils.runInThread(executor, () -> {
+ this.readLock.lock();
+ try {
+ if (this.db == null) {
+ return;
+ }
+ onTruncatePrefix(startIndex, firstIndexKept);
+ this.db.deleteRange(this.dataHandle, createKey(startIndex),
createKey(firstIndexKept));
+ this.db.deleteRange(this.confHandle, createKey(startIndex),
createKey(firstIndexKept));
+ } catch (RocksDBException | IOException e) {
+ LOG.error("Fail to truncatePrefix {}.", e, firstIndexKept);
+ } finally {
+ this.readLock.unlock();
+ }
+ });
+ }
+
+ /**
+ * Called upon closing the storage.
+ */
+ protected void onShutdown() {
+ ExecutorServiceHelper.shutdownAndAwaitTermination(executor);
+
+ writeOptions.close();
+ upperBound.close();
+ lowerBound.close();
+ }
+
+ private byte[] createKey(byte[] key) {
+ var buffer = new byte[groupStartPrefix.length + key.length];
+
+ System.arraycopy(groupStartPrefix, 0, buffer, 0,
groupStartPrefix.length);
+ System.arraycopy(key, 0, buffer, groupStartPrefix.length, key.length);
+
+ return buffer;
+ }
+
+ private byte[] createKey(long index) {
+ byte[] ks = new byte[groupStartPrefix.length + Long.BYTES];
+ System.arraycopy(groupStartPrefix, 0, ks, 0, groupStartPrefix.length);
+ LONG_ARRAY_HANDLE.set(ks, groupStartPrefix.length, index);
Review Comment:
Again, I think we don't need this VarHandle; using a direct ByteBuffer might
be a better alternative.
##########
modules/raft/src/test/java/org/apache/ignite/raft/jraft/storage/impl/SharedVsNonSharedLogStorageBenchmark.java:
##########
@@ -0,0 +1,271 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ignite.raft.jraft.storage.impl;
+
+import static java.util.stream.Collectors.toList;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.stream.IntStream;
+import org.apache.ignite.raft.jraft.Lifecycle;
+import org.apache.ignite.raft.jraft.conf.ConfigurationManager;
+import org.apache.ignite.raft.jraft.entity.LogEntry;
+import org.apache.ignite.raft.jraft.entity.codec.v1.LogEntryV1CodecFactory;
+import org.apache.ignite.raft.jraft.option.LogStorageOptions;
+import org.apache.ignite.raft.jraft.option.RaftOptions;
+import org.apache.ignite.raft.jraft.storage.LogStorage;
+import org.apache.ignite.raft.jraft.storage.LogStorageProvider;
+import org.apache.ignite.raft.jraft.test.TestUtils;
+import org.apache.ignite.raft.jraft.util.Utils;
+
+/**
+ * Benchmark for shared versus non-shared log storage.
+ */
+public class SharedVsNonSharedLogStorageBenchmark {
Review Comment:
please fix all code style issues in this class
##########
modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/impl/RocksDBSharedLogStorage.java:
##########
@@ -0,0 +1,737 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.raft.jraft.storage.impl;
+
+import static java.util.Arrays.copyOfRange;
+
+import java.io.IOException;
+import java.lang.invoke.MethodHandles;
+import java.lang.invoke.VarHandle;
+import java.nio.ByteOrder;
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import org.apache.ignite.lang.IgniteLogger;
+import org.apache.ignite.raft.jraft.conf.Configuration;
+import org.apache.ignite.raft.jraft.conf.ConfigurationEntry;
+import org.apache.ignite.raft.jraft.conf.ConfigurationManager;
+import org.apache.ignite.raft.jraft.entity.EnumOutter;
+import org.apache.ignite.raft.jraft.entity.LogEntry;
+import org.apache.ignite.raft.jraft.entity.LogId;
+import org.apache.ignite.raft.jraft.entity.codec.LogEntryDecoder;
+import org.apache.ignite.raft.jraft.entity.codec.LogEntryEncoder;
+import org.apache.ignite.raft.jraft.option.LogStorageOptions;
+import org.apache.ignite.raft.jraft.option.RaftOptions;
+import org.apache.ignite.raft.jraft.storage.LogStorage;
+import org.apache.ignite.raft.jraft.util.BytesUtil;
+import org.apache.ignite.raft.jraft.util.Describer;
+import org.apache.ignite.raft.jraft.util.ExecutorServiceHelper;
+import org.apache.ignite.raft.jraft.util.Requires;
+import org.apache.ignite.raft.jraft.util.Utils;
+import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.ReadOptions;
+import org.rocksdb.RocksDB;
+import org.rocksdb.RocksDBException;
+import org.rocksdb.RocksIterator;
+import org.rocksdb.Slice;
+import org.rocksdb.WriteBatch;
+import org.rocksdb.WriteOptions;
+
+/**
+ * Log storage that shares rocksdb instance with other log storages.
+ * Stores key with groupId prefix to distinguish them from keys that belongs
to other storages.
+ */
+public class RocksDBSharedLogStorage implements LogStorage, Describer {
+ /** Logger. */
+ private static final IgniteLogger LOG =
IgniteLogger.forClass(RocksDBSharedLogStorage.class);
+
+ static {
+ RocksDB.loadLibrary();
+ }
+
+ /**
+ * An empty write context
+ */
+ private static class EmptyWriteContext implements WriteContext {
+ static EmptyWriteContext INSTANCE = new EmptyWriteContext();
+ }
+
+ /**
+ * VarHandle that gives the access to the elements of a {@code byte[]}
array viewed as if it was a {@code long[]}
+ * array.
+ */
+ private static final VarHandle LONG_ARRAY_HANDLE =
MethodHandles.byteArrayViewVarHandle(
+ long[].class,
+ ByteOrder.BIG_ENDIAN
+ );
+
+ /**
+ * First log index and last log index key in configuration column family.
+ */
+ private static final byte[] FIRST_LOG_IDX_KEY =
Utils.getBytes("meta/firstLogIndex");
+
+ /** Shared db instance. */
+ private final RocksDB db;
+
+ /** Shared configuration column family handle. */
+ private final ColumnFamilyHandle confHandle;
+
+ /** Shared data column family handle. */
+ private final ColumnFamilyHandle dataHandle;
+
+ /** Write options. */
+ private final WriteOptions writeOptions;
+
+ /** Start prefix. */
+ private final byte[] groupStartPrefix;
+
+ /** End prefix. */
+ private final byte[] groupEndPrefix;
+
+ /** Iteration lower bound. */
+ private final Slice lowerBound;
+
+ /** Iteration upper bound. */
+ private final Slice upperBound;
+
+ /** RW lock. */
+ private final ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
+
+ /** Read lock. */
+ private final Lock readLock = this.readWriteLock.readLock();
+
+ /** Write lock. */
+ private final Lock writeLock = this.readWriteLock.writeLock();
+
+ /** Executor that handles prefix truncation. */
+ private final ExecutorService executor =
Executors.newSingleThreadExecutor();
+
+ /** Log entry encoder. */
+ private LogEntryEncoder logEntryEncoder;
+
+ /** Log entry decoder. */
+ private LogEntryDecoder logEntryDecoder;
+
+ /** First log index. */
+ private volatile long firstLogIndex = 1;
+
+ /** First log index loaded flag. */
+ private volatile boolean hasLoadFirstLogIndex;
+
+ public RocksDBSharedLogStorage(
+ RocksDB db,
+ ColumnFamilyHandle confHandle,
+ ColumnFamilyHandle dataHandle,
+ String groupId,
+ RaftOptions raftOptions
+ ) {
+ Requires.requireTrue(
+ !groupId.contains(Character.toString((char) 0)),
+ "Raft group id " + groupId + " must not contain char(0)"
+ );
+ Requires.requireTrue(
+ !groupId.contains(Character.toString((char) 1)),
+ "Raft group id " + groupId + " must not contain char(1)"
+ );
+
+ this.db = db;
+ this.confHandle = confHandle;
+ this.dataHandle = dataHandle;
+ this.groupStartPrefix = (groupId + (char)
0).getBytes(StandardCharsets.UTF_8);
+ this.groupEndPrefix = (groupId + (char)
1).getBytes(StandardCharsets.UTF_8);
+ this.lowerBound = new Slice(groupStartPrefix);
+ this.upperBound = new Slice(groupEndPrefix);
+
+ this.writeOptions = new WriteOptions();
+ this.writeOptions.setSync(raftOptions.isSync());
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public boolean init(LogStorageOptions opts) {
+ Requires.requireNonNull(opts.getConfigurationManager(), "Null conf
manager");
+ Requires.requireNonNull(opts.getLogEntryCodecFactory(), "Null log
entry codec factory");
+ this.writeLock.lock();
+ try {
+ this.logEntryDecoder = opts.getLogEntryCodecFactory().decoder();
+ this.logEntryEncoder = opts.getLogEntryCodecFactory().encoder();
+ Requires.requireNonNull(this.logEntryDecoder, "Null log entry
decoder");
+ Requires.requireNonNull(this.logEntryEncoder, "Null log entry
encoder");
+
+ return initAndLoad(opts.getConfigurationManager());
+ } finally {
+ this.writeLock.unlock();
+ }
+ }
+
+ private boolean initAndLoad(ConfigurationManager configurationManager) {
+ this.hasLoadFirstLogIndex = false;
+ this.firstLogIndex = 1;
+ load(configurationManager);
+ return onInitLoaded();
+ }
+
+ private void load(ConfigurationManager confManager) {
+ try (
+ var readOptions = new
ReadOptions().setIterateUpperBound(upperBound).setTotalOrderSeek(true);
+ RocksIterator it = this.db.newIterator(this.confHandle,
readOptions)
+ ) {
+ it.seek(groupStartPrefix);
+ while (it.isValid()) {
+ byte[] keyWithPrefix = it.key();
+ byte[] ks = getKey(keyWithPrefix);
+ byte[] bs = it.value();
+
+ // LogEntry index
+ if (ks.length == 8) {
+ LogEntry entry = this.logEntryDecoder.decode(bs);
+ if (entry != null) {
+ if (entry.getType() ==
EnumOutter.EntryType.ENTRY_TYPE_CONFIGURATION) {
+ ConfigurationEntry confEntry = new
ConfigurationEntry();
+ confEntry.setId(new
LogId(entry.getId().getIndex(), entry.getId().getTerm()));
+ confEntry.setConf(new
Configuration(entry.getPeers(), entry.getLearners()));
+ if (entry.getOldPeers() != null) {
+ confEntry.setOldConf(new
Configuration(entry.getOldPeers(), entry.getOldLearners()));
+ }
+ if (confManager != null) {
+ confManager.add(confEntry);
+ }
+ }
+ } else {
+ LOG.warn(
+ "Fail to decode conf entry at index {}, the
log data is: {}.",
+ ((long) LONG_ARRAY_HANDLE.get(ks, 0)),
+ BytesUtil.toHex(bs)
+ );
+ }
+ } else {
+ if (Arrays.equals(FIRST_LOG_IDX_KEY, ks)) {
+ setFirstLogIndex((long) LONG_ARRAY_HANDLE.get(bs, 0));
+ truncatePrefixInBackground(0L, this.firstLogIndex);
+ } else {
+ LOG.warn("Unknown entry in configuration storage
key={}, value={}.", BytesUtil.toHex(ks),
+ BytesUtil.toHex(bs));
+ }
+ }
+ it.next();
+ }
+ }
+ }
+
+ private byte[] getKey(byte[] ks) {
+ return copyOfRange(ks, groupStartPrefix.length, ks.length);
+ }
+
+ private void setFirstLogIndex(long index) {
+ this.firstLogIndex = index;
+ this.hasLoadFirstLogIndex = true;
+ }
+
+ /**
+ * Save the first log index into conf column family.
+ */
+ private boolean saveFirstLogIndex(long firstLogIndex) {
+ this.readLock.lock();
+ try {
+ byte[] vs = new byte[8];
+ LONG_ARRAY_HANDLE.set(vs, 0, firstLogIndex);
+ this.db.put(this.confHandle, this.writeOptions,
createKey(FIRST_LOG_IDX_KEY), vs);
+ return true;
+ } catch (RocksDBException e) {
+ LOG.error("Fail to save first log index {}.", e, firstLogIndex);
+ return false;
+ } finally {
+ this.readLock.unlock();
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public void shutdown() {
+ this.writeLock.lock();
+
+ try {
+ onShutdown();
+ }
+ finally {
+ this.writeLock.unlock();
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public long getFirstLogIndex() {
+ this.readLock.lock();
+
+ try {
+ if (this.hasLoadFirstLogIndex) {
+ return this.firstLogIndex;
+ }
+
+ try (
+ var readOptions = new
ReadOptions().setIterateUpperBound(upperBound).setTotalOrderSeek(true);
+ RocksIterator it = this.db.newIterator(this.dataHandle,
readOptions)
+ ) {
+ it.seek(groupStartPrefix);
+
+ if (it.isValid()) {
+ byte[] key = getKey(it.key());
+ long ret = (long) LONG_ARRAY_HANDLE.get(key, 0);
+ saveFirstLogIndex(ret);
+ setFirstLogIndex(ret);
+ return ret;
+ }
+
+ return 1L;
+ }
+ } finally {
+ this.readLock.unlock();
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public long getLastLogIndex() {
+ this.readLock.lock();
+
+ try (
+ var readOptions = new ReadOptions()
+ .setIterateUpperBound(upperBound)
Review Comment:
Why do you need all these options? Looks like
`it.seekForPrev(groupEndPrefix)` should suffice.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]