vldpyatkov commented on code in PR #1692:
URL: https://github.com/apache/ignite-3/pull/1692#discussion_r1111591055
##########
modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java:
##########
@@ -354,7 +369,14 @@ public class IgniteImpl implements Ignite {
new RocksDbKeyValueStorage(name,
workDir.resolve(METASTORAGE_DB_PATH))
);
- placementDriverMgr = new PlacementDriverManager(metaStorageMgr);
+ placementDriverMgr = new PlacementDriverManager(
+ MetastorageGroupId.INSTANCE,
Review Comment:
Why are you passing a constant as a parameter?
##########
modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java:
##########
@@ -354,7 +369,14 @@ public class IgniteImpl implements Ignite {
new RocksDbKeyValueStorage(name,
workDir.resolve(METASTORAGE_DB_PATH))
);
- placementDriverMgr = new PlacementDriverManager(metaStorageMgr);
+ placementDriverMgr = new PlacementDriverManager(
+ MetastorageGroupId.INSTANCE,
+ clusterSvc,
+ raftConfiguration,
+ cmgMgr::metaStorageNodes,
+ logicalTopologyService,
+ raftExecutorService
Review Comment:
If you extract the pool, you should manage the lifecycle of the object (I mean,
invoke shutdown).
But I am sure it is not necessary.
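A minimal sketch of what managing the lifecycle could look like, assuming the class that creates the pool is also the one that stops it. `PoolOwner`, its pool size, and the 10-second timeout are hypothetical and only illustrate the shutdown order; this is not the actual IgniteImpl code.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/** Hypothetical owner of an extracted pool; not the real IgniteImpl. */
class PoolOwner {
    // Assumption: the pool is created here, so this class is responsible for it.
    private final ScheduledExecutorService raftExecutorService =
            Executors.newScheduledThreadPool(2);

    ScheduledExecutorService executor() {
        return raftExecutorService;
    }

    /** Stops the pool: whoever creates it also shuts it down. */
    void stop() {
        // Reject new tasks, let already submitted ones finish.
        raftExecutorService.shutdown();

        try {
            // Wait briefly for in-flight tasks, then cancel whatever remains.
            if (!raftExecutorService.awaitTermination(10, TimeUnit.SECONDS)) {
                raftExecutorService.shutdownNow();
            }
        } catch (InterruptedException e) {
            raftExecutorService.shutdownNow();
            // Preserve the interrupt flag for the caller.
            Thread.currentThread().interrupt();
        }
    }
}
```

If the pool stays inside the component that uses it, none of this is needed at the call site.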
##########
modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java:
##########
@@ -354,7 +369,14 @@ public class IgniteImpl implements Ignite {
new RocksDbKeyValueStorage(name,
workDir.resolve(METASTORAGE_DB_PATH))
);
- placementDriverMgr = new PlacementDriverManager(metaStorageMgr);
+ placementDriverMgr = new PlacementDriverManager(
+ MetastorageGroupId.INSTANCE,
+ clusterSvc,
+ raftConfiguration,
+ cmgMgr::metaStorageNodes,
Review Comment:
It is better to pass the manager here, because passing one manager to another
is the common approach.
Passing a function will be hard to understand in the future.
##########
modules/placement-driver/src/main/java/org/apache/ignite/internal/raft/client/TopologyAwareRaftGroupService.java:
##########
@@ -164,11 +175,12 @@ public static CompletableFuture<RaftGroupService> start(
PeersAndLearners configuration,
boolean getLeader,
ScheduledExecutorService executor,
- LogicalTopologyService logicalTopologyService
+ LogicalTopologyService logicalTopologyService,
+ boolean notifyOnSubscription
Review Comment:
Maybe it would be better to add this flag to the subscribeLeader method.
##########
modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/PlacementDriverManager.java:
##########
@@ -31,21 +46,94 @@ public class PlacementDriverManager implements IgniteComponent {
/** Busy lock to stop synchronously. */
private final IgniteSpinBusyLock busyLock = new IgniteSpinBusyLock();
+ private final RaftMessagesFactory raftMessagesFactory = new RaftMessagesFactory();
+
/** Prevents double stopping of the component. */
private final AtomicBoolean isStopped = new AtomicBoolean();
+ private final ReplicationGroupId replicationGroupId;
+
+ private final ClusterService clusterService;
+
+ private final Supplier<CompletableFuture<Set<String>>> placementDriverNodesNamesProvider;
+
+ /**
+ * Raft client future. Can contain null, if this node is not in placement driver group.
+ */
+ private final CompletableFuture<TopologyAwareRaftGroupService> raftClientFuture;
+
+ private final ScheduledExecutorService raftClientExecutor;
+
+ private final LogicalTopologyService logicalTopologyService;
+
+ private final RaftConfiguration raftConfiguration;
+
+ private volatile boolean isActiveActor;
+
+ private volatile long lastTermSeen = -1;
+
/**
* The constructor.
*
- * @param metaStorageMgr Meta Storage manager.
+ * @param replicationGroupId Id of placement driver group.
+ * @param clusterService Cluster service.
+ * @param raftConfiguration Raft configuration.
+ * @param placementDriverNodesNamesProvider Provider of the set of placement driver nodes' names.
+ * @param logicalTopologyService Logical topology service.
+ * @param raftClientExecutor Raft client executor.
*/
- public PlacementDriverManager(MetaStorageManager metaStorageMgr) {
+ public PlacementDriverManager(
+ ReplicationGroupId replicationGroupId,
+ ClusterService clusterService,
+ RaftConfiguration raftConfiguration,
+ Supplier<CompletableFuture<Set<String>>> placementDriverNodesNamesProvider,
+ LogicalTopologyService logicalTopologyService,
+ ScheduledExecutorService raftClientExecutor
+ ) {
+ this.replicationGroupId = replicationGroupId;
+ this.clusterService = clusterService;
+ this.raftConfiguration = raftConfiguration;
+ this.placementDriverNodesNamesProvider = placementDriverNodesNamesProvider;
+ this.logicalTopologyService = logicalTopologyService;
+ this.raftClientExecutor = raftClientExecutor;
+
+ raftClientFuture = new CompletableFuture<>();
}
/** {@inheritDoc} */
@Override
public void start() {
+ placementDriverNodesNamesProvider.get()
+ .thenCompose(placementDriverNodes -> {
+ String thisNodeName = clusterService.topologyService().localMember().name();
+
+ if (placementDriverNodes.contains(thisNodeName)) {
+ return TopologyAwareRaftGroupService.start(
+ replicationGroupId,
+ clusterService,
+ raftMessagesFactory,
+ raftConfiguration,
+ PeersAndLearners.fromConsistentIds(placementDriverNodes),
+ true,
+ raftClientExecutor,
+ logicalTopologyService,
+ true
+ ).thenCompose(client -> {
+ TopologyAwareRaftGroupService topologyAwareClient = (TopologyAwareRaftGroupService) client;
+ return topologyAwareClient.subscribeLeader(this::onLeaderChange).thenApply(v -> topologyAwareClient);
Review Comment:
If we subscribe to the leader elected event, where will we unsubscribe?
I think it should be before stop (IgniteComponent#beforeNodeStop).
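A rough sketch of the pairing I have in mind, assuming the client offers an unsubscribe call that mirrors `subscribeLeader`. `LeaderSubscription`, `unsubscribeLeader`, and `SubscribingComponent` are hypothetical stand-ins for illustration; I have not checked them against the actual TopologyAwareRaftGroupService API.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical stand-in for the raft client; only the two calls used below. */
interface LeaderSubscription {
    CompletableFuture<Void> subscribeLeader(Runnable callback);

    // Assumption: an unsubscribe counterpart exists or would be added.
    CompletableFuture<Void> unsubscribeLeader();
}

/** Hypothetical component that mirrors the start / beforeNodeStop order. */
class SubscribingComponent {
    private final LeaderSubscription client;

    /** Set only after the subscription has actually completed. */
    private final AtomicBoolean subscribed = new AtomicBoolean();

    SubscribingComponent(LeaderSubscription client) {
        this.client = client;
    }

    void start() {
        client.subscribeLeader(() -> { /* react to a leader change */ })
                .thenRun(() -> subscribed.set(true));
    }

    /** Called before the node stops, while the client is still usable. */
    void beforeNodeStop() {
        // compareAndSet makes a repeated call a no-op.
        if (subscribed.compareAndSet(true, false)) {
            client.unsubscribeLeader().join();
        }
    }
}
```

The point is only the ordering: the subscription is released while the client is still alive, not in stop.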
##########
modules/placement-driver/src/main/java/org/apache/ignite/internal/raft/client/TopologyAwareRaftGroupService.java:
##########
@@ -418,10 +431,8 @@ private static class ServerEventHandler {
* @param term Term.
*/
private synchronized void onLeaderElected(ClusterNode node, long term) {
- if (onLeaderElectedCallback != null && term > this.term) {
Review Comment:
Why was the expression (term > this.term) removed?
There is no sense in notifying several times about the same term.
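To illustrate why the guard matters, here is a simplified sketch of a handler that keeps it. `LeaderElectionNotifier` is hypothetical, uses a plain `String` instead of `ClusterNode`, and assumes the stored term is updated inside the guarded branch; it is not the real ServerEventHandler.

```java
import java.util.function.BiConsumer;

/** Hypothetical, simplified handler; not the real ServerEventHandler. */
class LeaderElectionNotifier {
    /** Highest term already reported to the callback. */
    private long term = -1;

    private final BiConsumer<String, Long> onLeaderElectedCallback;

    LeaderElectionNotifier(BiConsumer<String, Long> onLeaderElectedCallback) {
        this.onLeaderElectedCallback = onLeaderElectedCallback;
    }

    synchronized void onLeaderElected(String node, long term) {
        // Notify only for a strictly newer term; duplicates and stale terms are dropped.
        if (onLeaderElectedCallback != null && term > this.term) {
            this.term = term;

            onLeaderElectedCallback.accept(node, term);
        }
    }
}
```

With the guard in place, a repeated or late message for an already seen term does not reach the callback. Without it, every such message does.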