sergeyuttsel commented on code in PR #2095:
URL: https://github.com/apache/ignite-3/pull/2095#discussion_r1277500635
##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java:
##########
@@ -705,219 +705,230 @@ private CompletableFuture<?> onTableDelete(ConfigurationNotificationEvent<TableV
      */
     private CompletableFuture<?> createTablePartitionsLocally(
             long causalityToken,
-            List<Set<Assignment>> assignments,
+            CompletableFuture<List<Set<Assignment>>> assignments,
             int zoneId,
             TableImpl table
     ) {
         int tableId = table.tableId();
-        List<Set<Assignment>> newAssignments = assignments;
+        // Create new raft nodes according to new assignments.
+        Supplier<CompletableFuture<Void>> updateAssignmentsClosure = () -> {
+            return assignments.thenCompose(newAssignments -> {
+                // Empty assignments might be a valid case if tables are created from within cluster init HOCON
+                // configuration, which is not supported now.
+                assert newAssignments != null : IgniteStringFormatter.format("Table [id={}] has empty assignments.", tableId);
+
+                int partitions = newAssignments.size();
-        // Empty assignments might be a valid case if tables are created from within cluster init HOCON
-        // configuration, which is not supported now.
-        assert newAssignments != null : IgniteStringFormatter.format("Table [id={}] has empty assignments.", tableId);
+                CompletableFuture<?>[] futures = new CompletableFuture<?>[partitions];
-        int partitions = newAssignments.size();
+                // TODO: https://issues.apache.org/jira/browse/IGNITE-19713 Process assignments and set partitions only for assigned partitions.
+                PartitionSet parts = new BitSetPartitionSet();
-        CompletableFuture<?>[] futures = new CompletableFuture<?>[partitions];
+                for (int i = 0; i < futures.length; i++) {
+                    futures[i] = new CompletableFuture<>();
-        // TODO: https://issues.apache.org/jira/browse/IGNITE-19713 Process assignments and set partitions only for assigned partitions.
-        PartitionSet parts = new BitSetPartitionSet();
+                    parts.set(i);
+                }
-        for (int i = 0; i < futures.length; i++) {
-            futures[i] = new CompletableFuture<>();
+                String localMemberName = localNode().name();
-            parts.set(i);
-        }
+                for (int i = 0; i < partitions; i++) {
+                    int partId = i;
-        String localMemberName = localNode().name();
+                    Set<Assignment> newPartAssignment = newAssignments.get(partId);
-        // Create new raft nodes according to new assignments.
-        Supplier<CompletableFuture<Void>> updateAssignmentsClosure = () -> {
-            for (int i = 0; i < partitions; i++) {
-                int partId = i;
+                    InternalTable internalTbl = table.internalTable();
-                Set<Assignment> newPartAssignment = newAssignments.get(partId);
+                    Assignment localMemberAssignment = newPartAssignment.stream()
+                            .filter(a -> a.consistentId().equals(localMemberName))
+                            .findAny()
+                            .orElse(null);
-                InternalTable internalTbl = table.internalTable();
+                    PeersAndLearners newConfiguration = configurationFromAssignments(newPartAssignment);
-                Assignment localMemberAssignment = newPartAssignment.stream()
-                        .filter(a -> a.consistentId().equals(localMemberName))
-                        .findAny()
-                        .orElse(null);
+                    TablePartitionId replicaGrpId = new TablePartitionId(tableId, partId);
-                PeersAndLearners newConfiguration = configurationFromAssignments(newPartAssignment);
+                    placementDriver.updateAssignment(replicaGrpId, newConfiguration.peers().stream().map(Peer::consistentId)
+                            .collect(toList()));
-                TablePartitionId replicaGrpId = new TablePartitionId(tableId, partId);
+                    var safeTimeTracker = new PendingComparableValuesTracker<HybridTimestamp, Void>(
+                            new HybridTimestamp(1, 0)
+                    );
+                    var storageIndexTracker = new PendingComparableValuesTracker<Long, Void>(0L);
-                placementDriver.updateAssignment(replicaGrpId, newConfiguration.peers().stream().map(Peer::consistentId)
-                        .collect(toList()));
+                    ((InternalTableImpl) internalTbl).updatePartitionTrackers(partId, safeTimeTracker, storageIndexTracker);
-                var safeTimeTracker = new PendingComparableValuesTracker<HybridTimestamp, Void>(
-                        new HybridTimestamp(1, 0)
-                );
-                var storageIndexTracker = new PendingComparableValuesTracker<Long, Void>(0L);
+                    PartitionStorages partitionStorages = getPartitionStorages(table, partId);
-                ((InternalTableImpl) internalTbl).updatePartitionTrackers(partId, safeTimeTracker, storageIndexTracker);
+                    PartitionDataStorage partitionDataStorage = partitionDataStorage(partitionStorages.getMvPartitionStorage(),
+                            internalTbl, partId);
-                PartitionStorages partitionStorages = getPartitionStorages(table, partId);
+                    PartitionUpdateHandlers partitionUpdateHandlers = createPartitionUpdateHandlers(
+                            partId,
+                            partitionDataStorage,
+                            table,
+                            safeTimeTracker
+                    );
-                PartitionDataStorage partitionDataStorage = partitionDataStorage(partitionStorages.getMvPartitionStorage(),
-                        internalTbl, partId);
+                    mvGc.addStorage(replicaGrpId, partitionUpdateHandlers.gcUpdateHandler);
-                PartitionUpdateHandlers partitionUpdateHandlers = createPartitionUpdateHandlers(
-                        partId,
-                        partitionDataStorage,
-                        table,
-                        safeTimeTracker
-                );
+                    CompletableFuture<Boolean> startGroupFut;
-                mvGc.addStorage(replicaGrpId, partitionUpdateHandlers.gcUpdateHandler);
+                    // start new nodes, only if it is table creation, other cases will be covered by rebalance logic
+                    if (localMemberAssignment != null) {
+                        CompletableFuture<Boolean> shouldStartGroupFut;
-                CompletableFuture<Boolean> startGroupFut;
+                        // If Raft is running in in-memory mode or the PDS has been cleared, we need to remove the current node
+                        // from the Raft group in order to avoid the double vote problem.
+                        // <MUTED> See https://issues.apache.org/jira/browse/IGNITE-16668 for details.
+                        // TODO: https://issues.apache.org/jira/browse/IGNITE-19046 Restore "|| !hasData"
+                        if (internalTbl.storage().isVolatile()) {
+                            shouldStartGroupFut = queryDataNodesCount(tableId, partId, newConfiguration.peers())
+                                    .thenApply(dataNodesCount -> {
+                                        boolean fullPartitionRestart = dataNodesCount == 0;
-                // start new nodes, only if it is table creation, other cases will be covered by rebalance logic
-                if (localMemberAssignment != null) {
-                    CompletableFuture<Boolean> shouldStartGroupFut;
+                                        if (fullPartitionRestart) {
+                                            return true;
+                                        }
-                    // If Raft is running in in-memory mode or the PDS has been cleared, we need to remove the current node
-                    // from the Raft group in order to avoid the double vote problem.
-                    // <MUTED> See https://issues.apache.org/jira/browse/IGNITE-16668 for details.
-                    // TODO: https://issues.apache.org/jira/browse/IGNITE-19046 Restore "|| !hasData"
-                    if (internalTbl.storage().isVolatile()) {
-                        shouldStartGroupFut = queryDataNodesCount(tableId, partId, newConfiguration.peers()).thenApply(dataNodesCount -> {
-                            boolean fullPartitionRestart = dataNodesCount == 0;
+                                        boolean majorityAvailable = dataNodesCount >= (newConfiguration.peers().size() / 2) + 1;
-                            if (fullPartitionRestart) {
-                                return true;
-                            }
+                                        if (majorityAvailable) {
+                                            RebalanceUtil.startPeerRemoval(replicaGrpId, localMemberAssignment, metaStorageMgr);
-                            boolean majorityAvailable = dataNodesCount >= (newConfiguration.peers().size() / 2) + 1;
+                                            return false;
+                                        } else {
+                                            // No majority and not a full partition restart - need to restart nodes
+                                            // with current partition.
+                                            String msg = "Unable to start partition " + partId + ". Majority not available.";
-                            if (majorityAvailable) {
-                                RebalanceUtil.startPeerRemoval(replicaGrpId, localMemberAssignment, metaStorageMgr);
+                                            throw new IgniteInternalException(msg);
+                                        }
+                                    });
+                        } else {
+                            shouldStartGroupFut = completedFuture(true);
+                        }
+                        startGroupFut = shouldStartGroupFut.thenApplyAsync(startGroup -> inBusyLock(busyLock, () -> {
+                            if (!startGroup) {
                                return false;
-                            } else {
-                                // No majority and not a full partition restart - need to restart nodes
-                                // with current partition.
-                                String msg = "Unable to start partition " + partId + ". Majority not available.";
+                            }
+                            TxStateStorage txStatePartitionStorage = partitionStorages.getTxStateStorage();
+
+                            RaftGroupOptions groupOptions = groupOptionsForPartition(
+                                    internalTbl.storage(),
+                                    internalTbl.txStateStorage(),
+                                    partitionKey(internalTbl, partId),
+                                    partitionUpdateHandlers
+                            );
+
+                            Peer serverPeer = newConfiguration.peer(localMemberName);
+
+                            var raftNodeId = new RaftNodeId(replicaGrpId, serverPeer);
+
+                            try {
+                                // TODO: use RaftManager interface, see https://issues.apache.org/jira/browse/IGNITE-18273
+                                ((Loza) raftMgr).startRaftGroupNode(
+                                        raftNodeId,
+                                        newConfiguration,
+                                        new PartitionListener(
+                                                partitionDataStorage,
+                                                partitionUpdateHandlers.storageUpdateHandler,
+                                                txStatePartitionStorage,
+                                                safeTimeTracker,
+                                                storageIndexTracker
+                                        ),
+                                        new RebalanceRaftGroupEventsListener(
+                                                metaStorageMgr,
+                                                replicaGrpId,
+                                                busyLock,
+                                                createPartitionMover(internalTbl, partId),
+                                                this::calculateAssignments,
+                                                rebalanceScheduler
+                                        ),
+                                        groupOptions
+                                );
-                                throw new IgniteInternalException(msg);
+                                return true;
+                            } catch (NodeStoppingException ex) {
+                                throw new CompletionException(ex);
                            }
-                        });
+                        }), ioExecutor);
                    } else {
-                        shouldStartGroupFut = completedFuture(true);
+                        startGroupFut = completedFuture(false);
                    }
-                    startGroupFut = shouldStartGroupFut.thenApplyAsync(startGroup -> inBusyLock(busyLock, () -> {
-                        if (!startGroup) {
-                            return false;
-                        }
-                        TxStateStorage txStatePartitionStorage = partitionStorages.getTxStateStorage();
-
-                        RaftGroupOptions groupOptions = groupOptionsForPartition(
-                                internalTbl.storage(),
-                                internalTbl.txStateStorage(),
-                                partitionKey(internalTbl, partId),
-                                partitionUpdateHandlers
-                        );
+                    startGroupFut
+                            .thenComposeAsync(v -> inBusyLock(busyLock, () -> {
+                                try {
+                                    //TODO IGNITE-19614 This procedure takes 10 seconds if there's no majority online.
+                                    return raftMgr.startRaftGroupService(replicaGrpId, newConfiguration, raftGroupServiceFactory);
+                                } catch (NodeStoppingException ex) {
+                                    return failedFuture(ex);
+                                }
+                            }), ioExecutor)
+                            .thenAcceptAsync(updatedRaftGroupService -> inBusyLock(busyLock, () -> {
+                                ((InternalTableImpl) internalTbl).updateInternalTableRaftGroupService(partId, updatedRaftGroupService);
-                        Peer serverPeer = newConfiguration.peer(localMemberName);
+                                boolean startedRaftNode = startGroupFut.join();
+                                if (localMemberAssignment == null || !startedRaftNode) {
+                                    return;
+                                }
-                        var raftNodeId = new RaftNodeId(replicaGrpId, serverPeer);
+                                MvPartitionStorage partitionStorage = partitionStorages.getMvPartitionStorage();
+                                TxStateStorage txStateStorage = partitionStorages.getTxStateStorage();
-                        try {
-                            // TODO: use RaftManager interface, see https://issues.apache.org/jira/browse/IGNITE-18273
-                            ((Loza) raftMgr).startRaftGroupNode(
-                                    raftNodeId,
-                                    newConfiguration,
-                                    new PartitionListener(
-                                            partitionDataStorage,
-                                            partitionUpdateHandlers.storageUpdateHandler,
-                                            txStatePartitionStorage,
-                                            safeTimeTracker,
-                                            storageIndexTracker
-                                    ),
-                                    new RebalanceRaftGroupEventsListener(
-                                            metaStorageMgr,
+                                try {
+                                    startReplicaWithNewListener(
                                            replicaGrpId,
-                                            busyLock,
-                                            createPartitionMover(internalTbl, partId),
-                                            this::calculateAssignments,
-                                            rebalanceScheduler
-                                    ),
-                                    groupOptions
-                            );
+                                            table,
+                                            safeTimeTracker,
+                                            storageIndexTracker,
+                                            partitionStorage,
+                                            txStateStorage,
+                                            partitionUpdateHandlers,
+                                            updatedRaftGroupService,
+                                            schemaManager.schemaRegistry(causalityToken, tableId)
+                                    );
+                                } catch (NodeStoppingException ex) {
+                                    throw new AssertionError("Loza was stopped before Table manager", ex);
+                                }
+                            }), ioExecutor)
+                            .whenComplete((res, ex) -> {
+                                if (ex != null) {
+                                    LOG.warn("Unable to update raft groups on the node [tableId={}, partitionId={}]", ex, tableId, partId);
-                            return true;
-                        } catch (NodeStoppingException ex) {
-                            throw new CompletionException(ex);
-                        }
-                    }), ioExecutor);
-                } else {
-                    startGroupFut = completedFuture(false);
+                                    futures[partId].completeExceptionally(ex);
+                                } else {
+                                    futures[partId].complete(null);
+                                }
+                            });
                }
-                startGroupFut
-                        .thenComposeAsync(v -> inBusyLock(busyLock, () -> {
-                            try {
-                                //TODO IGNITE-19614 This procedure takes 10 seconds if there's no majority online.
-                                return raftMgr.startRaftGroupService(replicaGrpId, newConfiguration, raftGroupServiceFactory);
-                            } catch (NodeStoppingException ex) {
-                                return failedFuture(ex);
-                            }
-                        }), ioExecutor)
-                        .thenAcceptAsync(updatedRaftGroupService -> inBusyLock(busyLock, () -> {
-                            ((InternalTableImpl) internalTbl).updateInternalTableRaftGroupService(partId, updatedRaftGroupService);
-
-                            boolean startedRaftNode = startGroupFut.join();
-                            if (localMemberAssignment == null || !startedRaftNode) {
-                                return;
-                            }
+                return allOf(futures);
+            });
-                            MvPartitionStorage partitionStorage = partitionStorages.getMvPartitionStorage();
-                            TxStateStorage txStateStorage = partitionStorages.getTxStateStorage();
-                            try {
-                                startReplicaWithNewListener(
-                                        replicaGrpId,
-                                        table,
-                                        safeTimeTracker,
-                                        storageIndexTracker,
-                                        partitionStorage,
-                                        txStateStorage,
-                                        partitionUpdateHandlers,
-                                        updatedRaftGroupService,
-                                        schemaManager.schemaRegistry(causalityToken, tableId)
-                                );
-                            } catch (NodeStoppingException ex) {
-                                throw new AssertionError("Loza was stopped before Table manager", ex);
-                            }
-                        }), ioExecutor)
-                        .whenComplete((res, ex) -> {
-                            if (ex != null) {
-                                LOG.warn("Unable to update raft groups on the node [tableId={}, partitionId={}]", ex, tableId, partId);
-
-                                futures[partId].completeExceptionally(ex);
-                            } else {
-                                futures[partId].complete(null);
-                            }
-                        });
-            }
-
-            return allOf(futures);
        };
        // NB: all vv.update() calls must be made from the synchronous part of the method (not in thenCompose()/etc!).
        CompletableFuture<?> localPartsUpdateFuture = localPartsByTableIdVv.update(causalityToken,
                (previous, throwable) -> inBusyLock(busyLock, () -> {
-                    return getOrCreatePartitionStorages(table, parts).thenApply(u -> {
-                        var newValue = new HashMap<>(previous);
+                    return assignments.thenCompose(newAssignments -> {
Review Comment:
I agree. Fixed.
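
For readers following the diff above: the substance of the change is that createTablePartitionsLocally() now receives the assignments as a CompletableFuture rather than a materialized list, so all per-partition Raft startup is deferred into a closure that composes on that future, completes one future per partition, and joins them with allOf(). Below is a minimal, self-contained sketch of that deferral pattern; Assignment and startPartition here are simplified stand-ins for illustration, not the actual TableManager API.

    import java.util.List;
    import java.util.Set;
    import java.util.concurrent.CompletableFuture;
    import java.util.function.Supplier;

    public class DeferredAssignmentsSketch {
        /** Simplified stand-in for a partition assignment; not the Ignite class. */
        record Assignment(String consistentId) { }

        /**
         * Mirrors the shape of the patched method: nothing reads the assignments
         * until the returned closure is invoked and the future resolves.
         */
        static Supplier<CompletableFuture<Void>> updateAssignmentsClosure(
                CompletableFuture<List<Set<Assignment>>> assignmentsFut) {
            return () -> assignmentsFut.thenCompose(assignments -> {
                CompletableFuture<?>[] futures = new CompletableFuture<?>[assignments.size()];

                for (int partId = 0; partId < assignments.size(); partId++) {
                    // One future per partition, so a failure in one partition
                    // surfaces through allOf() instead of aborting the loop.
                    futures[partId] = startPartition(partId, assignments.get(partId));
                }

                return CompletableFuture.allOf(futures);
            });
        }

        /** Hypothetical per-partition startup; the real code starts a Raft node here. */
        static CompletableFuture<Void> startPartition(int partId, Set<Assignment> assignment) {
            return CompletableFuture.runAsync(() ->
                    System.out.println("starting partition " + partId + " with " + assignment));
        }

        public static void main(String[] args) {
            CompletableFuture<List<Set<Assignment>>> assignmentsFut =
                    CompletableFuture.completedFuture(
                            List.of(Set.of(new Assignment("node-1")), Set.of(new Assignment("node-2"))));

            updateAssignmentsClosure(assignmentsFut).get().join();
        }
    }

This also matches the NB comment kept in the diff: the versioned-value update() calls stay on the synchronous path of the method, while everything that needs the resolved assignments lives inside thenCompose().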
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]