ibessonov commented on code in PR #1016:
URL: https://github.com/apache/ignite-3/pull/1016#discussion_r958109473
##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java:
##########
@@ -552,33 +618,69 @@ private void
updateAssignmentInternal(ConfigurationNotificationEvent<byte[]> ass
if (raftMgr.shouldHaveRaftGroupLocally(nodes)) {
startGroupFut = CompletableFuture
- .supplyAsync(
- () ->
internalTbl.storage().getOrCreateMvPartition(partId), ioExecutor)
+ .supplyAsync(() ->
internalTbl.storage().getOrCreateMvPartition(partId), ioExecutor)
.thenComposeAsync((partitionStorage) -> {
- RaftGroupOptions groupOptions =
groupOptionsForPartition(internalTbl, tblCfg, partitionStorage,
- newPartAssignment);
-
- try {
- raftMgr.startRaftGroupNode(
- grpId,
- newPartAssignment,
- new PartitionListener(tblId, new
VersionedRowStore(partitionStorage, txManager)),
- new
RebalanceRaftGroupEventsListener(
- metaStorageMgr,
-
tablesCfg.tables().get(tablesById.get(tblId).name()),
- grpId,
- partId,
- busyLock,
- movePartition(() ->
internalTbl.partitionRaftGroupService(partId)),
- rebalanceScheduler
- ),
- groupOptions
- );
-
- return
CompletableFuture.completedFuture(null);
- } catch (NodeStoppingException ex) {
- return CompletableFuture.failedFuture(ex);
+ boolean hasData =
partitionStorage.lastAppliedIndex() > 0;
+
+ CompletableFuture<Boolean> fut;
+
+ if (isInMemory || !hasData) {
+ List<ClusterNode> partAssignments =
assignmentsLatest.get(partId);
+
+ fut = queryDataNodesCount(tblId, partId,
partAssignments).thenApply(dataNodesCount -> {
+ boolean fullPartitionRestart =
dataNodesCount == 0;
+ boolean majorityAvailable =
dataNodesCount >= (partAssignments.size() / 2) + 1;
+
+ if (fullPartitionRestart) {
+ return true;
+ }
+
+ if (majorityAvailable) {
+ String partitionId =
partitionRaftGroupName(tblId, partId);
+
RebalanceUtil.startPeerRemoval(partitionId, localMember, metaStorageMgr);
+ return false;
+ } else {
+ // No majority and not a full
partition restart - need to restart nodes
+ // with current partition.
+ String msg = "Unable to start
partition " + partId + ". Majority not available.";
+ throw new
IgniteInternalException(msg);
+ }
+ });
+ } else {
+ fut =
CompletableFuture.completedFuture(true);
}
+
+ return fut.thenComposeAsync(startGroup -> {
Review Comment:
Can we use "composeAsync" or something?
Is there a way to add assertions that we are not in the network thread?
Would be handy I think
##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java:
##########
@@ -606,6 +708,32 @@ private void
updateAssignmentInternal(ConfigurationNotificationEvent<byte[]> ass
CompletableFuture.allOf(futures).join();
}
+ /**
+ * Calculates the quantity of the data nodes for the partition of the
table.
+ *
+ * @param tblId Table id.
+ * @param partId Partition id.
+ * @param partAssignments Partition assignments.
+ * @return A future that will hold the quantity of data nodes.
+ */
+ private CompletableFuture<Long> queryDataNodesCount(UUID tblId, int
partId, List<ClusterNode> partAssignments) {
Review Comment:
Ok, never mind then
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]