alievmirza commented on code in PR #936:
URL: https://github.com/apache/ignite-3/pull/936#discussion_r929114999
##########
modules/raft/src/main/java/org/apache/ignite/internal/raft/server/impl/JraftServerImpl.java:
##########
@@ -466,6 +495,25 @@ public void stopBlockMessages(String groupId) {
client.stopBlock();
}
+ /**
+ * Initialize (if needed) and receive per raft group lock,
+ * to prevent concurrent start of the same raft group.
+ *
+ * @param grpId Group id.
+ * @return lock
+ */
+ private Lock groupLock(String grpId) {
+ var newGroupLock = new ReentrantLock();
Review Comment:
It seems inefficient to create a new ReentrantLock every time this method is
called. Let's create it only if `startGroupInProgressLocks` doesn't already
contain a lock for `grpId` (e.g. via `computeIfAbsent`).
##########
modules/raft/src/main/java/org/apache/ignite/internal/raft/server/impl/JraftServerImpl.java:
##########
@@ -326,89 +331,113 @@ public synchronized boolean startRaftGroup(
/** {@inheritDoc} */
@Override
- public synchronized boolean startRaftGroup(
- String groupId,
+ public boolean startRaftGroup(
+ String grpId,
RaftGroupEventsListener evLsnr,
RaftGroupListener lsnr,
@Nullable List<Peer> initialConf,
RaftGroupOptions groupOptions
) {
- if (groups.containsKey(groupId)) {
+ // fast track to check if group with the same name is already created.
+ if (groups.containsKey(grpId)) {
return false;
}
- // Thread pools are shared by all raft groups.
- NodeOptions nodeOptions = opts.copy();
+ Lock lock = groupLock(grpId);
- // TODO: IGNITE-17083 - Do not create paths for volatile stores at all
when we get rid of snapshot storage on FS.
- Path serverDataPath = getServerDataPath(groupId);
+ lock.lock();
try {
- Files.createDirectories(serverDataPath);
- } catch (IOException e) {
- throw new IgniteInternalException(e);
- }
+ // double check if group wasn't created before receiving the lock.
+ if (groups.containsKey(grpId)) {
+ return false;
+ }
- nodeOptions.setLogUri(groupId);
+ // Thread pools are shared by all raft groups.
+ NodeOptions nodeOptions = opts.copy();
- nodeOptions.setRaftMetaUri(serverDataPath.resolve("meta").toString());
+ // TODO: IGNITE-17083 - Do not create paths for volatile stores at
all when we get rid of snapshot storage on FS.
+ Path serverDataPath = getServerDataPath(grpId);
-
nodeOptions.setSnapshotUri(serverDataPath.resolve("snapshot").toString());
+ try {
+ Files.createDirectories(serverDataPath);
+ } catch (IOException e) {
+ throw new IgniteInternalException(e);
+ }
- nodeOptions.setFsm(new DelegatingStateMachine(lsnr));
+ nodeOptions.setLogUri(grpId);
- nodeOptions.setRaftGrpEvtsLsnr(evLsnr);
+
nodeOptions.setRaftMetaUri(serverDataPath.resolve("meta").toString());
- LogStorageFactory logStorageFactory =
groupOptions.getLogStorageFactory() == null
- ? this.logStorageFactory : groupOptions.getLogStorageFactory();
+
nodeOptions.setSnapshotUri(serverDataPath.resolve("snapshot").toString());
- IgniteJraftServiceFactory serviceFactory = new
IgniteJraftServiceFactory(logStorageFactory);
+ nodeOptions.setFsm(new DelegatingStateMachine(lsnr));
- if (groupOptions.snapshotStorageFactory() != null) {
-
serviceFactory.setSnapshotStorageFactory(groupOptions.snapshotStorageFactory());
- }
+ nodeOptions.setRaftGrpEvtsLsnr(evLsnr);
- if (groupOptions.raftMetaStorageFactory() != null) {
-
serviceFactory.setRaftMetaStorageFactory(groupOptions.raftMetaStorageFactory());
- }
+ LogStorageFactory logStorageFactory =
groupOptions.getLogStorageFactory() == null
+ ? this.logStorageFactory :
groupOptions.getLogStorageFactory();
- nodeOptions.setServiceFactory(serviceFactory);
+ IgniteJraftServiceFactory serviceFactory = new
IgniteJraftServiceFactory(logStorageFactory);
- if (initialConf != null) {
- List<PeerId> mapped =
initialConf.stream().map(PeerId::fromPeer).collect(Collectors.toList());
+ if (groupOptions.snapshotStorageFactory() != null) {
+
serviceFactory.setSnapshotStorageFactory(groupOptions.snapshotStorageFactory());
+ }
- nodeOptions.setInitialConf(new Configuration(mapped, null));
- }
+ if (groupOptions.raftMetaStorageFactory() != null) {
+
serviceFactory.setRaftMetaStorageFactory(groupOptions.raftMetaStorageFactory());
+ }
+
+ nodeOptions.setServiceFactory(serviceFactory);
- IgniteRpcClient client = new IgniteRpcClient(service);
+ if (initialConf != null) {
+ List<PeerId> mapped =
initialConf.stream().map(PeerId::fromPeer).collect(Collectors.toList());
- nodeOptions.setRpcClient(client);
+ nodeOptions.setInitialConf(new Configuration(mapped, null));
+ }
+
+ IgniteRpcClient client = new IgniteRpcClient(service);
- NetworkAddress addr =
service.topologyService().localMember().address();
+ nodeOptions.setRpcClient(client);
- var peerId = new PeerId(addr.host(), addr.port(), 0,
ElectionPriority.DISABLED);
+ NetworkAddress addr =
service.topologyService().localMember().address();
- var server = new RaftGroupService(groupId, peerId, nodeOptions,
rpcServer, nodeManager);
+ var peerId = new PeerId(addr.host(), addr.port(), 0,
ElectionPriority.DISABLED);
- server.start();
+ var server = new RaftGroupService(grpId, peerId, nodeOptions,
rpcServer, nodeManager);
- groups.put(groupId, server);
+ server.start();
- return true;
+ groups.put(grpId, server);
+
+ return true;
+ } finally {
+ lock.unlock();
+ }
}
/** {@inheritDoc} */
@Override
- public boolean stopRaftGroup(String groupId) {
- RaftGroupService svc = groups.remove(groupId);
+ public boolean stopRaftGroup(String grpId) {
Review Comment:
If this method is called twice in a row, `startGroupInProgressLocks` will
still contain a lock for the stopped group. Because such locks are never
removed, this may lead to an OOM if those groups are never started again.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]