This is an automated email from the ASF dual-hosted git repository. ibessonov pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push: new 79d5bc824d IGNITE-21902 Add an option to configure log storage path (#3605) 79d5bc824d is described below commit 79d5bc824d001f97f09e9b06ec9f5a5a094fb15e Author: Phillippko <phillip...@gmail.com> AuthorDate: Mon Apr 15 19:02:28 2024 +0400 IGNITE-21902 Add an option to configure log storage path (#3605) --- .../server/raft/ItMetaStorageRaftGroupTest.java | 24 ++++- .../configuration/RaftConfigurationSchema.java | 4 + .../ignite/raft/server/ItJraftHlcServerTest.java | 12 +-- .../raft/server/ItJraftServerLogPathTest.java | 113 +++++++++++++++++++++ .../raft/server/ItSimpleCounterServerTest.java | 7 +- .../ignite/raft/server/JraftAbstractTest.java | 17 +--- .../ignite/raft/server/RaftServerAbstractTest.java | 34 +++++++ .../java/org/apache/ignite/internal/raft/Loza.java | 2 +- .../internal/raft/server/impl/JraftServerImpl.java | 41 ++++++-- .../storage/impl/DefaultLogStorageFactory.java | 21 ++-- .../raft/storage/logit/LogitLogStorageFactory.java | 17 ++-- .../ignite/raft/jraft/option/NodeOptions.java | 1 - .../jraft/storage/impl/LogStorageBenchmark.java | 2 +- .../jraft/storage/logit/LogitLogStorageTest.java | 3 +- .../service/ItAbstractListenerSnapshotTest.java | 2 +- .../AbstractTopologyAwareGroupServiceTest.java | 1 + 16 files changed, 233 insertions(+), 68 deletions(-) diff --git a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/server/raft/ItMetaStorageRaftGroupTest.java b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/server/raft/ItMetaStorageRaftGroupTest.java index fc29e48c7f..e3e6907dbc 100644 --- a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/server/raft/ItMetaStorageRaftGroupTest.java +++ b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/server/raft/ItMetaStorageRaftGroupTest.java @@ -365,11 +365,29 @@ public class ItMetaStorageRaftGroupTest extends IgniteAbstractTest { List.of(new UserReplicatorStateListener(replicatorStartedCounter, replicatorStoppedCounter))); opt3.setCommandsMarshaller(commandsMarshaller); - metaStorageRaftSrv1 = new JraftServerImpl(cluster.get(0), workDir.resolve("node1"), opt1, new RaftGroupEventsClientListener()); + metaStorageRaftSrv1 = new JraftServerImpl( + cluster.get(0), + workDir.resolve("node1"), + raftConfiguration, + opt1, + new RaftGroupEventsClientListener() + ); - metaStorageRaftSrv2 = new JraftServerImpl(cluster.get(1), workDir.resolve("node2"), opt2, new RaftGroupEventsClientListener()); + metaStorageRaftSrv2 = new JraftServerImpl( + cluster.get(1), + workDir.resolve("node2"), + raftConfiguration, + opt2, + new RaftGroupEventsClientListener() + ); - metaStorageRaftSrv3 = new JraftServerImpl(cluster.get(2), workDir.resolve("node3"), opt3, new RaftGroupEventsClientListener()); + metaStorageRaftSrv3 = new JraftServerImpl( + cluster.get(2), + workDir.resolve("node3"), + raftConfiguration, + opt3, + new RaftGroupEventsClientListener() + ); metaStorageRaftSrv1.start(); diff --git a/modules/raft-api/src/main/java/org/apache/ignite/internal/raft/configuration/RaftConfigurationSchema.java b/modules/raft-api/src/main/java/org/apache/ignite/internal/raft/configuration/RaftConfigurationSchema.java index ff888b4425..b31714a049 100644 --- a/modules/raft-api/src/main/java/org/apache/ignite/internal/raft/configuration/RaftConfigurationSchema.java +++ b/modules/raft-api/src/main/java/org/apache/ignite/internal/raft/configuration/RaftConfigurationSchema.java @@ -83,4 +83,8 @@ public class RaftConfigurationSchema { */ @Value(hasDefault = true) public boolean logYieldStrategy = false; + + /** Directory where log is stored. By default "log" subfolder of data storage path is used. */ + @Value(hasDefault = true) + public String logPath = ""; } diff --git a/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/ItJraftHlcServerTest.java b/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/ItJraftHlcServerTest.java index 5c70787eb7..8ff1d94fdc 100644 --- a/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/ItJraftHlcServerTest.java +++ b/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/ItJraftHlcServerTest.java @@ -47,7 +47,6 @@ import org.apache.ignite.network.NetworkAddress; import org.apache.ignite.raft.jraft.RaftGroupService; import org.apache.ignite.raft.jraft.core.NodeImpl; import org.apache.ignite.raft.jraft.option.NodeOptions; -import org.apache.ignite.raft.jraft.rpc.impl.RaftGroupEventsClientListener; import org.apache.ignite.raft.jraft.test.TestUtils; import org.apache.ignite.raft.server.counter.CounterListener; import org.junit.jupiter.api.AfterEach; @@ -132,16 +131,7 @@ class ItJraftHlcServerTest extends RaftServerAbstractTest { cons.accept(opts); - JraftServerImpl server = new JraftServerImpl(service, workDir.resolve("node" + idx), opts, new RaftGroupEventsClientListener()) { - @Override - public void stop() throws Exception { - servers.remove(this); - - super.stop(); - - service.stop(); - } - }; + JraftServerImpl server = jraftServer(servers, idx, service, opts); server.start(); diff --git a/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/ItJraftServerLogPathTest.java b/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/ItJraftServerLogPathTest.java new file mode 100644 index 0000000000..fbd4086e23 --- /dev/null +++ b/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/ItJraftServerLogPathTest.java @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.raft.server; + +import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully; +import static org.apache.ignite.raft.jraft.test.TestUtils.getLocalAddress; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.List; +import org.apache.ignite.internal.network.ClusterService; +import org.apache.ignite.internal.raft.configuration.RaftConfiguration; +import org.apache.ignite.internal.raft.server.impl.JraftServerImpl; +import org.apache.ignite.internal.raft.storage.logit.LogitLogStorageFactory; +import org.apache.ignite.internal.testframework.WithSystemProperty; +import org.apache.ignite.network.NetworkAddress; +import org.apache.ignite.raft.jraft.option.NodeOptions; +import org.apache.ignite.raft.jraft.rpc.impl.RaftGroupEventsClientListener; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +/** Tests for checking that JRaftServer uses log storage path from configuration. */ +class ItJraftServerLogPathTest extends RaftServerAbstractTest { + private Path dataPath; + private JraftServerImpl server; + + @BeforeEach + void setUp() { + dataPath = workDir.resolve("node0"); + } + + @AfterEach + void tearDown() throws Exception { + server.stop(); + } + + @Test + @WithSystemProperty(key = JraftServerImpl.LOGIT_STORAGE_ENABLED_PROPERTY, value = "false") + void testDefaultFactory() { + Path logPath = workDir.resolve("db/log"); + assertThat(raftConfiguration.logPath().update(logPath.toString()), willCompleteSuccessfully()); + + server = startServer(raftConfiguration); + + assertTrue(Files.exists(logPath)); + } + + @Test + @WithSystemProperty(key = JraftServerImpl.LOGIT_STORAGE_ENABLED_PROPERTY, value = "true") + void testLogitFactory() { + Path logPath = workDir.resolve("db/log"); + assertThat(raftConfiguration.logPath().update(logPath.toString()), willCompleteSuccessfully()); + + server = startServer(raftConfiguration); + + LogitLogStorageFactory factory = (LogitLogStorageFactory) server.getLogStorageFactory(); + assertEquals(logPath.resolve("log-1"), factory.resolveLogStoragePath("1")); + } + + @Test + @WithSystemProperty(key = JraftServerImpl.LOGIT_STORAGE_ENABLED_PROPERTY, value = "false") + void testDefaultLogPathDefaultFactory() { + server = startServer(raftConfiguration); + + assertTrue(Files.exists(dataPath.resolve("log"))); + } + + @Test + @WithSystemProperty(key = JraftServerImpl.LOGIT_STORAGE_ENABLED_PROPERTY, value = "true") + void testDefaultLogPathLogitFactory() { + server = startServer(raftConfiguration); + + LogitLogStorageFactory factory = (LogitLogStorageFactory) server.getLogStorageFactory(); + assertEquals(dataPath.resolve("log/log-1"), factory.resolveLogStoragePath("1")); + } + + private JraftServerImpl startServer(RaftConfiguration raftConfiguration) { + var addr = new NetworkAddress(getLocalAddress(), PORT); + + ClusterService service = clusterService(PORT, List.of(addr), true); + + JraftServerImpl server = new JraftServerImpl( + service, + dataPath, + raftConfiguration, + new NodeOptions(), + new RaftGroupEventsClientListener() + ); + + server.start(); + + return server; + } +} diff --git a/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/ItSimpleCounterServerTest.java b/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/ItSimpleCounterServerTest.java index ed61cc70b9..fda140da26 100644 --- a/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/ItSimpleCounterServerTest.java +++ b/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/ItSimpleCounterServerTest.java @@ -32,14 +32,12 @@ import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; import org.apache.ignite.internal.configuration.testframework.ConfigurationExtension; -import org.apache.ignite.internal.configuration.testframework.InjectConfiguration; import org.apache.ignite.internal.network.ClusterService; import org.apache.ignite.internal.raft.Loza; import org.apache.ignite.internal.raft.Peer; import org.apache.ignite.internal.raft.PeersAndLearners; import org.apache.ignite.internal.raft.RaftGroupServiceImpl; import org.apache.ignite.internal.raft.RaftNodeId; -import org.apache.ignite.internal.raft.configuration.RaftConfiguration; import org.apache.ignite.internal.raft.server.RaftGroupOptions; import org.apache.ignite.internal.raft.server.RaftServer; import org.apache.ignite.internal.raft.server.impl.JraftServerImpl; @@ -88,9 +86,6 @@ class ItSimpleCounterServerTest extends RaftServerAbstractTest { /** Executor for raft group services. */ private ScheduledExecutorService executor; - @InjectConfiguration - private RaftConfiguration raftConfiguration; - /** * Before each. */ @@ -100,7 +95,7 @@ class ItSimpleCounterServerTest extends RaftServerAbstractTest { ClusterService service = clusterService(PORT, List.of(addr), true); - server = new JraftServerImpl(service, workDir) { + server = new JraftServerImpl(service, workDir, raftConfiguration) { @Override public synchronized void stop() throws Exception { super.stop(); diff --git a/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/JraftAbstractTest.java b/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/JraftAbstractTest.java index f2cb308547..df0d68ef63 100644 --- a/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/JraftAbstractTest.java +++ b/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/JraftAbstractTest.java @@ -34,13 +34,11 @@ import java.util.concurrent.TimeUnit; import java.util.function.Consumer; import java.util.stream.IntStream; import org.apache.ignite.internal.configuration.testframework.ConfigurationExtension; -import org.apache.ignite.internal.configuration.testframework.InjectConfiguration; import org.apache.ignite.internal.network.ClusterService; import org.apache.ignite.internal.raft.Loza; import org.apache.ignite.internal.raft.PeersAndLearners; import org.apache.ignite.internal.raft.RaftGroupServiceImpl; import org.apache.ignite.internal.raft.RaftNodeId; -import org.apache.ignite.internal.raft.configuration.RaftConfiguration; import org.apache.ignite.internal.raft.server.RaftServer; import org.apache.ignite.internal.raft.server.impl.JraftServerImpl; import org.apache.ignite.internal.raft.service.RaftGroupService; @@ -50,7 +48,6 @@ import org.apache.ignite.internal.thread.NamedThreadFactory; import org.apache.ignite.internal.util.IgniteUtils; import org.apache.ignite.network.NetworkAddress; import org.apache.ignite.raft.jraft.option.NodeOptions; -import org.apache.ignite.raft.jraft.rpc.impl.RaftGroupEventsClientListener; import org.apache.ignite.raft.jraft.test.TestUtils; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; @@ -74,9 +71,6 @@ public abstract class JraftAbstractTest extends RaftServerAbstractTest { */ private static final int CLIENT_PORT = 6003; - @InjectConfiguration - private RaftConfiguration raftConfiguration; - /** * Initial configuration. */ @@ -175,16 +169,7 @@ public abstract class JraftAbstractTest extends RaftServerAbstractTest { optionsUpdater.accept(opts); - JraftServerImpl server = new JraftServerImpl(service, workDir.resolve("node" + idx), opts, new RaftGroupEventsClientListener()) { - @Override - public void stop() throws Exception { - servers.remove(this); - - super.stop(); - - service.stop(); - } - }; + JraftServerImpl server = jraftServer(servers, idx, service, opts); server.start(); diff --git a/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/RaftServerAbstractTest.java b/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/RaftServerAbstractTest.java index 43860dd0b9..7d4936ace5 100644 --- a/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/RaftServerAbstractTest.java +++ b/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/RaftServerAbstractTest.java @@ -17,21 +17,30 @@ package org.apache.ignite.raft.server; +import java.nio.file.Path; import java.util.ArrayList; import java.util.List; +import org.apache.ignite.internal.configuration.testframework.ConfigurationExtension; +import org.apache.ignite.internal.configuration.testframework.InjectConfiguration; import org.apache.ignite.internal.network.ClusterService; import org.apache.ignite.internal.network.StaticNodeFinder; import org.apache.ignite.internal.network.utils.ClusterServiceTestUtils; +import org.apache.ignite.internal.raft.configuration.RaftConfiguration; +import org.apache.ignite.internal.raft.server.impl.JraftServerImpl; import org.apache.ignite.internal.testframework.IgniteAbstractTest; import org.apache.ignite.network.NetworkAddress; import org.apache.ignite.raft.jraft.RaftMessagesFactory; +import org.apache.ignite.raft.jraft.option.NodeOptions; +import org.apache.ignite.raft.jraft.rpc.impl.RaftGroupEventsClientListener; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.TestInfo; +import org.junit.jupiter.api.extension.ExtendWith; /** * Abstract test for raft server. */ +@ExtendWith(ConfigurationExtension.class) abstract class RaftServerAbstractTest extends IgniteAbstractTest { protected static final RaftMessagesFactory FACTORY = new RaftMessagesFactory(); @@ -40,6 +49,10 @@ abstract class RaftServerAbstractTest extends IgniteAbstractTest { */ protected static final int PORT = 20010; + /** Raft configuration. */ + @InjectConfiguration + protected RaftConfiguration raftConfiguration; + /** Test info. */ TestInfo testInfo; @@ -77,4 +90,25 @@ abstract class RaftServerAbstractTest extends IgniteAbstractTest { return network; } + + protected JraftServerImpl jraftServer(List<JraftServerImpl> servers, int idx, ClusterService service, NodeOptions opts) { + Path dataPath = workDir.resolve("node" + idx); + + return new JraftServerImpl( + service, + dataPath, + raftConfiguration, + opts, + new RaftGroupEventsClientListener() + ) { + @Override + public void stop() throws Exception { + servers.remove(this); + + super.stop(); + + service.stop(); + } + }; + } } diff --git a/modules/raft/src/main/java/org/apache/ignite/internal/raft/Loza.java b/modules/raft/src/main/java/org/apache/ignite/internal/raft/Loza.java index 78b88cb789..514f7edc1d 100644 --- a/modules/raft/src/main/java/org/apache/ignite/internal/raft/Loza.java +++ b/modules/raft/src/main/java/org/apache/ignite/internal/raft/Loza.java @@ -122,7 +122,7 @@ public class Loza implements RaftManager { this.opts = options; - this.raftServer = new JraftServerImpl(clusterNetSvc, dataPath, options, raftGroupEventsClientListener); + this.raftServer = new JraftServerImpl(clusterNetSvc, dataPath, raftConfiguration, options, raftGroupEventsClientListener); this.executor = new ScheduledThreadPoolExecutor( CLIENT_POOL_SIZE, diff --git a/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/impl/JraftServerImpl.java b/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/impl/JraftServerImpl.java index d3a97a46fb..e72ccef383 100644 --- a/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/impl/JraftServerImpl.java +++ b/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/impl/JraftServerImpl.java @@ -55,6 +55,7 @@ import org.apache.ignite.internal.raft.PeersAndLearners; import org.apache.ignite.internal.raft.RaftGroupEventsListener; import org.apache.ignite.internal.raft.RaftNodeId; import org.apache.ignite.internal.raft.WriteCommand; +import org.apache.ignite.internal.raft.configuration.RaftConfiguration; import org.apache.ignite.internal.raft.server.RaftGroupOptions; import org.apache.ignite.internal.raft.server.RaftServer; import org.apache.ignite.internal.raft.service.CommandClosure; @@ -137,6 +138,9 @@ public class JraftServerImpl implements RaftServer { /** Options. */ private final NodeOptions opts; + /** Raft configuration. */ + private final RaftConfiguration raftConfiguration; + private final RaftGroupEventsClientListener raftGroupEventsClientListener; /** Request executor. */ @@ -157,32 +161,37 @@ public class JraftServerImpl implements RaftServer { /** * The constructor. * - * @param service Cluster service. + * @param service Cluster service. * @param dataPath Data path. + * @param raftConfiguration Raft configuration. */ - public JraftServerImpl(ClusterService service, Path dataPath) { - this(service, dataPath, new NodeOptions(), new RaftGroupEventsClientListener()); + public JraftServerImpl(ClusterService service, Path dataPath, RaftConfiguration raftConfiguration) { + this(service, dataPath, raftConfiguration, new NodeOptions(), new RaftGroupEventsClientListener()); } /** * The constructor. * - * @param service Cluster service. + * @param service Cluster service. * @param dataPath Data path. - * @param opts Default node options. + * @param raftConfiguration Raft configuration. + * @param opts Default node options. */ public JraftServerImpl( ClusterService service, Path dataPath, + RaftConfiguration raftConfiguration, NodeOptions opts, RaftGroupEventsClientListener raftGroupEventsClientListener ) { this.service = service; this.dataPath = dataPath; this.nodeManager = new NodeManager(); + this.raftConfiguration = raftConfiguration; + this.logStorageFactory = IgniteSystemProperties.getBoolean(LOGIT_STORAGE_ENABLED_PROPERTY, false) - ? new LogitLogStorageFactory(service.nodeName(), dataPath.resolve("log"), getLogOptions()) - : new DefaultLogStorageFactory(service.nodeName(), dataPath.resolve("log")); + ? new LogitLogStorageFactory(service.nodeName(), getLogOptions(), this::getLogPath) + : new DefaultLogStorageFactory(service.nodeName(), this::getLogPath); this.opts = opts; this.raftGroupEventsClientListener = raftGroupEventsClientListener; @@ -222,11 +231,23 @@ public class JraftServerImpl implements RaftServer { return new StoreOptions(); } - /** Returns write-ahead log synchronizer. */ + private Path getLogPath() { + return raftConfiguration.logPath().value().isEmpty() + ? dataPath.resolve("log") + : Path.of(raftConfiguration.logPath().value()); + } + + /** Returns log synchronizer. */ public LogSyncer getLogSyncer() { return logStorageFactory; } + /** Returns log storage factory. */ + @TestOnly + public LogStorageFactory getLogStorageFactory() { + return logStorageFactory; + } + /** * Sets {@link AppendEntriesRequestInterceptor} to use. Should only be called from the same thread that is used * to {@link #start()} the component. @@ -238,8 +259,8 @@ public class JraftServerImpl implements RaftServer { } /** - * Sets {@link ActionRequestInterceptor} to use. Should only be called from the same thread that is used - * to {@link #start()} the component. + * Sets {@link ActionRequestInterceptor} to use. Should only be called from the same thread that is used to {@link #start()} the + * component. * * @param actionRequestInterceptor Interceptor to use. */ diff --git a/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/impl/DefaultLogStorageFactory.java b/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/impl/DefaultLogStorageFactory.java index 7b04ca1069..2acabdf317 100644 --- a/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/impl/DefaultLogStorageFactory.java +++ b/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/impl/DefaultLogStorageFactory.java @@ -27,6 +27,7 @@ import java.util.ArrayList; import java.util.List; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.function.Supplier; import org.apache.ignite.internal.logger.IgniteLogger; import org.apache.ignite.internal.logger.Loggers; import org.apache.ignite.internal.raft.storage.LogStorageFactory; @@ -54,8 +55,8 @@ import org.rocksdb.util.SizeUnit; public class DefaultLogStorageFactory implements LogStorageFactory { private static final IgniteLogger LOG = Loggers.forClass(DefaultLogStorageFactory.class); - /** Database path. */ - private final Path path; + /** Function to get path to the log storage. */ + private final Supplier<Path> logPathSupplier; /** Executor for shared storages. */ private final ExecutorService executorService; @@ -88,16 +89,16 @@ public class DefaultLogStorageFactory implements LogStorageFactory { */ @TestOnly public DefaultLogStorageFactory(Path path) { - this("test", path); + this("test", () -> path); } /** * Constructor. * - * @param path Path to the storage. + * @param logPathSupplier Function to get path to the log storage. */ - public DefaultLogStorageFactory(String nodeName, Path path) { - this.path = path; + public DefaultLogStorageFactory(String nodeName, Supplier<Path> logPathSupplier) { + this.logPathSupplier = logPathSupplier; executorService = Executors.newSingleThreadExecutor( NamedThreadFactory.create(nodeName, "raft-shared-log-storage-pool", LOG) @@ -107,10 +108,12 @@ public class DefaultLogStorageFactory implements LogStorageFactory { /** {@inheritDoc} */ @Override public void start() { + Path logPath = logPathSupplier.get(); + try { - Files.createDirectories(path); + Files.createDirectories(logPath); } catch (IOException e) { - throw new IllegalStateException("Failed to create directory: " + this.path, e); + throw new IllegalStateException("Failed to create directory: " + logPath, e); } List<ColumnFamilyHandle> columnFamilyHandles = new ArrayList<>(); @@ -128,7 +131,7 @@ public class DefaultLogStorageFactory implements LogStorageFactory { ); try { - this.db = RocksDB.open(this.dbOptions, this.path.toString(), columnFamilyDescriptors, columnFamilyHandles); + this.db = RocksDB.open(this.dbOptions, logPath.toString(), columnFamilyDescriptors, columnFamilyHandles); // Setup rocks thread pools to utilize all the available cores as the database is shared among // all the raft groups diff --git a/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/logit/LogitLogStorageFactory.java b/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/logit/LogitLogStorageFactory.java index 1220e2f98a..4eb57d1592 100644 --- a/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/logit/LogitLogStorageFactory.java +++ b/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/logit/LogitLogStorageFactory.java @@ -20,6 +20,7 @@ package org.apache.ignite.internal.raft.storage.logit; import java.nio.file.Path; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; +import java.util.function.Supplier; import org.apache.ignite.internal.lang.IgniteInternalException; import org.apache.ignite.internal.logger.IgniteLogger; import org.apache.ignite.internal.logger.Loggers; @@ -46,18 +47,19 @@ public class LogitLogStorageFactory implements LogStorageFactory { /** Executor for shared storages. */ private final ScheduledExecutorService checkpointExecutor; - private final Path baseLogStoragesPath; - private final StoreOptions storeOptions; + /** Function to get base location of all log storages, created by this factory. */ + private final Supplier<Path> logPathSupplier; + /** * Constructor. * - * @param baseLogStoragesPath Location of all log storages, created by this factory. + * @param logPathSupplier Function to get base path of all log storages, created by this factory. * @param storeOptions Logit log storage options. */ - public LogitLogStorageFactory(String nodeName, Path baseLogStoragesPath, StoreOptions storeOptions) { - this.baseLogStoragesPath = baseLogStoragesPath; + public LogitLogStorageFactory(String nodeName, StoreOptions storeOptions, Supplier<Path> logPathSupplier) { + this.logPathSupplier = logPathSupplier; this.storeOptions = storeOptions; checkpointExecutor = Executors.newSingleThreadScheduledExecutor( NamedThreadFactory.create(nodeName, "logit-checkpoint-executor", LOG) @@ -98,7 +100,8 @@ public class LogitLogStorageFactory implements LogStorageFactory { throw new UnsupportedOperationException("Not implemented yet"); } - private Path resolveLogStoragePath(String groupId) { - return baseLogStoragesPath.resolve(LOG_DIR_PREFIX + groupId); + /** Returns path to log storage by group ID. */ + public Path resolveLogStoragePath(String groupId) { + return logPathSupplier.get().resolve(LOG_DIR_PREFIX + groupId); } } diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/option/NodeOptions.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/option/NodeOptions.java index 3965251ee7..0d140987dc 100644 --- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/option/NodeOptions.java +++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/option/NodeOptions.java @@ -20,7 +20,6 @@ import java.util.List; import java.util.concurrent.ExecutorService; import org.apache.ignite.internal.hlc.HybridClockImpl; import org.apache.ignite.internal.hlc.HybridClock; -import org.apache.ignite.internal.lang.IgniteSystemProperties; import org.apache.ignite.internal.raft.JraftGroupEventsListener; import org.apache.ignite.internal.raft.Marshaller; import org.apache.ignite.internal.raft.storage.impl.StripeAwareLogManager.Stripe; diff --git a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/storage/impl/LogStorageBenchmark.java b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/storage/impl/LogStorageBenchmark.java index caf044ccca..a78ce7ba6e 100644 --- a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/storage/impl/LogStorageBenchmark.java +++ b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/storage/impl/LogStorageBenchmark.java @@ -150,7 +150,7 @@ public class LogStorageBenchmark { int totalLogs = 100 * 1024; // LogStorageFactory logStorageFactory = new DefaultLogStorageFactory(testPath); - LogStorageFactory logStorageFactory = new LogitLogStorageFactory("test", testPath, new StoreOptions()); + LogStorageFactory logStorageFactory = new LogitLogStorageFactory("test", new StoreOptions(), () -> testPath); logStorageFactory.start(); try (AutoCloseable factory = logStorageFactory::close) { diff --git a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/storage/logit/LogitLogStorageTest.java b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/storage/logit/LogitLogStorageTest.java index 6a202a0909..98f8aff59f 100644 --- a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/storage/logit/LogitLogStorageTest.java +++ b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/storage/logit/LogitLogStorageTest.java @@ -43,11 +43,10 @@ import org.junit.jupiter.api.extension.ExtendWith; public class LogitLogStorageTest extends BaseLogStorageTest { private LogitLogStorageFactory logStorageFactory; - @BeforeEach @Override public void setup() throws Exception { - logStorageFactory = new LogitLogStorageFactory("test", path, testStoreOptions()); + logStorageFactory = new LogitLogStorageFactory("test", testStoreOptions(), () -> path); logStorageFactory.start(); super.setup(); diff --git a/modules/raft/src/testFixtures/java/org/apache/ignite/internal/raft/service/ItAbstractListenerSnapshotTest.java b/modules/raft/src/testFixtures/java/org/apache/ignite/internal/raft/service/ItAbstractListenerSnapshotTest.java index 65a231b81c..14525de721 100644 --- a/modules/raft/src/testFixtures/java/org/apache/ignite/internal/raft/service/ItAbstractListenerSnapshotTest.java +++ b/modules/raft/src/testFixtures/java/org/apache/ignite/internal/raft/service/ItAbstractListenerSnapshotTest.java @@ -416,7 +416,7 @@ public abstract class ItAbstractListenerSnapshotTest<T extends RaftGroupListener Path jraft = workDir.resolve("jraft" + idx); - JraftServerImpl server = new JraftServerImpl(service, jraft) { + JraftServerImpl server = new JraftServerImpl(service, jraft, raftConfiguration) { @Override public void stop() throws Exception { super.stop(); diff --git a/modules/replicator/src/testFixtures/java/org/apache/ignite/internal/raft/client/AbstractTopologyAwareGroupServiceTest.java b/modules/replicator/src/testFixtures/java/org/apache/ignite/internal/raft/client/AbstractTopologyAwareGroupServiceTest.java index 9dbc3fa77a..345ad04d02 100644 --- a/modules/replicator/src/testFixtures/java/org/apache/ignite/internal/raft/client/AbstractTopologyAwareGroupServiceTest.java +++ b/modules/replicator/src/testFixtures/java/org/apache/ignite/internal/raft/client/AbstractTopologyAwareGroupServiceTest.java @@ -425,6 +425,7 @@ public abstract class AbstractTopologyAwareGroupServiceTest extends IgniteAbstra var raftServer = new JraftServerImpl( cluster, dataPath, + raftConfiguration, nodeOptions, eventsClientListener );