SammyVimes commented on code in PR #1016:
URL: https://github.com/apache/ignite-3/pull/1016#discussion_r957359315
##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java:
##########
@@ -552,33 +618,69 @@ private void
updateAssignmentInternal(ConfigurationNotificationEvent<byte[]> ass
if (raftMgr.shouldHaveRaftGroupLocally(nodes)) {
startGroupFut = CompletableFuture
- .supplyAsync(
- () ->
internalTbl.storage().getOrCreateMvPartition(partId), ioExecutor)
+ .supplyAsync(() ->
internalTbl.storage().getOrCreateMvPartition(partId), ioExecutor)
.thenComposeAsync((partitionStorage) -> {
- RaftGroupOptions groupOptions =
groupOptionsForPartition(internalTbl, tblCfg, partitionStorage,
- newPartAssignment);
-
- try {
- raftMgr.startRaftGroupNode(
- grpId,
- newPartAssignment,
- new PartitionListener(tblId, new
VersionedRowStore(partitionStorage, txManager)),
- new
RebalanceRaftGroupEventsListener(
- metaStorageMgr,
-
tablesCfg.tables().get(tablesById.get(tblId).name()),
- grpId,
- partId,
- busyLock,
- movePartition(() ->
internalTbl.partitionRaftGroupService(partId)),
- rebalanceScheduler
- ),
- groupOptions
- );
-
- return
CompletableFuture.completedFuture(null);
- } catch (NodeStoppingException ex) {
- return CompletableFuture.failedFuture(ex);
+ boolean hasData =
partitionStorage.lastAppliedIndex() > 0;
+
+ CompletableFuture<Boolean> fut;
+
+ if (isInMemory || !hasData) {
+ List<ClusterNode> partAssignments =
assignmentsLatest.get(partId);
+
+ fut = queryDataNodesCount(tblId, partId,
partAssignments).thenApply(dataNodesCount -> {
+ boolean fullPartitionRestart =
dataNodesCount == 0;
+ boolean majorityAvailable =
dataNodesCount >= (partAssignments.size() / 2) + 1;
+
+ if (fullPartitionRestart) {
+ return true;
+ }
+
+ if (majorityAvailable) {
+ String partitionId =
partitionRaftGroupName(tblId, partId);
+
RebalanceUtil.startPeerRemoval(partitionId, localMember, metaStorageMgr);
+ return false;
+ } else {
+ // No majority and not a full
partition restart - need to restart nodes
+ // with current partition.
+ String msg = "Unable to start
partition " + partId + ". Majority not available.";
+ throw new
IgniteInternalException(msg);
+ }
+ });
+ } else {
+ fut =
CompletableFuture.completedFuture(true);
}
+
+ return fut.thenComposeAsync(startGroup -> {
Review Comment:
Well in this case fut.thenCompose will be running in network thread, because
`queryDataNodesCount` completes on the network thread
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]