tkalkirill commented on code in PR #1615:
URL: https://github.com/apache/ignite-3/pull/1615#discussion_r1094548315
##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java:
##########
@@ -2102,4 +2055,60 @@ private CompletableFuture<PartitionStorages>
getOrCreatePartitionStorages(TableI
}, ioExecutor)
.thenCompose(Function.identity());
}
+
+ /**
+ * Handles the {@link RebalanceUtil#STABLE_ASSIGNMENTS_PREFIX} update
event.
+ *
+ * @param evt Event.
+ */
+ protected void handleChangeStableAssignmentEvent(WatchEvent evt) {
+ inBusyLock(busyLock, () -> {
+ assert evt.single() : evt;
+
+ Entry stableAssignmentsWatchEvent = evt.entryEvent().newEntry();
+
+ if (stableAssignmentsWatchEvent.value() == null) {
+ return;
+ }
+
+ int partitionId =
extractPartitionNumber(stableAssignmentsWatchEvent.key());
+ UUID tableId = extractTableId(stableAssignmentsWatchEvent.key(),
STABLE_ASSIGNMENTS_PREFIX);
+
+ TablePartitionId tablePartitionId = new TablePartitionId(tableId,
partitionId);
+
+ Set<Assignment> stableAssignments =
ByteUtils.fromBytes(stableAssignmentsWatchEvent.value());
+
+ byte[] pendingAssignmentsFromMetaStorage = metaStorageMgr.get(
+ pendingPartAssignmentsKey(tablePartitionId),
+ stableAssignmentsWatchEvent.revision()
+ ).join().value();
+
+ Set<Assignment> pendingAssignments =
pendingAssignmentsFromMetaStorage == null
+ ? Set.of()
+ : ByteUtils.fromBytes(pendingAssignmentsFromMetaStorage);
+
+ String localMemberName =
clusterService.topologyService().localMember().name();
+
+ boolean shouldStopLocalServices =
Stream.concat(stableAssignments.stream(), pendingAssignments.stream())
+ .noneMatch(assignment ->
assignment.consistentId().equals(localMemberName));
+
+ if (shouldStopLocalServices) {
+ try {
+ raftMgr.stopRaftNodes(tablePartitionId);
+
+ replicaMgr.stopReplica(tablePartitionId);
+ } catch (NodeStoppingException e) {
+ // no-op
+ }
+
+ InternalTable internalTable =
tablesByIdVv.latest().get(tableId).internalTable();
+
+ // Should be done fairly quickly.
+ allOf(
+ internalTable.storage().destroyPartition(partitionId),
+ runAsync(() ->
internalTable.txStateStorage().destroyTxStateStorage(partitionId), ioExecutor)
+ ).join();
Review Comment:
Added TODO
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: notifications-unsubscribe@ignite.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org