alex-plekhanov commented on a change in pull request #9321:
URL: https://github.com/apache/ignite/pull/9321#discussion_r728130429
##########
File path:
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java
##########
@@ -316,6 +371,9 @@
/** Snapshot operation handlers. */
private final SnapshotHandlers handlers = new SnapshotHandlers();
+ /** Manager to handle remote snapshot requests and receive. */
Review comment:
receive response?
##########
File path:
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStoreManager.java
##########
@@ -987,14 +1044,7 @@ public void readCacheConfigurations(File dir, Map<String,
StoredCacheData> ccfgs
if (conf.exists() && conf.length() > 0) {
StoredCacheData cacheData = readCacheData(conf);
- String cacheName = cacheData.config().getName();
-
- if (!ccfgs.containsKey(cacheName))
- ccfgs.put(cacheName, cacheData);
- else {
- U.warn(log, "Cache with name=" + cacheName + " is already
registered, skipping config file "
- + dir.getName());
- }
+ ccfgs.putIfAbsent(cacheData.config().getName(),
readCacheData(conf));
Review comment:
Why do we call readCacheData again? We already have cacheData.
##########
File path:
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotRestoreProcess.java
##########
@@ -947,11 +1103,611 @@ private Exception checkNodeLeft(Set<UUID> reqNodes,
Set<UUID> respNodes) {
* @param res Results.
* @param errs Errors.
*/
- private void finishRollback(UUID reqId, Map<UUID, Boolean> res, Map<UUID,
Exception> errs) {
+ private void finishPreload(UUID reqId, Map<UUID, Boolean> res, Map<UUID,
Exception> errs) {
if (ctx.clientNode())
return;
- if (!errs.isEmpty()) {
+ SnapshotRestoreContext opCtx0 = opCtx;
+
+ Exception failure = errs.values().stream().findFirst().
+ orElse(checkNodeLeft(opCtx0.nodes, res.keySet()));
+
+ opCtx0.errHnd.accept(failure);
+
+ if (failure != null) {
+ opCtx0.locStopCachesCompleteFut.onDone((Void)null);
+
+ if (U.isLocalNodeCoordinator(ctx.discovery()))
+ rollbackRestoreProc.start(reqId, reqId);
+
+ return;
+ }
+
+ if (U.isLocalNodeCoordinator(ctx.discovery()))
+ cacheStartProc.start(reqId, reqId);
+ }
+
+ /**
+ * @param reqId Request ID.
+ * @return Result future.
+ */
+ private IgniteInternalFuture<Boolean> cacheStart(UUID reqId) {
+ if (ctx.clientNode())
+ return new GridFinishedFuture<>();
+
+ SnapshotRestoreContext opCtx0 = opCtx;
+
+ Throwable err = opCtx0.err.get();
+
+ if (err != null)
+ return new GridFinishedFuture<>(err);
+
+ if (!U.isLocalNodeCoordinator(ctx.discovery()))
+ return opCtx0.cachesLoadFut;
+
+ Collection<StoredCacheData> ccfgs = opCtx0.cfgs.values();
+
+ if (log.isInfoEnabled()) {
+ log.info("Starting restored caches " +
+ "[reqId=" + opCtx0.reqId + ", snapshot=" + opCtx0.snpName +
+ ", caches=" + F.viewReadOnly(ccfgs, c -> c.config().getName())
+ ']');
+ }
+
+ // We set the topology node IDs required to successfully start the
cache, if any of the required nodes leave
+ // the cluster during the cache startup, the whole procedure will be
rolled back.
+ GridCompoundFuture<Boolean, Boolean> awaitBoth = new
GridCompoundFuture<>();
+
+ IgniteInternalFuture<Boolean> cacheStartFut =
ctx.cache().dynamicStartCachesByStoredConf(ccfgs, true,
+ true, !opCtx0.sameTop, IgniteUuid.fromUuid(reqId));
+
+ // This is required for the rollback procedure to execute the cache
groups stop operation.
+ cacheStartFut.listen(f -> opCtx0.isLocNodeStartedCaches = (f.error()
== null));
+
+ // Convert exception to the RestoreCacheStartException to propagate to
other nodes over the distributed process.
+ awaitBoth.add(chainCacheStartException(cacheStartFut));
+ awaitBoth.add(opCtx0.cachesLoadFut);
+
+ awaitBoth.markInitialized();
+
+ return awaitBoth;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void
onInitBeforeTopologyLock(GridDhtPartitionsExchangeFuture exchFut) {
+ if (ctx.clientNode())
+ return;
+
+ SnapshotRestoreContext opCtx0 = opCtx;
+ Set<Integer> grpIdsToStart = getCachesLoadingFromSnapshot(exchFut,
opCtx0);
+
+ if (F.isEmpty(grpIdsToStart))
+ return;
+
+ assert opCtx0 != null;
+ assert !opCtx0.sameTop : "WAL must be disabled only for caches
restoring from snapshot taken on another cluster: " + opCtx0;
+
+ // This is happened on the exchange which has been initiated by a
dynamic cache start message and intend to switch
+ // off the WAL for cache groups loading from a snapshot.
+ for (CacheGroupContext grp : F.view(ctx.cache().cacheGroups(), g ->
grpIdsToStart.contains(g.groupId()))) {
+ assert grp.localWalEnabled() : grp.cacheOrGroupName();
+
+ // Check partitions have not even been created yet, so the
PartitionMetaStateRecord won't be logged to the WAL.
+ for (int p = 0; p < grp.topology().partitions(); p++)
+ assert grp.topology().localPartition(p) == null : p;
+
+ grp.localWalEnabled(false, true);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public void
onDoneAfterTopologyUnlock(GridDhtPartitionsExchangeFuture fut) {
+ // This will be called after the processCacheStopRequestOnExchangeDone
happens.
+ SnapshotRestoreContext opCtx0 = opCtx;
+
+ if (ctx.clientNode() || fut == null || fut.exchangeActions() == null
|| opCtx0 == null)
+ return;
+
+ Set<String> grpNamesToStop =
fut.exchangeActions().cacheGroupsToStop((ccfg, uuid) ->
+ requirePartitionLoad(ccfg, uuid, opCtx0))
+ .stream()
+ .map(g -> g.descriptor().cacheOrGroupName())
+ .collect(Collectors.toSet());
+
+ if (F.isEmpty(grpNamesToStop))
+ return;
+
+ assert grpNamesToStop.size() == opCtx0.dirs.size() : grpNamesToStop;
+
+ opCtx0.locStopCachesCompleteFut.onDone((Void)null);
+ }
+
+ /**
+ * @param reqId Request ID.
+ * @param res Results.
+ * @param errs Errors.
+ */
+ private void finishCacheStart(UUID reqId, Map<UUID, Boolean> res,
Map<UUID, Exception> errs) {
+ if (ctx.clientNode())
+ return;
+
+ SnapshotRestoreContext opCtx0 = opCtx;
+
+ Exception failure = errs.values().stream().findFirst().
+ orElse(checkNodeLeft(opCtx0.nodes, res.keySet()));
+
+ if (failure == null) {
+ if (opCtx0.sameTop) {
+ finishProcess(reqId);
+
+ updateMetastorageRecoveryKeys(opCtx0.dirs, true);
+ }
+ else
+ lateAffProc.start(reqId, reqId);
+
+ return;
+ }
+
+ opCtx0.errHnd.accept(failure);
+
+ ClusterNode crd = U.oldest(ctx.discovery().aliveServerNodes(), null);
+
+ // Caches were not even been started and rollback already occurred
during PME, so they are not even stared.
+ if (X.hasCause(errs.get(crd.id()), RestoreCacheStartException.class))
+ opCtx0.locStopCachesCompleteFut.onDone((Void)null);
+
+ if (U.isLocalNodeCoordinator(ctx.discovery()))
+ rollbackRestoreProc.start(reqId, reqId);
+ }
+
+ /**
+ * @param grps Ordered list of cache groups sorted by priority.
+ * @param exchFut Exchange future.
+ */
+ public void onRebalanceReady(Set<CacheGroupContext> grps, @Nullable
GridDhtPartitionsExchangeFuture exchFut) {
+ if (ctx.clientNode())
+ return;
+
+ SnapshotRestoreContext opCtx0 = opCtx;
+
+ Set<Integer> exchGrpIds = getCachesLoadingFromSnapshot(exchFut,
opCtx0);
+
+ Set<CacheGroupContext> filtered = grps.stream()
+ .filter(Objects::nonNull)
+ .filter(g -> exchGrpIds.contains(g.groupId()))
+ .collect(Collectors.toSet());
+
+ // Restore requests has been already processed at previous exchange.
+ if (filtered.isEmpty())
+ return;
+
+ assert opCtx0 != null;
+
+ // First preload everything from the local node.
+ List<SnapshotMetadata> locMetas =
opCtx0.metasPerNode.get(ctx.localNodeId());
+
+ Map<Integer, Set<Integer>> notScheduled = new HashMap<>();
+
+ // Register partitions to be processed.
+ for (CacheGroupContext grp : filtered) {
+ if (F.isEmpty(locMetas))
+ break;
+
+ notScheduled.put(grp.groupId(),
+ affinityPartitions(grp.affinity(),
ctx.cache().context().localNode(), Integer::new));
+
+ Set<Integer> leftParts = notScheduled.get(grp.groupId());
+
+ assert !leftParts.contains(INDEX_PARTITION);
+
+ if (F.isEmpty(leftParts)) {
+ opCtx0.cachesLoadFut.onDone();
+
+ continue;
+ }
+
+ Set<PartitionRestoreLifecycleFuture> partLfs =
U.newHashSet(leftParts.size());
+
+ for (Integer partId : leftParts) {
+ // Affinity node partitions are inited on exchange.
+ GridDhtLocalPartition part =
grp.topology().localPartition(partId);
+ PartitionRestoreLifecycleFuture lf =
PartitionRestoreLifecycleFuture.create(grp, log, partId);
+
+ // Start partition eviction first.
+ if (part == null)
+ lf.cleared.complete(null);
+ else {
+ part.clearAsync().listen(f -> {
+ // This future must clear all heap cache entries too
from the GridDhtLocalPartition map.
+ if (f.error() == null)
+ lf.cleared.complete(f.result());
Review comment:
I don't like the overuse of CompletableFuture, especially in simple cases like
this (it also brings nothing but complexity to the whole patch). An Ignite future
could simply be chained in this case, whereas with a CompletableFuture you have to
write a lot of code that is much harder to read.
##########
File path:
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java
##########
@@ -2099,6 +2272,590 @@ private boolean readPageFromStore(long pageId,
ByteBuffer buff) throws IgniteChe
}
}
+ /** Remote snapshot future which tracks remote snapshot transmission
result. */
+ private static class RemoteSnapshotFutureTask extends
GridFutureAdapter<Void> implements Runnable {
Review comment:
It's not a good class name: we already have AbstractSnapshotFutureTask
and SnapshotFutureTask, but RemoteSnapshotFutureTask is not related to these
classes in any way.
##########
File path:
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java
##########
@@ -1576,39 +1714,40 @@ SnapshotFutureTask registerSnapshotTask(
boolean withMetaStorage,
SnapshotSender snpSndr
) {
+ return (SnapshotFutureTask)registerSnapshotTask(snpName, new
SnapshotFutureTask(cctx, srcNodeId, snpName, tmpWorkDir,
+ ioFactory, snpSndr, parts, withMetaStorage, locBuff));
+ }
+
+ /**
+ * @param task Snapshot operation task to be executed.
+ * @return Snapshot operation task which should be registered on
checkpoint to run.
+ */
+ private AbstractSnapshotFutureTask<?> registerSnapshotTask(String rqId,
AbstractSnapshotFutureTask<?> task) {
if (!busyLock.enterBusy()) {
- return new SnapshotFutureTask(
- new IgniteCheckedException("Snapshot manager is stopping
[locNodeId=" + cctx.localNodeId() + ']'));
+ return new SnapshotFinishedFutureTask(new
IgniteCheckedException("Snapshot manager is stopping [locNodeId=" +
+ cctx.localNodeId() + ']'));
}
try {
- if (locSnpTasks.containsKey(snpName))
- return new SnapshotFutureTask(new
IgniteCheckedException("Snapshot with requested name is already scheduled: " +
snpName));
+ if (locSnpTasks.containsKey(rqId))
+ return new SnapshotFinishedFutureTask(new
IgniteCheckedException("Snapshot with requested name is already scheduled: " +
Review comment:
{}
##########
File path:
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java
##########
@@ -280,6 +332,9 @@
/** Local snapshot sender factory. */
private Function<String, SnapshotSender> locSndrFactory =
LocalSnapshotSender::new;
+ /** Local snapshot sender factory. */
Review comment:
Local -> Remote?
##########
File path:
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java
##########
@@ -1629,6 +1768,26 @@ void localSnapshotSenderFactory(Function<String,
SnapshotSender> factory) {
return locSndrFactory;
}
+ /**
+ * @param factory Factory which produces {@link RemoteSnapshotSender}
implementation.
+ */
+ void remoteSnapshotSenderFactory(BiFunction<String, UUID, SnapshotSender>
factory) {
+ rmtSndrFactory = factory;
+ }
+
+ /**
+ * @param rqId Request id.
+ * @param nodeId Node id.
+ * @return Snapshot sender related to given node id.
+ */
+ RemoteSnapshotSender remoteSnapshotSenderFactory(String rqId, UUID nodeId)
{
+ return new RemoteSnapshotSender(log,
+ snpRunner,
+ () -> databaseRelativePath(pdsSettings.folderName()),
Review comment:
Why can't we use the path directly, without a supplier?
##########
File path:
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java
##########
@@ -2099,6 +2272,590 @@ private boolean readPageFromStore(long pageId,
ByteBuffer buff) throws IgniteChe
}
}
+ /** Remote snapshot future which tracks remote snapshot transmission
result. */
+ private static class RemoteSnapshotFutureTask extends
GridFutureAdapter<Void> implements Runnable {
+ /** Snapshot name to create. */
+ private final String rqId = RMT_SNAPSHOT_PREFIX +
U.maskForFileName(UUID.randomUUID().toString());
+
+ /** Remote node id to request snapshot from. */
+ private final UUID rmtNodeId;
+
+ /** Initialization procedure. */
+ private final IgniteThrowableConsumer<String> init;
+
+ /** Process interrupt checker. */
+ private final BooleanSupplier stopChecker;
+
+ /** Partition handler given by request initiator. */
+ private final BiConsumer<File, Throwable> partHnd;
+
+ /** Temporary working directory for consuming partitions. */
+ private final Path dir;
+
+ /** Counter which show how many partitions left to be received. */
+ private final AtomicInteger partsLeft = new AtomicInteger(-1);
+
+ /**
+ * @param rmtNodeId Remote node id to request snapshots.
+ * @param tmpWorkDir Temporary work directory.
+ * @param init The initialization task procedure.
+ * @param stopChecker Process interrupt checker.
+ * @param partHnd Partition handler.
+ */
+ public RemoteSnapshotFutureTask(
+ UUID rmtNodeId,
+ File tmpWorkDir,
+ IgniteThrowableConsumer<String> init,
+ BooleanSupplier stopChecker,
+ BiConsumer<@Nullable File, @Nullable Throwable> partHnd
+ ) {
+ dir = Paths.get(tmpWorkDir.getAbsolutePath(), rqId);
+ this.rmtNodeId = rmtNodeId;
+ this.init = init;
+ this.stopChecker = stopChecker;
+ this.partHnd = partHnd;
+ }
+
+ /** {@inheritDoc} */
+ @Override public synchronized void run() {
+ if (isDone())
+ return;
+
+ try {
+ init.accept(rqId);
+ }
+ catch (Throwable t) {
+ onDone(t);
+ }
+ }
+
+ /**
+ * @param ex Exception occurred during receiving files.
+ */
+ public synchronized void acceptException(Throwable ex) {
+ if (isDone())
+ return;
+
+ try {
+ partHnd.accept(null, ex);
+ }
+ catch (Throwable t) {
+ ex.addSuppressed(t);
+ }
+
+ onDone(ex);
+ }
+
+ /**
+ * @param part Received file which needs to be handled.
+ */
+ public synchronized void acceptFile(File part) {
+ if (isDone())
+ return;
+
+ if (stopChecker.getAsBoolean())
+ throw new TransmissionCancelledException("Future cancelled
prior to the all requested partitions processed.");
+
+ partHnd.accept(part, null);
+ partsLeft.decrementAndGet();
+ }
+
+ /** {@inheritDoc} */
+ @Override protected synchronized boolean onDone(@Nullable Void res,
@Nullable Throwable err, boolean cancel) {
+ U.delete(dir);
+
+ return super.onDone(res, err, cancel);
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean equals(Object o) {
+ if (this == o)
+ return true;
+
+ if (o == null || getClass() != o.getClass())
+ return false;
+
+ RemoteSnapshotFutureTask future = (RemoteSnapshotFutureTask)o;
+
+ return Objects.equals(rqId, future.rqId);
+ }
+
+ /** {@inheritDoc} */
+ @Override public int hashCode() {
+ return Objects.hash(rqId);
Review comment:
`rqId.hashCode()` (don't implicitly create a varargs array just to hash one field)
##########
File path:
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java
##########
@@ -2099,6 +2272,590 @@ private boolean readPageFromStore(long pageId,
ByteBuffer buff) throws IgniteChe
}
}
+ /** Remote snapshot future which tracks remote snapshot transmission
result. */
+ private static class RemoteSnapshotFutureTask extends
GridFutureAdapter<Void> implements Runnable {
+ /** Snapshot name to create. */
+ private final String rqId = RMT_SNAPSHOT_PREFIX +
U.maskForFileName(UUID.randomUUID().toString());
+
+ /** Remote node id to request snapshot from. */
+ private final UUID rmtNodeId;
+
+ /** Initialization procedure. */
+ private final IgniteThrowableConsumer<String> init;
+
+ /** Process interrupt checker. */
+ private final BooleanSupplier stopChecker;
+
+ /** Partition handler given by request initiator. */
+ private final BiConsumer<File, Throwable> partHnd;
+
+ /** Temporary working directory for consuming partitions. */
+ private final Path dir;
+
+ /** Counter which show how many partitions left to be received. */
+ private final AtomicInteger partsLeft = new AtomicInteger(-1);
+
+ /**
+ * @param rmtNodeId Remote node id to request snapshots.
+ * @param tmpWorkDir Temporary work directory.
+ * @param init The initialization task procedure.
+ * @param stopChecker Process interrupt checker.
+ * @param partHnd Partition handler.
+ */
+ public RemoteSnapshotFutureTask(
+ UUID rmtNodeId,
+ File tmpWorkDir,
+ IgniteThrowableConsumer<String> init,
Review comment:
It's better to include `init` code in `run` and pass plain parameters
here
##########
File path:
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java
##########
@@ -333,6 +391,10 @@ public IgniteSnapshotManager(GridKernalContext ctx) {
marsh = MarshallerUtils.jdkMarshaller(ctx.igniteInstanceName());
restoreCacheGrpProc = new SnapshotRestoreProcess(ctx);
+
ctx.internalSubscriptionProcessor().registerMetastorageListener(restoreCacheGrpProc);
Review comment:
NL
##########
File path:
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java
##########
@@ -316,6 +371,9 @@
/** Snapshot operation handlers. */
private final SnapshotHandlers handlers = new SnapshotHandlers();
+ /** Manager to handle remote snapshot requests and receive. */
+ private volatile SequentialRemoteSnapshotManager snpRmtHandler;
Review comment:
final
##########
File path:
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java
##########
@@ -2099,6 +2272,590 @@ private boolean readPageFromStore(long pageId,
ByteBuffer buff) throws IgniteChe
}
}
+ /** Remote snapshot future which tracks remote snapshot transmission
result. */
+ private static class RemoteSnapshotFutureTask extends
GridFutureAdapter<Void> implements Runnable {
+ /** Snapshot name to create. */
+ private final String rqId = RMT_SNAPSHOT_PREFIX +
U.maskForFileName(UUID.randomUUID().toString());
+
+ /** Remote node id to request snapshot from. */
+ private final UUID rmtNodeId;
+
+ /** Initialization procedure. */
+ private final IgniteThrowableConsumer<String> init;
+
+ /** Process interrupt checker. */
+ private final BooleanSupplier stopChecker;
+
+ /** Partition handler given by request initiator. */
+ private final BiConsumer<File, Throwable> partHnd;
+
+ /** Temporary working directory for consuming partitions. */
+ private final Path dir;
+
+ /** Counter which show how many partitions left to be received. */
+ private final AtomicInteger partsLeft = new AtomicInteger(-1);
+
+ /**
+ * @param rmtNodeId Remote node id to request snapshots.
+ * @param tmpWorkDir Temporary work directory.
+ * @param init The initialization task procedure.
+ * @param stopChecker Process interrupt checker.
+ * @param partHnd Partition handler.
+ */
+ public RemoteSnapshotFutureTask(
+ UUID rmtNodeId,
+ File tmpWorkDir,
+ IgniteThrowableConsumer<String> init,
+ BooleanSupplier stopChecker,
+ BiConsumer<@Nullable File, @Nullable Throwable> partHnd
+ ) {
+ dir = Paths.get(tmpWorkDir.getAbsolutePath(), rqId);
+ this.rmtNodeId = rmtNodeId;
+ this.init = init;
+ this.stopChecker = stopChecker;
+ this.partHnd = partHnd;
+ }
+
+ /** {@inheritDoc} */
+ @Override public synchronized void run() {
+ if (isDone())
+ return;
+
+ try {
+ init.accept(rqId);
+ }
+ catch (Throwable t) {
+ onDone(t);
+ }
+ }
+
+ /**
+ * @param ex Exception occurred during receiving files.
+ */
+ public synchronized void acceptException(Throwable ex) {
+ if (isDone())
+ return;
+
+ try {
+ partHnd.accept(null, ex);
+ }
+ catch (Throwable t) {
+ ex.addSuppressed(t);
+ }
+
+ onDone(ex);
+ }
+
+ /**
+ * @param part Received file which needs to be handled.
+ */
+ public synchronized void acceptFile(File part) {
+ if (isDone())
+ return;
+
+ if (stopChecker.getAsBoolean())
+ throw new TransmissionCancelledException("Future cancelled
prior to the all requested partitions processed.");
+
+ partHnd.accept(part, null);
+ partsLeft.decrementAndGet();
+ }
+
+ /** {@inheritDoc} */
+ @Override protected synchronized boolean onDone(@Nullable Void res,
@Nullable Throwable err, boolean cancel) {
+ U.delete(dir);
+
+ return super.onDone(res, err, cancel);
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean equals(Object o) {
+ if (this == o)
+ return true;
+
+ if (o == null || getClass() != o.getClass())
+ return false;
+
+ RemoteSnapshotFutureTask future = (RemoteSnapshotFutureTask)o;
+
+ return Objects.equals(rqId, future.rqId);
+ }
+
+ /** {@inheritDoc} */
+ @Override public int hashCode() {
+ return Objects.hash(rqId);
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(RemoteSnapshotFutureTask.class, this);
+ }
+ }
+
+ /**
+ * This manager is responsible for requesting and handling snapshots from
a remote node. Each snapshot request
+ * processed asynchronously but strictly one by one.
+ */
+ private class SequentialRemoteSnapshotManager implements
TransmissionHandler, GridMessageListener {
+ /** A task currently being executed and must be explicitly finished. */
+ private final AtomicReference<RemoteSnapshotFutureTask> active = new
AtomicReference<>();
+
+ /** Queue of asynchronous tasks to execute. */
+ private final Queue<RemoteSnapshotFutureTask> queue = new
ConcurrentLinkedDeque<>();
+
+ /** {@code true} if the node is stopping. */
+ private volatile boolean stopping;
+
+ /**
+ * @param task The supplier of a new task. May produce <tt>null</tt>
value if the queue is empty.
+ */
+ public void submit(Supplier<RemoteSnapshotFutureTask> task) {
+ RemoteSnapshotFutureTask curr = active.get();
+ RemoteSnapshotFutureTask next = task.get();
+
+ if ((curr == null || curr.isDone()) && active.compareAndSet(curr,
next)) {
+ if (next == null)
+ return;
+
+ next.listen(f -> submit(queue::poll));
+
+ if (stopping)
+ next.acceptException(new
IgniteException(SNP_NODE_STOPPING_ERR_MSG));
+ else
+ next.run();
+ }
+ else if (next != null)
+ queue.offer(next);
Review comment:
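There is a possible race here in which the `next` task is never executed: if the
active task completes after the `isDone()` check fails but before `queue.offer(next)`,
its listener polls an empty queue and nothing is left to pick `next` up.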
##########
File path:
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotRestoreProcess.java
##########
@@ -276,21 +480,12 @@ protected void cleanup() throws IgniteCheckedException {
return;
}
-
- Collection<String> bltNodes =
F.viewReadOnly(ctx.discovery().serverNodes(AffinityTopologyVersion.NONE),
- node -> node.consistentId().toString(), (node) ->
CU.baselineNode(node, ctx.state().clusterState()));
-
- snpBltNodes.removeAll(bltNodes);
-
- if (!snpBltNodes.isEmpty()) {
- finishProcess(fut0.rqId, new
IgniteIllegalStateException(OP_REJECT_MSG + "Some nodes required to " +
- "restore a cache group are missing [nodeId(s)=" +
snpBltNodes + ", snapshot=" + snpName + ']'));
-
- return;
- }
+ Collection<UUID> bltNodes =
F.viewReadOnly(ctx.discovery().serverNodes(AffinityTopologyVersion.NONE),
Review comment:
`Collection<UUID> bltNodes =
F.viewReadOnly(ctx.discovery().discoCache().aliveBaselineNodes(), F.node2id());`
?
##########
File path:
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotRestoreProcess.java
##########
@@ -781,162 +918,181 @@ private void finishPrepare(UUID reqId, Map<UUID,
ArrayList<StoredCacheData>> res
// Context has been created - should rollback changes cluster-wide.
if (failure != null) {
- opCtx0.err.compareAndSet(null, failure);
-
- if (U.isLocalNodeCoordinator(ctx.discovery()))
- rollbackRestoreProc.start(reqId, reqId);
+ opCtx0.errHnd.accept(failure);
return;
}
Map<Integer, StoredCacheData> globalCfgs = new HashMap<>();
- for (List<StoredCacheData> storedCfgs : res.values()) {
- if (storedCfgs == null)
- continue;
+ for (Map.Entry<UUID, SnapshotRestoreOperationResponse> e :
res.entrySet()) {
+ if (e.getValue().ccfgs != null) {
+ for (StoredCacheData cacheData : e.getValue().ccfgs)
+ globalCfgs.put(CU.cacheId(cacheData.config().getName()),
cacheData);
+ }
- for (StoredCacheData cacheData : storedCfgs)
- globalCfgs.put(CU.cacheId(cacheData.config().getName()),
cacheData);
+ opCtx0.metasPerNode.computeIfAbsent(e.getKey(), id -> new
ArrayList<>())
+ .addAll(e.getValue().metas);
}
opCtx0.cfgs = globalCfgs;
+ opCtx0.sameTop = sameTopology(opCtx0.nodes, opCtx0.metasPerNode);
if (U.isLocalNodeCoordinator(ctx.discovery()))
- cacheStartProc.start(reqId, reqId);
+ preloadProc.start(reqId, reqId);
}
/**
- * @param reqId Request ID.
- * @return Result future.
+ * @param nodes Nodes that have to alive to complete restore operation.
+ * @return {@code true} if the snapshot and current cluster topologies are
compatible.
*/
- private IgniteInternalFuture<Boolean> cacheStart(UUID reqId) {
- if (ctx.clientNode())
- return new GridFinishedFuture<>();
+ private boolean sameTopology(Set<UUID> nodes, Map<UUID,
ArrayList<SnapshotMetadata>> metas) {
+ Set<String> clusterBlts = nodes.stream()
+ .map(n -> ctx.discovery().node(n).consistentId().toString())
+ .collect(Collectors.toSet());
- SnapshotRestoreContext opCtx0 = opCtx;
+ // Snapshot baseline nodes.
+ List<SnapshotMetadata> nodeMetas = F.first(metas.values());
- Throwable err = opCtx0.err.get();
+ if (nodeMetas == null)
+ return false;
- if (err != null)
- return new GridFinishedFuture<>(err);
+ Set<String> snpBlts = F.first(nodeMetas).baselineNodes();
- if (!U.isLocalNodeCoordinator(ctx.discovery()))
- return new GridFinishedFuture<>();
+ if (!clusterBlts.containsAll(snpBlts))
+ return false;
- Collection<StoredCacheData> ccfgs = opCtx0.cfgs.values();
+ // Each node must have its own local copy of a snapshot.
+ for (Map.Entry<UUID, ArrayList<SnapshotMetadata>> e :
metas.entrySet()) {
+ String consId =
ctx.discovery().node(e.getKey()).consistentId().toString();
- if (log.isInfoEnabled()) {
- log.info("Starting restored caches " +
- "[reqId=" + opCtx0.reqId + ", snapshot=" + opCtx0.snpName +
- ", caches=" + F.viewReadOnly(ccfgs, c -> c.config().getName())
+ ']');
+ // Local node metadata is always on the first place of a list.
+ SnapshotMetadata meta = F.first(e.getValue());
+
+ if (meta == null || !meta.consistentId().equals(consId))
+ return false;
}
- // We set the topology node IDs required to successfully start the
cache, if any of the required nodes leave
- // the cluster during the cache startup, the whole procedure will be
rolled back.
- return ctx.cache().dynamicStartCachesByStoredConf(ccfgs, true, true,
false, IgniteUuid.fromUuid(reqId));
+ return true;
}
/**
- * @param reqId Request ID.
- * @param res Results.
- * @param errs Errors.
+ * @param reqId Request id.
+ * @return Future which will be completed when the preload ends.
*/
- private void finishCacheStart(UUID reqId, Map<UUID, Boolean> res,
Map<UUID, Exception> errs) {
+ private IgniteInternalFuture<Boolean> preload(UUID reqId) {
if (ctx.clientNode())
- return;
+ return new GridFinishedFuture<>();
SnapshotRestoreContext opCtx0 = opCtx;
+ GridFutureAdapter<Boolean> retFut = new GridFutureAdapter<>();
- Exception failure = errs.values().stream().findFirst().
- orElse(checkNodeLeft(opCtx0.nodes, res.keySet()));
+ if (opCtx0 == null)
+ return new GridFinishedFuture<>(new
IgniteCheckedException("Snapshot restore process has incorrect restore state: "
+ reqId));
- if (failure == null) {
- finishProcess(reqId);
+ try {
+ if (ctx.isStopping())
+ throw new NodeStoppingException("Node is stopping: " +
ctx.localNodeId());
- return;
- }
+ IgniteSnapshotManager snpMgr = ctx.cache().context().snapshotMgr();
- opCtx0.err.compareAndSet(null, failure);
+ synchronized (this) {
+ opCtx0.stopFut = new IgniteFutureImpl<>(retFut.chain(f ->
null));
+ }
- if (U.isLocalNodeCoordinator(ctx.discovery()))
- rollbackRestoreProc.start(reqId, reqId);
- }
+ // Guard all snapshot restore operations with the metastorage keys.
+ updateMetastorageRecoveryKeys(opCtx0.dirs, false);
- /**
- * @param reqNodes Set of required topology nodes.
- * @param respNodes Set of responding topology nodes.
- * @return Error, if no response was received from the required topology
node.
- */
- private Exception checkNodeLeft(Set<UUID> reqNodes, Set<UUID> respNodes) {
- if (!respNodes.containsAll(reqNodes)) {
- Set<UUID> leftNodes = new HashSet<>(reqNodes);
+ CompletableFuture<Void> metaFut =
ctx.localNodeId().equals(opCtx0.opNodeId) ?
+ CompletableFuture.runAsync(
+ () -> {
+ try {
+ SnapshotMetadata meta =
F.first(opCtx0.metasPerNode.get(opCtx0.opNodeId));
- leftNodes.removeAll(respNodes);
+ File binDir =
binaryWorkDir(snpMgr.snapshotLocalDir(opCtx0.snpName).getAbsolutePath(),
+ meta.folderName());
- return new ClusterTopologyCheckedException(OP_REJECT_MSG +
- "Required node has left the cluster [nodeId=" + leftNodes +
']');
- }
+ ctx.cacheObjects().updateMetadata(binDir,
opCtx0.stopChecker);
+ }
+ catch (Throwable t) {
+ log.error("Unable to perform metadata update
operation for the cache groups restore process", t);
- return null;
- }
+ opCtx0.errHnd.accept(t);
+ }
+ }, snpMgr.snapshotExecutorService()) :
CompletableFuture.completedFuture(null);
- /**
- * @param reqId Request ID.
- * @return Result future.
- */
- private IgniteInternalFuture<Boolean> rollback(UUID reqId) {
- if (ctx.clientNode())
- return new GridFinishedFuture<>();
+ CompletableFuture<Void> partFut =
CompletableFuture.completedFuture(null);
- SnapshotRestoreContext opCtx0 = opCtx;
+ if (opCtx0.sameTop) {
+ if (log.isInfoEnabled()) {
+ log.info("The snapshot was taken on the same cluster
topology. It may by copied prior to starting cache groups " +
+ "[snpName=" + opCtx0.snpName +
+ ", dirs=" +
opCtx0.dirs.stream().map(File::getName).collect(Collectors.toList()) + ']');
+ }
- if (opCtx0 == null || F.isEmpty(opCtx0.dirs))
- return new GridFinishedFuture<>();
+ List<CompletableFuture<Path>> futs = new ArrayList<>();
+ String pdsFolderName =
ctx.pdsFolderResolver().resolveFolders().folderName();
- GridFutureAdapter<Boolean> retFut = new GridFutureAdapter<>();
+ for (File cacheDir : opCtx0.dirs) {
+ File snpCacheDir = new
File(ctx.cache().context().snapshotMgr().snapshotLocalDir(opCtx0.snpName),
+ Paths.get(databaseRelativePath(pdsFolderName),
cacheDir.getName()).toString());
- synchronized (this) {
- opCtx0.stopFut = new IgniteFutureImpl<>(retFut.chain(f -> null));
+ if (!snpCacheDir.exists())
+ throw new IgniteCheckedException("Snapshot directory
doesn't exist: " + snpCacheDir);
- try {
-
ctx.cache().context().snapshotMgr().snapshotExecutorService().execute(() -> {
- if (log.isInfoEnabled()) {
- log.info("Removing restored cache directories [reqId="
+ reqId +
- ", snapshot=" + opCtx0.snpName + ", dirs=" +
opCtx0.dirs + ']');
- }
+ File tmpCacheDir = formatTmpDirName(cacheDir);
- IgniteCheckedException ex = null;
+ tmpCacheDir.mkdir();
- for (File cacheDir : opCtx0.dirs) {
- File tmpCacheDir = formatTmpDirName(cacheDir);
+ for (File snpFile : snpCacheDir.listFiles()) {
+ CompletableFuture<Path> fut;
- if (tmpCacheDir.exists() && !U.delete(tmpCacheDir)) {
- log.error("Unable to perform rollback routine
completely, cannot remove temp directory " +
- "[reqId=" + reqId + ", snapshot=" +
opCtx0.snpName + ", dir=" + tmpCacheDir + ']');
+ copyFileLocal(snpMgr, opCtx0, snpFile,
+ Paths.get(tmpCacheDir.getAbsolutePath(),
snpFile.getName()),
+ fut = new CompletableFuture<>());
- ex = new IgniteCheckedException("Unable to remove
temporary cache directory " + cacheDir);
- }
+ futs.add(fut);
+ }
+ }
- if (cacheDir.exists() && !U.delete(cacheDir)) {
- log.error("Unable to perform rollback routine
completely, cannot remove cache directory " +
- "[reqId=" + reqId + ", snapshot=" +
opCtx0.snpName + ", dir=" + cacheDir + ']');
+ int size = futs.size();
- ex = new IgniteCheckedException("Unable to remove
cache directory " + cacheDir);
+ partFut = CompletableFuture.allOf(futs.toArray(new
CompletableFuture[size]))
+ .runAfterBothAsync(metaFut, () -> {
+ try {
+ if (opCtx0.stopChecker.getAsBoolean())
+ throw new IgniteInterruptedException("The
operation has been stopped on temporary directory switch.");
+
+ for (File src : opCtx0.dirs)
+ Files.move(formatTmpDirName(src).toPath(),
src.toPath(), StandardCopyOption.ATOMIC_MOVE);
}
- }
+ catch (Throwable e) {
+ opCtx0.errHnd.accept(e);
+ }
+ }, snpMgr.snapshotExecutorService())
+ // Complete the local rebalance cache future, since the
data is loaded.
+ .thenAccept(r -> opCtx0.cachesLoadFut.onDone(true));
+ }
- if (ex != null)
- retFut.onDone(ex);
- else
+ allOfFailFast(Arrays.asList(metaFut, partFut))
Review comment:
partFut already contains metaFut (since it is created via runAfterBothAsync)
##########
File path:
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotRestoreProcess.java
##########
@@ -967,9 +1723,231 @@ private void finishRollback(UUID reqId, Map<UUID,
Boolean> res, Map<UUID, Except
" operation [reqId=" + reqId + ", snapshot=" + opCtx0.snpName
+ ", node(s)=" + leftNodes + ']');
}
+ updateMetastorageRecoveryKeys(opCtx0.dirs, true);
+
finishProcess(reqId, opCtx0.err.get());
}
+ /**
+ * @param mgr Ignite snapshot manager.
+ * @param opCtx Snapshot operation context.
+ * @param snpFile Snapshot file to copy.
+ * @param target Destination path.
+ * @param fut Future which will handle the copy results.
+ */
+ private static void copyFileLocal(
+ IgniteSnapshotManager mgr,
+ SnapshotRestoreContext opCtx,
+ File snpFile,
+ Path target,
+ CompletableFuture<Path> fut
Review comment:
Let's just return the future here and not do any additional nested-future
logic (do it whenever it's needed)
##########
File path:
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotRestoreProcess.java
##########
@@ -781,162 +918,181 @@ private void finishPrepare(UUID reqId, Map<UUID,
ArrayList<StoredCacheData>> res
// Context has been created - should rollback changes cluster-wide.
if (failure != null) {
- opCtx0.err.compareAndSet(null, failure);
-
- if (U.isLocalNodeCoordinator(ctx.discovery()))
- rollbackRestoreProc.start(reqId, reqId);
+ opCtx0.errHnd.accept(failure);
return;
}
Map<Integer, StoredCacheData> globalCfgs = new HashMap<>();
- for (List<StoredCacheData> storedCfgs : res.values()) {
- if (storedCfgs == null)
- continue;
+ for (Map.Entry<UUID, SnapshotRestoreOperationResponse> e :
res.entrySet()) {
+ if (e.getValue().ccfgs != null) {
+ for (StoredCacheData cacheData : e.getValue().ccfgs)
+ globalCfgs.put(CU.cacheId(cacheData.config().getName()),
cacheData);
+ }
- for (StoredCacheData cacheData : storedCfgs)
- globalCfgs.put(CU.cacheId(cacheData.config().getName()),
cacheData);
+ opCtx0.metasPerNode.computeIfAbsent(e.getKey(), id -> new
ArrayList<>())
+ .addAll(e.getValue().metas);
}
opCtx0.cfgs = globalCfgs;
+ opCtx0.sameTop = sameTopology(opCtx0.nodes, opCtx0.metasPerNode);
if (U.isLocalNodeCoordinator(ctx.discovery()))
- cacheStartProc.start(reqId, reqId);
+ preloadProc.start(reqId, reqId);
}
/**
- * @param reqId Request ID.
- * @return Result future.
+ * @param nodes Nodes that have to alive to complete restore operation.
+ * @return {@code true} if the snapshot and current cluster topologies are
compatible.
*/
- private IgniteInternalFuture<Boolean> cacheStart(UUID reqId) {
- if (ctx.clientNode())
- return new GridFinishedFuture<>();
+ private boolean sameTopology(Set<UUID> nodes, Map<UUID,
ArrayList<SnapshotMetadata>> metas) {
+ Set<String> clusterBlts = nodes.stream()
+ .map(n -> ctx.discovery().node(n).consistentId().toString())
+ .collect(Collectors.toSet());
- SnapshotRestoreContext opCtx0 = opCtx;
+ // Snapshot baseline nodes.
+ List<SnapshotMetadata> nodeMetas = F.first(metas.values());
- Throwable err = opCtx0.err.get();
+ if (nodeMetas == null)
+ return false;
- if (err != null)
- return new GridFinishedFuture<>(err);
+ Set<String> snpBlts = F.first(nodeMetas).baselineNodes();
- if (!U.isLocalNodeCoordinator(ctx.discovery()))
- return new GridFinishedFuture<>();
+ if (!clusterBlts.containsAll(snpBlts))
+ return false;
- Collection<StoredCacheData> ccfgs = opCtx0.cfgs.values();
+ // Each node must have its own local copy of a snapshot.
+ for (Map.Entry<UUID, ArrayList<SnapshotMetadata>> e :
metas.entrySet()) {
+ String consId =
ctx.discovery().node(e.getKey()).consistentId().toString();
- if (log.isInfoEnabled()) {
- log.info("Starting restored caches " +
- "[reqId=" + opCtx0.reqId + ", snapshot=" + opCtx0.snpName +
- ", caches=" + F.viewReadOnly(ccfgs, c -> c.config().getName())
+ ']');
+ // Local node metadata is always on the first place of a list.
+ SnapshotMetadata meta = F.first(e.getValue());
+
+ if (meta == null || !meta.consistentId().equals(consId))
+ return false;
}
- // We set the topology node IDs required to successfully start the
cache, if any of the required nodes leave
- // the cluster during the cache startup, the whole procedure will be
rolled back.
- return ctx.cache().dynamicStartCachesByStoredConf(ccfgs, true, true,
false, IgniteUuid.fromUuid(reqId));
+ return true;
}
/**
- * @param reqId Request ID.
- * @param res Results.
- * @param errs Errors.
+ * @param reqId Request id.
+ * @return Future which will be completed when the preload ends.
*/
- private void finishCacheStart(UUID reqId, Map<UUID, Boolean> res,
Map<UUID, Exception> errs) {
+ private IgniteInternalFuture<Boolean> preload(UUID reqId) {
if (ctx.clientNode())
- return;
+ return new GridFinishedFuture<>();
SnapshotRestoreContext opCtx0 = opCtx;
+ GridFutureAdapter<Boolean> retFut = new GridFutureAdapter<>();
- Exception failure = errs.values().stream().findFirst().
- orElse(checkNodeLeft(opCtx0.nodes, res.keySet()));
+ if (opCtx0 == null)
+ return new GridFinishedFuture<>(new
IgniteCheckedException("Snapshot restore process has incorrect restore state: "
+ reqId));
- if (failure == null) {
- finishProcess(reqId);
+ try {
+ if (ctx.isStopping())
+ throw new NodeStoppingException("Node is stopping: " +
ctx.localNodeId());
- return;
- }
+ IgniteSnapshotManager snpMgr = ctx.cache().context().snapshotMgr();
- opCtx0.err.compareAndSet(null, failure);
+ synchronized (this) {
+ opCtx0.stopFut = new IgniteFutureImpl<>(retFut.chain(f ->
null));
+ }
- if (U.isLocalNodeCoordinator(ctx.discovery()))
- rollbackRestoreProc.start(reqId, reqId);
- }
+ // Guard all snapshot restore operations with the metastorage keys.
+ updateMetastorageRecoveryKeys(opCtx0.dirs, false);
- /**
- * @param reqNodes Set of required topology nodes.
- * @param respNodes Set of responding topology nodes.
- * @return Error, if no response was received from the required topology
node.
- */
- private Exception checkNodeLeft(Set<UUID> reqNodes, Set<UUID> respNodes) {
- if (!respNodes.containsAll(reqNodes)) {
- Set<UUID> leftNodes = new HashSet<>(reqNodes);
+ CompletableFuture<Void> metaFut =
ctx.localNodeId().equals(opCtx0.opNodeId) ?
+ CompletableFuture.runAsync(
+ () -> {
+ try {
+ SnapshotMetadata meta =
F.first(opCtx0.metasPerNode.get(opCtx0.opNodeId));
- leftNodes.removeAll(respNodes);
+ File binDir =
binaryWorkDir(snpMgr.snapshotLocalDir(opCtx0.snpName).getAbsolutePath(),
+ meta.folderName());
- return new ClusterTopologyCheckedException(OP_REJECT_MSG +
- "Required node has left the cluster [nodeId=" + leftNodes +
']');
- }
+ ctx.cacheObjects().updateMetadata(binDir,
opCtx0.stopChecker);
+ }
+ catch (Throwable t) {
+ log.error("Unable to perform metadata update
operation for the cache groups restore process", t);
- return null;
- }
+ opCtx0.errHnd.accept(t);
+ }
+ }, snpMgr.snapshotExecutorService()) :
CompletableFuture.completedFuture(null);
- /**
- * @param reqId Request ID.
- * @return Result future.
- */
- private IgniteInternalFuture<Boolean> rollback(UUID reqId) {
- if (ctx.clientNode())
- return new GridFinishedFuture<>();
+ CompletableFuture<Void> partFut =
CompletableFuture.completedFuture(null);
- SnapshotRestoreContext opCtx0 = opCtx;
+ if (opCtx0.sameTop) {
+ if (log.isInfoEnabled()) {
+ log.info("The snapshot was taken on the same cluster
topology. It may by copied prior to starting cache groups " +
+ "[snpName=" + opCtx0.snpName +
+ ", dirs=" +
opCtx0.dirs.stream().map(File::getName).collect(Collectors.toList()) + ']');
+ }
- if (opCtx0 == null || F.isEmpty(opCtx0.dirs))
- return new GridFinishedFuture<>();
+ List<CompletableFuture<Path>> futs = new ArrayList<>();
+ String pdsFolderName =
ctx.pdsFolderResolver().resolveFolders().folderName();
- GridFutureAdapter<Boolean> retFut = new GridFutureAdapter<>();
+ for (File cacheDir : opCtx0.dirs) {
+ File snpCacheDir = new
File(ctx.cache().context().snapshotMgr().snapshotLocalDir(opCtx0.snpName),
+ Paths.get(databaseRelativePath(pdsFolderName),
cacheDir.getName()).toString());
- synchronized (this) {
- opCtx0.stopFut = new IgniteFutureImpl<>(retFut.chain(f -> null));
+ if (!snpCacheDir.exists())
+ throw new IgniteCheckedException("Snapshot directory
doesn't exist: " + snpCacheDir);
- try {
-
ctx.cache().context().snapshotMgr().snapshotExecutorService().execute(() -> {
- if (log.isInfoEnabled()) {
- log.info("Removing restored cache directories [reqId="
+ reqId +
- ", snapshot=" + opCtx0.snpName + ", dirs=" +
opCtx0.dirs + ']');
- }
+ File tmpCacheDir = formatTmpDirName(cacheDir);
- IgniteCheckedException ex = null;
+ tmpCacheDir.mkdir();
- for (File cacheDir : opCtx0.dirs) {
- File tmpCacheDir = formatTmpDirName(cacheDir);
+ for (File snpFile : snpCacheDir.listFiles()) {
+ CompletableFuture<Path> fut;
- if (tmpCacheDir.exists() && !U.delete(tmpCacheDir)) {
- log.error("Unable to perform rollback routine
completely, cannot remove temp directory " +
- "[reqId=" + reqId + ", snapshot=" +
opCtx0.snpName + ", dir=" + tmpCacheDir + ']');
+ copyFileLocal(snpMgr, opCtx0, snpFile,
+ Paths.get(tmpCacheDir.getAbsolutePath(),
snpFile.getName()),
+ fut = new CompletableFuture<>());
- ex = new IgniteCheckedException("Unable to remove
temporary cache directory " + cacheDir);
- }
+ futs.add(fut);
+ }
+ }
- if (cacheDir.exists() && !U.delete(cacheDir)) {
- log.error("Unable to perform rollback routine
completely, cannot remove cache directory " +
- "[reqId=" + reqId + ", snapshot=" +
opCtx0.snpName + ", dir=" + cacheDir + ']');
+ int size = futs.size();
- ex = new IgniteCheckedException("Unable to remove
cache directory " + cacheDir);
+ partFut = CompletableFuture.allOf(futs.toArray(new
CompletableFuture[size]))
+ .runAfterBothAsync(metaFut, () -> {
+ try {
+ if (opCtx0.stopChecker.getAsBoolean())
+ throw new IgniteInterruptedException("The
operation has been stopped on temporary directory switch.");
+
+ for (File src : opCtx0.dirs)
+ Files.move(formatTmpDirName(src).toPath(),
src.toPath(), StandardCopyOption.ATOMIC_MOVE);
}
- }
+ catch (Throwable e) {
+ opCtx0.errHnd.accept(e);
+ }
+ }, snpMgr.snapshotExecutorService())
+ // Complete the local rebalance cache future, since the
data is loaded.
+ .thenAccept(r -> opCtx0.cachesLoadFut.onDone(true));
+ }
- if (ex != null)
- retFut.onDone(ex);
- else
+ allOfFailFast(Arrays.asList(metaFut, partFut))
+ .whenComplete((res, t) -> {
+ Throwable t0 = ofNullable(opCtx0.err.get()).orElse(t);
+
+ if (t0 == null) {
Review comment:
redundant {}
##########
File path:
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotRestoreProcess.java
##########
@@ -947,11 +1103,611 @@ private Exception checkNodeLeft(Set<UUID> reqNodes,
Set<UUID> respNodes) {
* @param res Results.
* @param errs Errors.
*/
- private void finishRollback(UUID reqId, Map<UUID, Boolean> res, Map<UUID,
Exception> errs) {
+ private void finishPreload(UUID reqId, Map<UUID, Boolean> res, Map<UUID,
Exception> errs) {
if (ctx.clientNode())
return;
- if (!errs.isEmpty()) {
+ SnapshotRestoreContext opCtx0 = opCtx;
+
+ Exception failure = errs.values().stream().findFirst().
+ orElse(checkNodeLeft(opCtx0.nodes, res.keySet()));
+
+ opCtx0.errHnd.accept(failure);
+
+ if (failure != null) {
+ opCtx0.locStopCachesCompleteFut.onDone((Void)null);
+
+ if (U.isLocalNodeCoordinator(ctx.discovery()))
+ rollbackRestoreProc.start(reqId, reqId);
+
+ return;
+ }
+
+ if (U.isLocalNodeCoordinator(ctx.discovery()))
+ cacheStartProc.start(reqId, reqId);
+ }
+
+ /**
+ * @param reqId Request ID.
+ * @return Result future.
+ */
+ private IgniteInternalFuture<Boolean> cacheStart(UUID reqId) {
+ if (ctx.clientNode())
+ return new GridFinishedFuture<>();
+
+ SnapshotRestoreContext opCtx0 = opCtx;
+
+ Throwable err = opCtx0.err.get();
+
+ if (err != null)
+ return new GridFinishedFuture<>(err);
+
+ if (!U.isLocalNodeCoordinator(ctx.discovery()))
+ return opCtx0.cachesLoadFut;
+
+ Collection<StoredCacheData> ccfgs = opCtx0.cfgs.values();
+
+ if (log.isInfoEnabled()) {
+ log.info("Starting restored caches " +
+ "[reqId=" + opCtx0.reqId + ", snapshot=" + opCtx0.snpName +
+ ", caches=" + F.viewReadOnly(ccfgs, c -> c.config().getName())
+ ']');
+ }
+
+ // We set the topology node IDs required to successfully start the
cache, if any of the required nodes leave
+ // the cluster during the cache startup, the whole procedure will be
rolled back.
+ GridCompoundFuture<Boolean, Boolean> awaitBoth = new
GridCompoundFuture<>();
+
+ IgniteInternalFuture<Boolean> cacheStartFut =
ctx.cache().dynamicStartCachesByStoredConf(ccfgs, true,
+ true, !opCtx0.sameTop, IgniteUuid.fromUuid(reqId));
+
+ // This is required for the rollback procedure to execute the cache
groups stop operation.
+ cacheStartFut.listen(f -> opCtx0.isLocNodeStartedCaches = (f.error()
== null));
+
+ // Convert exception to the RestoreCacheStartException to propagate to
other nodes over the distributed process.
+ awaitBoth.add(chainCacheStartException(cacheStartFut));
+ awaitBoth.add(opCtx0.cachesLoadFut);
+
+ awaitBoth.markInitialized();
+
+ return awaitBoth;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void
onInitBeforeTopologyLock(GridDhtPartitionsExchangeFuture exchFut) {
+ if (ctx.clientNode())
+ return;
+
+ SnapshotRestoreContext opCtx0 = opCtx;
+ Set<Integer> grpIdsToStart = getCachesLoadingFromSnapshot(exchFut,
opCtx0);
+
+ if (F.isEmpty(grpIdsToStart))
+ return;
+
+ assert opCtx0 != null;
+ assert !opCtx0.sameTop : "WAL must be disabled only for caches
restoring from snapshot taken on another cluster: " + opCtx0;
+
+ // This is happened on the exchange which has been initiated by a
dynamic cache start message and intend to switch
+ // off the WAL for cache groups loading from a snapshot.
+ for (CacheGroupContext grp : F.view(ctx.cache().cacheGroups(), g ->
grpIdsToStart.contains(g.groupId()))) {
+ assert grp.localWalEnabled() : grp.cacheOrGroupName();
+
+ // Check partitions have not even been created yet, so the
PartitionMetaStateRecord won't be logged to the WAL.
+ for (int p = 0; p < grp.topology().partitions(); p++)
+ assert grp.topology().localPartition(p) == null : p;
+
+ grp.localWalEnabled(false, true);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public void
onDoneAfterTopologyUnlock(GridDhtPartitionsExchangeFuture fut) {
+ // This will be called after the processCacheStopRequestOnExchangeDone
happens.
+ SnapshotRestoreContext opCtx0 = opCtx;
+
+ if (ctx.clientNode() || fut == null || fut.exchangeActions() == null
|| opCtx0 == null)
+ return;
+
+ Set<String> grpNamesToStop =
fut.exchangeActions().cacheGroupsToStop((ccfg, uuid) ->
+ requirePartitionLoad(ccfg, uuid, opCtx0))
+ .stream()
+ .map(g -> g.descriptor().cacheOrGroupName())
+ .collect(Collectors.toSet());
+
+ if (F.isEmpty(grpNamesToStop))
+ return;
+
+ assert grpNamesToStop.size() == opCtx0.dirs.size() : grpNamesToStop;
+
+ opCtx0.locStopCachesCompleteFut.onDone((Void)null);
+ }
+
+ /**
+ * @param reqId Request ID.
+ * @param res Results.
+ * @param errs Errors.
+ */
+ private void finishCacheStart(UUID reqId, Map<UUID, Boolean> res,
Map<UUID, Exception> errs) {
+ if (ctx.clientNode())
+ return;
+
+ SnapshotRestoreContext opCtx0 = opCtx;
+
+ Exception failure = errs.values().stream().findFirst().
+ orElse(checkNodeLeft(opCtx0.nodes, res.keySet()));
+
+ if (failure == null) {
+ if (opCtx0.sameTop) {
+ finishProcess(reqId);
+
+ updateMetastorageRecoveryKeys(opCtx0.dirs, true);
+ }
+ else
+ lateAffProc.start(reqId, reqId);
Review comment:
Shouldn't we start the process only on the coordinator? Why are the other processes in
`finish...` started only on the coordinator?
##########
File path:
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotRestoreProcess.java
##########
@@ -947,11 +1103,611 @@ private Exception checkNodeLeft(Set<UUID> reqNodes,
Set<UUID> respNodes) {
* @param res Results.
* @param errs Errors.
*/
- private void finishRollback(UUID reqId, Map<UUID, Boolean> res, Map<UUID,
Exception> errs) {
+ private void finishPreload(UUID reqId, Map<UUID, Boolean> res, Map<UUID,
Exception> errs) {
if (ctx.clientNode())
return;
- if (!errs.isEmpty()) {
+ SnapshotRestoreContext opCtx0 = opCtx;
+
+ Exception failure = errs.values().stream().findFirst().
+ orElse(checkNodeLeft(opCtx0.nodes, res.keySet()));
+
+ opCtx0.errHnd.accept(failure);
+
+ if (failure != null) {
+ opCtx0.locStopCachesCompleteFut.onDone((Void)null);
+
+ if (U.isLocalNodeCoordinator(ctx.discovery()))
+ rollbackRestoreProc.start(reqId, reqId);
+
+ return;
+ }
+
+ if (U.isLocalNodeCoordinator(ctx.discovery()))
+ cacheStartProc.start(reqId, reqId);
+ }
+
+ /**
+ * @param reqId Request ID.
+ * @return Result future.
+ */
+ private IgniteInternalFuture<Boolean> cacheStart(UUID reqId) {
+ if (ctx.clientNode())
+ return new GridFinishedFuture<>();
+
+ SnapshotRestoreContext opCtx0 = opCtx;
+
+ Throwable err = opCtx0.err.get();
+
+ if (err != null)
+ return new GridFinishedFuture<>(err);
+
+ if (!U.isLocalNodeCoordinator(ctx.discovery()))
+ return opCtx0.cachesLoadFut;
+
+ Collection<StoredCacheData> ccfgs = opCtx0.cfgs.values();
+
+ if (log.isInfoEnabled()) {
+ log.info("Starting restored caches " +
+ "[reqId=" + opCtx0.reqId + ", snapshot=" + opCtx0.snpName +
+ ", caches=" + F.viewReadOnly(ccfgs, c -> c.config().getName())
+ ']');
+ }
+
+ // We set the topology node IDs required to successfully start the
cache, if any of the required nodes leave
+ // the cluster during the cache startup, the whole procedure will be
rolled back.
+ GridCompoundFuture<Boolean, Boolean> awaitBoth = new
GridCompoundFuture<>();
+
+ IgniteInternalFuture<Boolean> cacheStartFut =
ctx.cache().dynamicStartCachesByStoredConf(ccfgs, true,
+ true, !opCtx0.sameTop, IgniteUuid.fromUuid(reqId));
+
+ // This is required for the rollback procedure to execute the cache
groups stop operation.
+ cacheStartFut.listen(f -> opCtx0.isLocNodeStartedCaches = (f.error()
== null));
+
+ // Convert exception to the RestoreCacheStartException to propagate to
other nodes over the distributed process.
+ awaitBoth.add(chainCacheStartException(cacheStartFut));
+ awaitBoth.add(opCtx0.cachesLoadFut);
+
+ awaitBoth.markInitialized();
+
+ return awaitBoth;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void
onInitBeforeTopologyLock(GridDhtPartitionsExchangeFuture exchFut) {
+ if (ctx.clientNode())
+ return;
+
+ SnapshotRestoreContext opCtx0 = opCtx;
+ Set<Integer> grpIdsToStart = getCachesLoadingFromSnapshot(exchFut,
opCtx0);
+
+ if (F.isEmpty(grpIdsToStart))
+ return;
+
+ assert opCtx0 != null;
+ assert !opCtx0.sameTop : "WAL must be disabled only for caches
restoring from snapshot taken on another cluster: " + opCtx0;
+
+ // This is happened on the exchange which has been initiated by a
dynamic cache start message and intend to switch
+ // off the WAL for cache groups loading from a snapshot.
+ for (CacheGroupContext grp : F.view(ctx.cache().cacheGroups(), g ->
grpIdsToStart.contains(g.groupId()))) {
+ assert grp.localWalEnabled() : grp.cacheOrGroupName();
+
+ // Check partitions have not even been created yet, so the
PartitionMetaStateRecord won't be logged to the WAL.
+ for (int p = 0; p < grp.topology().partitions(); p++)
+ assert grp.topology().localPartition(p) == null : p;
+
+ grp.localWalEnabled(false, true);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public void
onDoneAfterTopologyUnlock(GridDhtPartitionsExchangeFuture fut) {
+ // This will be called after the processCacheStopRequestOnExchangeDone
happens.
+ SnapshotRestoreContext opCtx0 = opCtx;
+
+ if (ctx.clientNode() || fut == null || fut.exchangeActions() == null
|| opCtx0 == null)
+ return;
+
+ Set<String> grpNamesToStop =
fut.exchangeActions().cacheGroupsToStop((ccfg, uuid) ->
+ requirePartitionLoad(ccfg, uuid, opCtx0))
+ .stream()
+ .map(g -> g.descriptor().cacheOrGroupName())
+ .collect(Collectors.toSet());
+
+ if (F.isEmpty(grpNamesToStop))
+ return;
+
+ assert grpNamesToStop.size() == opCtx0.dirs.size() : grpNamesToStop;
+
+ opCtx0.locStopCachesCompleteFut.onDone((Void)null);
+ }
+
+ /**
+ * @param reqId Request ID.
+ * @param res Results.
+ * @param errs Errors.
+ */
+ private void finishCacheStart(UUID reqId, Map<UUID, Boolean> res,
Map<UUID, Exception> errs) {
+ if (ctx.clientNode())
+ return;
+
+ SnapshotRestoreContext opCtx0 = opCtx;
+
+ Exception failure = errs.values().stream().findFirst().
+ orElse(checkNodeLeft(opCtx0.nodes, res.keySet()));
+
+ if (failure == null) {
+ if (opCtx0.sameTop) {
+ finishProcess(reqId);
+
+ updateMetastorageRecoveryKeys(opCtx0.dirs, true);
Review comment:
Shouldn't we update the metastorage first and then complete the future?
##########
File path:
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotRestoreProcess.java
##########
@@ -967,9 +1723,231 @@ private void finishRollback(UUID reqId, Map<UUID,
Boolean> res, Map<UUID, Except
" operation [reqId=" + reqId + ", snapshot=" + opCtx0.snpName
+ ", node(s)=" + leftNodes + ']');
}
+ updateMetastorageRecoveryKeys(opCtx0.dirs, true);
+
finishProcess(reqId, opCtx0.err.get());
}
+ /**
+ * @param mgr Ignite snapshot manager.
+ * @param opCtx Snapshot operation context.
+ * @param snpFile Snapshot file to copy.
+ * @param target Destination path.
+ * @param fut Future which will handle the copy results.
+ */
+ private static void copyFileLocal(
+ IgniteSnapshotManager mgr,
+ SnapshotRestoreContext opCtx,
+ File snpFile,
+ Path target,
+ CompletableFuture<Path> fut
+ ) {
+ CompletableFuture.supplyAsync(
+ () -> {
+ if (opCtx.stopChecker.getAsBoolean())
+ throw new IgniteInterruptedException("The operation has
been stopped on copy file: " + snpFile.getAbsolutePath());
+
+ if (Thread.interrupted())
+ throw new IgniteInterruptedException("Thread has been
interrupted: " + Thread.currentThread().getName());
+
+ if (!snpFile.exists()) {
+ throw new IgniteException("Partition snapshot file doesn't
exist [snpName=" + opCtx.snpName +
+ ", snpDir=" + snpFile.getAbsolutePath() + ", name=" +
snpFile.getName() + ']');
+ }
+
+ IgniteSnapshotManager.copy(mgr.ioFactory(), snpFile,
target.toFile(), snpFile.length());
+
+ return target;
+ }, mgr.snapshotExecutorService())
+ .whenComplete((r, t) -> opCtx.errHnd.accept(t))
+ .whenComplete((res, t) -> {
+ if (t == null)
+ fut.complete(res);
+ else
+ fut.completeExceptionally(t);
+ });
+ }
+
+ /**
+ * @param dirs List of keys to process.
+ * @param remove {@code} if the keys must be removed.
+ */
+ private void updateMetastorageRecoveryKeys(Collection<File> dirs, boolean
remove) {
+ for (File dir : dirs) {
+ ctx.cache().context().database().checkpointReadLock();
+
+ String mKey = RESTORE_KEY_PREFIX + dir.getName();
+
+ try {
+ if (remove)
+ metaStorage.remove(mKey);
+ else
+ metaStorage.write(mKey, true);
+ }
+ catch (IgniteCheckedException e) {
+ log.error("Updating the metastorage crash-recovery guard key
fails [remove=" + remove +
+ "dir=" + dir.getAbsolutePath() + ']');
+ }
+ finally {
+ ctx.cache().context().database().checkpointReadUnlock();
+ }
+ }
+ }
+
+ /**
+ * @param affCache Affinity cache.
+ * @param node Cluster node to get assigned partitions.
+ * @return The set of partitions assigned to the given node.
+ */
+ private static <T> Set<T> affinityPartitions(
+ GridAffinityAssignmentCache affCache,
+ ClusterNode node,
+ IntFunction<T> factory
+ ) {
+ return IntStream.range(0, affCache.partitions())
+ .filter(p ->
affCache.idealAssignment().assignment().get(p).contains(node))
+ .mapToObj(factory)
+ .collect(Collectors.toSet());
+ }
+
+ /**
+ * @param action Action to execute.
+ * @param ex Consumer which accepts exceptional execution result.
+ */
+ private static <T> void handleException(Callable<T> action,
Consumer<Throwable> ex) {
+ try {
+ action.call();
+ }
+ catch (Throwable t) {
+ ex.accept(t);
+ }
+ }
+
+ /**
+ * @param futs Map of futures to complete.
+ * @param filter Filtering collection.
+ * @param ex Exception.
+ */
+ private static void completeExceptionally(
+ Collection<Set<PartitionRestoreLifecycleFuture>> futs,
+ Map<Integer, Set<Integer>> filter,
+ Throwable ex
+ ) {
+ futs.stream()
+ .flatMap(Collection::stream)
+ .filter(e -> filter.containsKey(e.grp.groupId()))
+ .collect(Collectors.toList())
+ .forEach(f -> f.completeExceptionally(ex));
+ }
+
+ /**
+ * @param cacheStartFut The cache started future to wrap exception if need.
+ * @param <T> Result future type.
+ * @return Future which completes with wrapped exception if it occurred.
+ */
+ private static <T> IgniteInternalFuture<T>
chainCacheStartException(IgniteInternalFuture<T> cacheStartFut) {
+ GridFutureAdapter<T> out = new GridFutureAdapter<>();
+
+ cacheStartFut.listen(f -> {
+ if (f.error() == null)
+ out.onDone(f.result());
+ else
+ out.onDone(new RestoreCacheStartException(f.error()));
+ });
+
+ return out;
+ }
+
+ /**
+ * @param lcs Collection of partition context.
+ * @param partId Partition id to find.
+ * @return Load future.
+ */
+ private static @Nullable CompletableFuture<Path>
findLoadFuture(Set<PartitionRestoreLifecycleFuture> lcs, int partId) {
+ return ofNullable(F.find(lcs, null, (IgnitePredicate<? super
PartitionRestoreLifecycleFuture>)f -> f.partId == partId))
+ .map(c -> c.loaded)
+ .orElse(null);
+ }
+
+ /**
+ * @param fut Current exchange future.
+ * @param ctx Current snapshot restore context.
+ * @return The set of cache groups needs to be processed.
+ */
+ private Set<Integer>
getCachesLoadingFromSnapshot(GridDhtPartitionsExchangeFuture fut,
SnapshotRestoreContext ctx) {
+ if (fut == null || fut.exchangeActions() == null || ctx == null)
+ return Collections.emptySet();
+
+ return fut.exchangeActions().cacheGroupsToStart((ccfg, uuid) ->
requirePartitionLoad(ccfg, uuid, ctx))
+ .stream()
+ .map(g -> g.descriptor().groupId())
+ .collect(Collectors.toSet());
+ }
+
+ /**
+ * @param futs Collection of futures to chain.
+ * @param <T> Result type.
+ * @return Completable future waits for all of.
+ */
+ private static <T extends CompletableFuture<?>> CompletableFuture<Void>
allOfFailFast(Collection<T> futs) {
Review comment:
I'm not sure it's safe to use such an approach. After the future completes, the
process can go further and clear the snapshot context, but there are some futures
in the executor that are still being processed in the background, and this can lead to
unpredictable behaviour. Perhaps it's better to have the futures share and check a
common flag to fail fast (opCtx.err, for example)
##########
File path:
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
##########
@@ -3485,8 +3485,12 @@ else if (task instanceof ForceRebalanceExchangeTask) {
if (task instanceof ForceRebalanceExchangeTask)
forcedRebFut =
((ForceRebalanceExchangeTask)task).forcedRebalanceFuture();
- for (CacheGroupContext grp :
assignsSet.descendingSet()) {
- boolean disableRebalance =
cctx.snapshot().partitionsAreFrozen(grp);
+ if (forcedRebFut == null)
Review comment:
Wrong indent
##########
File path:
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java
##########
@@ -1435,13 +1524,62 @@ public static boolean
isSnapshotOperation(DiscoveryEvent evt) {
}
}
+ /**
+ * @param parts Collection of pairs group and appropriate cache partition
to be snapshot.
+ * @param rmtNodeId The remote node to connect to.
+ * @param partHnd Received partition handler.
+ */
+ public IgniteInternalFuture<Void> requestRemoteSnapshotAsync(
Review comment:
It's not actually async, RemoteSnapshotFutureTask is executed from the
same thread
##########
File path:
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotRestoreProcess.java
##########
@@ -967,9 +1723,231 @@ private void finishRollback(UUID reqId, Map<UUID,
Boolean> res, Map<UUID, Except
" operation [reqId=" + reqId + ", snapshot=" + opCtx0.snpName
+ ", node(s)=" + leftNodes + ']');
}
+ updateMetastorageRecoveryKeys(opCtx0.dirs, true);
+
finishProcess(reqId, opCtx0.err.get());
}
+ /**
+ * @param mgr Ignite snapshot manager.
+ * @param opCtx Snapshot operation context.
+ * @param snpFile Snapshot file to copy.
+ * @param target Destination path.
+ * @param fut Future which will handle the copy results.
+ */
+ private static void copyFileLocal(
+ IgniteSnapshotManager mgr,
+ SnapshotRestoreContext opCtx,
+ File snpFile,
+ Path target,
+ CompletableFuture<Path> fut
+ ) {
+ CompletableFuture.supplyAsync(
+ () -> {
+ if (opCtx.stopChecker.getAsBoolean())
+ throw new IgniteInterruptedException("The operation has
been stopped on copy file: " + snpFile.getAbsolutePath());
+
+ if (Thread.interrupted())
+ throw new IgniteInterruptedException("Thread has been
interrupted: " + Thread.currentThread().getName());
+
+ if (!snpFile.exists()) {
+ throw new IgniteException("Partition snapshot file doesn't
exist [snpName=" + opCtx.snpName +
+ ", snpDir=" + snpFile.getAbsolutePath() + ", name=" +
snpFile.getName() + ']');
+ }
+
+ IgniteSnapshotManager.copy(mgr.ioFactory(), snpFile,
target.toFile(), snpFile.length());
+
+ return target;
+ }, mgr.snapshotExecutorService())
+ .whenComplete((r, t) -> opCtx.errHnd.accept(t))
+ .whenComplete((res, t) -> {
+ if (t == null)
+ fut.complete(res);
+ else
+ fut.completeExceptionally(t);
+ });
+ }
+
+ /**
+ * @param dirs List of keys to process.
+ * @param remove {@code} if the keys must be removed.
+ */
+ private void updateMetastorageRecoveryKeys(Collection<File> dirs, boolean
remove) {
+ for (File dir : dirs) {
+ ctx.cache().context().database().checkpointReadLock();
+
+ String mKey = RESTORE_KEY_PREFIX + dir.getName();
+
+ try {
+ if (remove)
+ metaStorage.remove(mKey);
+ else
+ metaStorage.write(mKey, true);
+ }
+ catch (IgniteCheckedException e) {
+ log.error("Updating the metastorage crash-recovery guard key
fails [remove=" + remove +
+ "dir=" + dir.getAbsolutePath() + ']');
+ }
+ finally {
+ ctx.cache().context().database().checkpointReadUnlock();
+ }
+ }
+ }
+
+ /**
+ * @param affCache Affinity cache.
+ * @param node Cluster node to get assigned partitions.
+ * @return The set of partitions assigned to the given node.
+ */
+ private static <T> Set<T> affinityPartitions(
+ GridAffinityAssignmentCache affCache,
+ ClusterNode node,
+ IntFunction<T> factory
Review comment:
This parameter looks strange, used only once
##########
File path:
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotRestoreProcess.java
##########
@@ -947,11 +1103,611 @@ private Exception checkNodeLeft(Set<UUID> reqNodes,
Set<UUID> respNodes) {
* @param res Results.
* @param errs Errors.
*/
- private void finishRollback(UUID reqId, Map<UUID, Boolean> res, Map<UUID,
Exception> errs) {
+ private void finishPreload(UUID reqId, Map<UUID, Boolean> res, Map<UUID,
Exception> errs) {
if (ctx.clientNode())
return;
- if (!errs.isEmpty()) {
+ SnapshotRestoreContext opCtx0 = opCtx;
+
+ Exception failure = errs.values().stream().findFirst().
+ orElse(checkNodeLeft(opCtx0.nodes, res.keySet()));
+
+ opCtx0.errHnd.accept(failure);
+
+ if (failure != null) {
+ opCtx0.locStopCachesCompleteFut.onDone((Void)null);
+
+ if (U.isLocalNodeCoordinator(ctx.discovery()))
+ rollbackRestoreProc.start(reqId, reqId);
+
+ return;
+ }
+
+ if (U.isLocalNodeCoordinator(ctx.discovery()))
+ cacheStartProc.start(reqId, reqId);
+ }
+
+ /**
+ * @param reqId Request ID.
+ * @return Result future.
+ */
+ private IgniteInternalFuture<Boolean> cacheStart(UUID reqId) {
+ if (ctx.clientNode())
+ return new GridFinishedFuture<>();
+
+ SnapshotRestoreContext opCtx0 = opCtx;
+
+ Throwable err = opCtx0.err.get();
+
+ if (err != null)
+ return new GridFinishedFuture<>(err);
+
+ if (!U.isLocalNodeCoordinator(ctx.discovery()))
+ return opCtx0.cachesLoadFut;
+
+ Collection<StoredCacheData> ccfgs = opCtx0.cfgs.values();
+
+ if (log.isInfoEnabled()) {
+ log.info("Starting restored caches " +
+ "[reqId=" + opCtx0.reqId + ", snapshot=" + opCtx0.snpName +
+ ", caches=" + F.viewReadOnly(ccfgs, c -> c.config().getName())
+ ']');
+ }
+
+ // We set the topology node IDs required to successfully start the
cache, if any of the required nodes leave
+ // the cluster during the cache startup, the whole procedure will be
rolled back.
+ GridCompoundFuture<Boolean, Boolean> awaitBoth = new
GridCompoundFuture<>();
+
+ IgniteInternalFuture<Boolean> cacheStartFut =
ctx.cache().dynamicStartCachesByStoredConf(ccfgs, true,
+ true, !opCtx0.sameTop, IgniteUuid.fromUuid(reqId));
+
+ // This is required for the rollback procedure to execute the cache
groups stop operation.
+ cacheStartFut.listen(f -> opCtx0.isLocNodeStartedCaches = (f.error()
== null));
+
+ // Convert exception to the RestoreCacheStartException to propagate to
other nodes over the distributed process.
+ awaitBoth.add(chainCacheStartException(cacheStartFut));
+ awaitBoth.add(opCtx0.cachesLoadFut);
+
+ awaitBoth.markInitialized();
+
+ return awaitBoth;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void
onInitBeforeTopologyLock(GridDhtPartitionsExchangeFuture exchFut) {
+ if (ctx.clientNode())
+ return;
+
+ SnapshotRestoreContext opCtx0 = opCtx;
+ Set<Integer> grpIdsToStart = getCachesLoadingFromSnapshot(exchFut,
opCtx0);
+
+ if (F.isEmpty(grpIdsToStart))
+ return;
+
+ assert opCtx0 != null;
+ assert !opCtx0.sameTop : "WAL must be disabled only for caches
restoring from snapshot taken on another cluster: " + opCtx0;
+
+ // This is happened on the exchange which has been initiated by a
dynamic cache start message and intend to switch
+ // off the WAL for cache groups loading from a snapshot.
+ for (CacheGroupContext grp : F.view(ctx.cache().cacheGroups(), g ->
grpIdsToStart.contains(g.groupId()))) {
+ assert grp.localWalEnabled() : grp.cacheOrGroupName();
+
+ // Check partitions have not even been created yet, so the
PartitionMetaStateRecord won't be logged to the WAL.
+ for (int p = 0; p < grp.topology().partitions(); p++)
+ assert grp.topology().localPartition(p) == null : p;
+
+ grp.localWalEnabled(false, true);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public void
onDoneAfterTopologyUnlock(GridDhtPartitionsExchangeFuture fut) {
+ // This will be called after the processCacheStopRequestOnExchangeDone
happens.
+ SnapshotRestoreContext opCtx0 = opCtx;
+
+ if (ctx.clientNode() || fut == null || fut.exchangeActions() == null
|| opCtx0 == null)
+ return;
+
+ Set<String> grpNamesToStop =
fut.exchangeActions().cacheGroupsToStop((ccfg, uuid) ->
+ requirePartitionLoad(ccfg, uuid, opCtx0))
+ .stream()
+ .map(g -> g.descriptor().cacheOrGroupName())
+ .collect(Collectors.toSet());
+
+ if (F.isEmpty(grpNamesToStop))
+ return;
+
+ assert grpNamesToStop.size() == opCtx0.dirs.size() : grpNamesToStop;
+
+ opCtx0.locStopCachesCompleteFut.onDone((Void)null);
+ }
+
+ /**
+ * @param reqId Request ID.
+ * @param res Results.
+ * @param errs Errors.
+ */
+ private void finishCacheStart(UUID reqId, Map<UUID, Boolean> res,
Map<UUID, Exception> errs) {
+ if (ctx.clientNode())
+ return;
+
+ SnapshotRestoreContext opCtx0 = opCtx;
+
+ Exception failure = errs.values().stream().findFirst().
+ orElse(checkNodeLeft(opCtx0.nodes, res.keySet()));
+
+ if (failure == null) {
+ if (opCtx0.sameTop) {
+ finishProcess(reqId);
+
+ updateMetastorageRecoveryKeys(opCtx0.dirs, true);
+ }
+ else
+ lateAffProc.start(reqId, reqId);
+
+ return;
+ }
+
+ opCtx0.errHnd.accept(failure);
+
+ ClusterNode crd = U.oldest(ctx.discovery().aliveServerNodes(), null);
+
+ // Caches were not even been started and rollback already occurred
during PME, so they are not even stared.
+ if (X.hasCause(errs.get(crd.id()), RestoreCacheStartException.class))
+ opCtx0.locStopCachesCompleteFut.onDone((Void)null);
+
+ if (U.isLocalNodeCoordinator(ctx.discovery()))
+ rollbackRestoreProc.start(reqId, reqId);
+ }
+
+ /**
+ * @param grps Ordered list of cache groups sorted by priority.
+ * @param exchFut Exchange future.
+ */
+ public void onRebalanceReady(Set<CacheGroupContext> grps, @Nullable
GridDhtPartitionsExchangeFuture exchFut) {
+ if (ctx.clientNode())
+ return;
+
+ SnapshotRestoreContext opCtx0 = opCtx;
+
+ Set<Integer> exchGrpIds = getCachesLoadingFromSnapshot(exchFut,
opCtx0);
+
+ Set<CacheGroupContext> filtered = grps.stream()
+ .filter(Objects::nonNull)
+ .filter(g -> exchGrpIds.contains(g.groupId()))
+ .collect(Collectors.toSet());
+
+ // Restore requests has been already processed at previous exchange.
+ if (filtered.isEmpty())
+ return;
+
+ assert opCtx0 != null;
+
+ // First preload everything from the local node.
+ List<SnapshotMetadata> locMetas =
opCtx0.metasPerNode.get(ctx.localNodeId());
+
+ Map<Integer, Set<Integer>> notScheduled = new HashMap<>();
+
+ // Register partitions to be processed.
+ for (CacheGroupContext grp : filtered) {
+ if (F.isEmpty(locMetas))
+ break;
+
+ notScheduled.put(grp.groupId(),
+ affinityPartitions(grp.affinity(),
ctx.cache().context().localNode(), Integer::new));
+
+ Set<Integer> leftParts = notScheduled.get(grp.groupId());
Review comment:
`get` is called on the map right after `put` with the same key; the value could be kept in a local variable instead.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]