alex-plekhanov commented on a change in pull request #9539:
URL: https://github.com/apache/ignite/pull/9539#discussion_r744705988
##########
File path: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
##########
@@ -3485,7 +3485,7 @@ else if (task instanceof ForceRebalanceExchangeTask) {
             if (task instanceof ForceRebalanceExchangeTask)
                 forcedRebFut = ((ForceRebalanceExchangeTask)task).forcedRebalanceFuture();
-                for (CacheGroupContext grp : assignsSet.descendingSet()) {
+            for (CacheGroupContext grp : assignsSet.descendingSet()) {
Review comment:
Wrong indent
##########
File path: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotRestoreProcess.java
##########
@@ -96,9 +115,12 @@
private final GridKernalContext ctx;
    /** Cache group restore prepare phase. */
-    private final DistributedProcess<SnapshotOperationRequest, ArrayList<StoredCacheData>> prepareRestoreProc;
+    private final DistributedProcess<SnapshotOperationRequest, SnapshotRestoreOperationResponse> prepareRestoreProc;
- /** Cache group restore cache start phase. */
+ /**Cache group restore cache start phase. */
Review comment:
Space after `/**`.
Looks like javadoc for `cacheStartProc`
##########
File path: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/AbstractSnapshotFutureTask.java
##########
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence.snapshot;
+
+import java.io.File;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.internal.IgniteFutureCancelledCheckedException;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
+import org.apache.ignite.internal.processors.cache.persistence.file.FileIOFactory;
+import org.apache.ignite.internal.util.future.GridFutureAdapter;
+import org.apache.ignite.internal.util.tostring.GridToStringExclude;
+import org.apache.ignite.internal.util.typedef.internal.S;
+
+/**
+ * @param <T> Type of snapshot processing result.
+ */
+abstract class AbstractSnapshotFutureTask<T> extends GridFutureAdapter<T> {
+ /** Shared context. */
+ protected final GridCacheSharedContext<?, ?> cctx;
+
+ /** Ignite logger. */
+ protected final IgniteLogger log;
+
+ /** Node id which cause snapshot operation. */
+ protected final UUID srcNodeId;
+
+ /** Unique identifier of snapshot process. */
+ protected final String snpName;
+
+ /** Snapshot working directory on file system. */
+ protected final File tmpSnpWorkDir;
+
+ /** IO factory which will be used for creating snapshot delta-writers. */
+ protected final FileIOFactory ioFactory;
+
+ /** Snapshot data sender. */
+ @GridToStringExclude
+ protected final SnapshotSender snpSndr;
+
+ /** Partition to be processed. */
+ protected final Map<Integer, Set<Integer>> parts;
+
+ /** An exception which has been occurred during snapshot processing. */
+ protected final AtomicReference<Throwable> err = new AtomicReference<>();
+
+ /**
+ * @param cctx Shared context.
+ * @param srcNodeId Node id which cause snapshot task creation.
+ * @param snpName Unique identifier of snapshot process.
+ * @param tmpWorkDir Working directory for intermediate snapshot results.
+ * @param ioFactory Factory to working with snapshot files.
+ * @param snpSndr Factory which produces snapshot receiver instance.
+ * @param parts Partition to be processed.
+ */
+ protected AbstractSnapshotFutureTask(
+ GridCacheSharedContext<?, ?> cctx,
+ UUID srcNodeId,
+ String snpName,
+ File tmpWorkDir,
+ FileIOFactory ioFactory,
+ SnapshotSender snpSndr,
+ Map<Integer, Set<Integer>> parts
+ ) {
+        assert snpName != null : "Snapshot name cannot be empty or null.";
+        assert snpSndr != null : "Snapshot sender which handles execution tasks must be not null.";
+        assert snpSndr.executor() != null : "Executor service must be not null.";
+
+ this.cctx = cctx;
+ this.log = cctx.logger(AbstractSnapshotFutureTask.class);
+ this.srcNodeId = srcNodeId;
+ this.snpName = snpName;
+ this.tmpSnpWorkDir = new File(tmpWorkDir, snpName);
+ this.ioFactory = ioFactory;
+ this.snpSndr = snpSndr;
+ this.parts = parts;
+ }
+
+ /**
+ * @return Snapshot name.
+ */
+ public String snapshotName() {
+ return snpName;
+ }
+
+ /**
+ * @return Node id which triggers this operation.
+ */
+ public UUID sourceNodeId() {
+ return srcNodeId;
+ }
+
+ /**
+ * // TODO started and start methods can be merged.
Review comment:
The TODO should include a link to the ticket.
The method name is strange (taking the return value into account), and it is strange to have it in the parent class.
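For example (the ticket number below is just a placeholder):
```java
// TODO: IGNITE-XXXXX Merge the started() and start() methods.
```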
##########
File path: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java
##########
@@ -2115,6 +2235,609 @@ private boolean readPageFromStore(long pageId, ByteBuffer buff) throws IgniteChe
}
}
+ /** Remote snapshot future which tracks remote snapshot transmission
result. */
+ private static class RemoteSnapshotHandler extends GridFutureAdapter<Void>
implements Runnable {
+ /** Snapshot name to create. */
+ private final String reqId = RMT_SNAPSHOT_PREFIX +
U.maskForFileName(UUID.randomUUID().toString());
+
+ /** Ignite snapshot manager. */
+ private final IgniteSnapshotManager snpMgr;
+
+ /** Initial message to send request. */
+ private final SnapshotRequestMessage initMsg;
+
+ /** Remote node id to request snapshot from. */
+ private final UUID rmtNodeId;
+
+ /** Process interrupt checker. */
+ private final BooleanSupplier stopChecker;
+
+ /** Partition handler given by request initiator. */
+ private final BiConsumer<File, Throwable> partHnd;
+
+ /** Temporary working directory for consuming partitions. */
+ private final Path dir;
+
+ /** Counter which show how many partitions left to be received. */
+ private final AtomicInteger partsLeft = new AtomicInteger(-1);
+
+ /**
+ * @param snpMgr Ignite snapshot manager.
+ * @param rmtNodeId Remote node to request snapshot from.
+ * @param snpName Snapshot name to request.
+ * @param parts Cache group and partitions to request.
+ * @param stopChecker Process interrupt checker.
+ * @param partHnd Partition handler.
+ */
+ public RemoteSnapshotHandler(
+ IgniteSnapshotManager snpMgr,
+ UUID rmtNodeId,
+ String snpName,
+ Map<Integer, Set<Integer>> parts,
+ BooleanSupplier stopChecker,
+ BiConsumer<@Nullable File, @Nullable Throwable> partHnd
+ ) {
+ dir = Paths.get(snpMgr.tmpWorkDir.getAbsolutePath(), reqId);
+ initMsg = new SnapshotRequestMessage(reqId, snpName, parts);
+
+ this.snpMgr = snpMgr;
+ this.rmtNodeId = rmtNodeId;
+ this.stopChecker = stopChecker;
+ this.partHnd = partHnd;
+ }
+
+ /** {@inheritDoc} */
+ @Override public synchronized void run() {
+ if (isDone())
+ return;
+
+ try {
+ ClusterNode rmtNode = snpMgr.cctx.discovery().node(rmtNodeId);
+
+ if (rmtNode == null) {
+ throw new ClusterTopologyCheckedException("Snapshot remote
request cannot be performed. " +
+ "Remote node left the grid [rmtNodeId=" + rmtNodeId +
']');
+ }
+
+ snpMgr.cctx.gridIO().sendOrderedMessage(rmtNode,
+ DFLT_INITIAL_SNAPSHOT_TOPIC,
+ initMsg,
+ SYSTEM_POOL,
+ Long.MAX_VALUE,
+ true);
+
+ if (snpMgr.log.isInfoEnabled()) {
+ snpMgr.log.info("Snapshot request is sent to the remote
node [rmtNodeId=" + rmtNodeId +
+ ", snpName=" + initMsg.snapshotName() + ", rqId=" +
reqId + ']');
+ }
+ }
+ catch (Throwable t) {
+ onDone(t);
+ }
+ }
+
+ /**
+ * @param ex Exception occurred during receiving files.
+ */
+ public synchronized void acceptException(Throwable ex) {
+ if (isDone())
+ return;
+
+ try {
+ partHnd.accept(null, ex);
+ }
+ catch (Throwable t) {
+ ex.addSuppressed(t);
+ }
+
+ onDone(ex);
+ }
+
+ /**
+ * @param part Received file which needs to be handled.
+ */
+ public synchronized void acceptFile(File part) {
+ if (isDone())
+ return;
+
+ if (stopChecker.getAsBoolean())
+ throw new TransmissionCancelledException("Future cancelled
prior to the all requested partitions processed.");
+
+ partHnd.accept(part, null);
+ partsLeft.decrementAndGet();
+ }
+
+ /** {@inheritDoc} */
+ @Override protected synchronized boolean onDone(@Nullable Void res,
@Nullable Throwable err, boolean cancel) {
+ U.delete(dir);
+
+ return super.onDone(res, err, cancel);
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean equals(Object o) {
+ if (this == o)
+ return true;
+
+ if (o == null || getClass() != o.getClass())
+ return false;
+
+ RemoteSnapshotHandler future = (RemoteSnapshotHandler)o;
+
+ return Objects.equals(reqId, future.reqId);
+ }
+
+ /** {@inheritDoc} */
+ @Override public int hashCode() {
+ return reqId.hashCode();
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(RemoteSnapshotHandler.class, this);
+ }
+ }
+
+ /**
+ * This manager is responsible for requesting and handling snapshots from
a remote node. Each snapshot request
+ * processed asynchronously but strictly one by one.
+ */
+ private class SequentialRemoteSnapshotManager implements
TransmissionHandler, GridMessageListener {
+ /** A task currently being executed and must be explicitly finished. */
+ private volatile RemoteSnapshotHandler active;
+
+ /** Queue of asynchronous tasks to execute. */
+ private final Queue<RemoteSnapshotHandler> queue = new
ConcurrentLinkedDeque<>();
+
+ /** {@code true} if the node is stopping. */
+ private volatile boolean stopping;
+
+ /**
+ * @param next New task for scheduling. May produce <tt>null</tt>
value if the queue is empty.
+ */
+ public synchronized void submit(@Nullable RemoteSnapshotHandler next) {
+ RemoteSnapshotHandler curr = active;
+
+ if (curr == null || curr.isDone()) {
+ if (next == null)
+ return;
+
+ next.listen(f -> submit(queue.poll()));
+
+ active = next;
+
+ if (stopping)
+                    next.acceptException(new IgniteException(SNP_NODE_STOPPING_ERR_MSG));
+ else
+ next.run();
Review comment:
Runnable is not a good interface here, since someone might think that the future completes when the `run()` method completes. Actually, the `run()` method just initializes the task.
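A rough sketch of what I mean (names here are only illustrative, not a final API): expose an explicit initialization method instead of implementing `Runnable`, so the name does not suggest completion semantics:
```java
/** Tracks the remote transmission; completes only when all requested files are received. */
private static class RemoteSnapshotHandler extends GridFutureAdapter<Void> {
    /** Sends the initial SnapshotRequestMessage. Returning from this method does NOT complete the future. */
    public synchronized void init() {
        if (isDone())
            return;

        // ... send the request to the remote node, exactly as the current run() body does ...
    }
}
```
Then `submit()` would call `next.init()` instead of `next.run()`.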
##########
File path: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotRestoreProcess.java
##########
@@ -787,30 +711,351 @@ private void finishPrepare(UUID reqId, Map<UUID, ArrayList<StoredCacheData>> res
}
if (failure == null)
- failure = checkNodeLeft(opCtx0.nodes, res.keySet());
+ failure = checkNodeLeft(opCtx0.nodes(), res.keySet());
// Context has been created - should rollback changes cluster-wide.
if (failure != null) {
- opCtx0.err.compareAndSet(null, failure);
-
- if (U.isLocalNodeCoordinator(ctx.discovery()))
- rollbackRestoreProc.start(reqId, reqId);
+ opCtx0.errHnd.accept(failure);
return;
}
Map<Integer, StoredCacheData> globalCfgs = new HashMap<>();
- for (List<StoredCacheData> storedCfgs : res.values()) {
- if (storedCfgs == null)
- continue;
+ for (Map.Entry<UUID, SnapshotRestoreOperationResponse> e :
res.entrySet()) {
+ if (e.getValue().ccfgs != null) {
+ for (StoredCacheData cacheData : e.getValue().ccfgs) {
+ globalCfgs.put(CU.cacheId(cacheData.config().getName()),
cacheData);
- for (StoredCacheData cacheData : storedCfgs)
- globalCfgs.put(CU.cacheId(cacheData.config().getName()),
cacheData);
+
opCtx0.dirs.add(((FilePageStoreManager)ctx.cache().context().pageStore()).cacheWorkDir(cacheData.config()));
+ }
+ }
+
+ opCtx0.metasPerNode.computeIfAbsent(e.getKey(), id -> new
ArrayList<>())
+ .addAll(e.getValue().metas);
}
opCtx0.cfgs = globalCfgs;
+ if (U.isLocalNodeCoordinator(ctx.discovery()))
+ preloadProc.start(reqId, reqId);
+ }
+
+ /**
+ * @param ccfg Cache configuration.
+ * @param cache Discovery cache.
+ * @return Map of affinity per each cache group.
+ */
+ private static GridAffinityAssignmentCache calculateAffinity(
+ GridKernalContext ctx,
+ CacheConfiguration<?, ?> ccfg,
+ DiscoCache cache
+ ) {
+ GridAffinityAssignmentCache affCache =
GridAffinityAssignmentCache.create(ctx, ccfg.getAffinity(), ccfg);
+
+ affCache.calculate(cache.version(), null, cache);
+
+ return affCache;
+ }
+
+ /**
+ * @param metas List of snapshot metadata to check.
+ * @param grpId Group id.
+ * @param parts Set of partitions to search for.
+ * @return Snapshot metadata which contains a full set of given partitions
or {@code null} the otherwise.
+ */
+ private static @Nullable SnapshotMetadata findMetadataWithSamePartitions(
+ List<SnapshotMetadata> metas,
+ int grpId,
+ Set<Integer> parts
+ ) {
+ assert !F.isEmpty(parts) && !parts.contains(INDEX_PARTITION) : parts;
+
+ // Try to copy everything right from the single snapshot part.
+ for (SnapshotMetadata meta : metas) {
+ Set<Integer> grpParts = meta.partitions().get(grpId);
+ Set<Integer> grpWoIndex = grpParts == null ?
Collections.emptySet() : new HashSet<>(grpParts);
+
+ grpWoIndex.remove(INDEX_PARTITION);
+
+ if (grpWoIndex.equals(parts))
+ return meta;
+ }
+
+ return null;
+ }
+
+ /**
+ * @param reqId Request id.
+ * @return Future which will be completed when the preload ends.
+ */
+ private IgniteInternalFuture<Boolean> preload(UUID reqId) {
+ if (ctx.clientNode())
+ return new GridFinishedFuture<>();
+
+ SnapshotRestoreContext opCtx0 = opCtx;
+ GridFutureAdapter<Boolean> retFut = new GridFutureAdapter<>();
+
+ if (opCtx0 == null)
+ return new GridFinishedFuture<>(new
IgniteCheckedException("Snapshot restore process has incorrect restore state: "
+ reqId));
+
+ if (opCtx0.dirs.isEmpty())
+ return new GridFinishedFuture<>();
+
+ try {
+ if (ctx.isStopping())
+ throw new NodeStoppingException("Node is stopping: " +
ctx.localNodeId());
+
+ IgniteSnapshotManager snpMgr = ctx.cache().context().snapshotMgr();
+
+ synchronized (this) {
+ opCtx0.stopFut = new IgniteFutureImpl<>(retFut.chain(f ->
null));
+ }
+
+ if (log.isInfoEnabled()) {
+ log.info("Starting snapshot preload operation to restore cache
groups" +
+ "[snapshot=" + opCtx0.snpName +
+ ", caches=" + F.transform(opCtx0.dirs, File::getName) +
']');
+ }
+
+ CompletableFuture<Void> metaFut =
ctx.localNodeId().equals(opCtx0.opNodeId) ?
+ CompletableFuture.runAsync(
+ () -> {
+ try {
+ SnapshotMetadata meta =
F.first(opCtx0.metasPerNode.get(opCtx0.opNodeId));
+
+ File binDir =
binaryWorkDir(snpMgr.snapshotLocalDir(opCtx0.snpName).getAbsolutePath(),
+ meta.folderName());
+
+ ctx.cacheObjects().updateMetadata(binDir,
opCtx0.stopChecker);
+ }
+ catch (Throwable t) {
+ log.error("Unable to perform metadata update
operation for the cache groups restore process", t);
+
+ opCtx0.errHnd.accept(t);
+ }
+ }, snpMgr.snapshotExecutorService()) :
CompletableFuture.completedFuture(null);
+
+ for (StoredCacheData data : opCtx0.cfgs.values()) {
+
opCtx0.affCache.computeIfAbsent(CU.cacheOrGroupName(data.config()),
+ grp -> calculateAffinity(ctx, data.config(),
opCtx0.discoCache));
+ }
+
+ // First preload everything from the local node.
+ List<SnapshotMetadata> locMetas =
opCtx0.metasPerNode.get(ctx.localNodeId());
+
+ Map<Integer, Set<PartitionRestoreFuture>> rmtLoadParts = new
HashMap<>();
+ ClusterNode locNode = ctx.cache().context().localNode();
+
+ for (File dir : opCtx0.dirs) {
+ String cacheOrGrpName = cacheGroupName(dir);
+ int grpId = CU.cacheId(cacheOrGrpName);
+
+ File tmpCacheDir = formatTmpDirName(dir);
+ tmpCacheDir.mkdir();
+
+ Set<PartitionRestoreFuture> leftParts;
+
+ opCtx0.locProgress.put(grpId,
+
nodeAffinityPartitions(opCtx0.affCache.get(cacheOrGrpName), locNode,
PartitionRestoreFuture::new));
+
+ rmtLoadParts.put(grpId, leftParts = new
HashSet<>(opCtx0.locProgress.get(grpId)));
+
+ if (leftParts.isEmpty())
+ continue;
+
+ SnapshotMetadata full =
findMetadataWithSamePartitions(locMetas,
+ grpId,
+ leftParts.stream().map(p ->
p.partId).collect(Collectors.toSet()));
+
+ for (SnapshotMetadata meta : full == null ? locMetas :
Collections.singleton(full)) {
+ if (leftParts.isEmpty())
+ break;
+
+ File snpCacheDir = new
File(ctx.cache().context().snapshotMgr().snapshotLocalDir(opCtx.snpName),
+ Paths.get(databaseRelativePath(meta.folderName()),
dir.getName()).toString());
+
+ leftParts.removeIf(partFut -> {
+ boolean doCopy =
ofNullable(meta.partitions().get(grpId))
+ .orElse(Collections.emptySet())
+ .contains(partFut.partId);
+
+ if (doCopy) {
+ copyLocalAsync(ctx.cache().context().snapshotMgr(),
+ opCtx,
+ snpCacheDir,
+ tmpCacheDir,
+ partFut);
+ }
+
+ return doCopy;
+ });
+
+ if (meta == full) {
+ assert leftParts.isEmpty() : leftParts;
+
+ if (log.isInfoEnabled()) {
+ log.info("The snapshot was taken on the same
cluster topology. The index will be copied to " +
+ "restoring cache group if necessary [snpName="
+ opCtx0.snpName + ", dir=" + dir.getName() + ']');
+ }
+
+ File idxFile = new File(snpCacheDir,
FilePageStoreManager.getPartitionFileName(INDEX_PARTITION));
+
+ if (idxFile.exists()) {
+ PartitionRestoreFuture idxFut;
+
+ opCtx0.locProgress.computeIfAbsent(grpId, g -> new
HashSet<>())
+ .add(idxFut = new
PartitionRestoreFuture(INDEX_PARTITION));
+
+ copyLocalAsync(ctx.cache().context().snapshotMgr(),
+ opCtx,
+ snpCacheDir,
+ tmpCacheDir,
+ idxFut);
+ }
+ }
+ }
+ }
+
+ // Load other partitions from remote nodes.
+ // This is necessary for sending only one partitions request per
each cluster node.
+ Map<UUID, Map<Integer, Set<Integer>>> snpAff =
snapshotAffinity(opCtx0.metasPerNode,
+ (grpId, partId) -> rmtLoadParts.get(grpId) != null &&
+ rmtLoadParts.get(grpId).contains(new
PartitionRestoreFuture(partId)));
+ Map<Integer, File> grpToDir = opCtx0.dirs.stream()
+ .collect(Collectors.toMap(d ->
CU.cacheId(FilePageStoreManager.cacheGroupName(d)),
+ d -> d));
+
+ try {
+ if (log.isInfoEnabled() && !snpAff.isEmpty()) {
+ log.info("Trying to request partitions from remote nodes" +
+ "[snapshot=" + opCtx0.snpName +
+ ", map=" +
snpAff.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey,
+ e -> partitionsMapToCompactString(e.getValue()))) +
']');
+ }
+
+ for (Map.Entry<UUID, Map<Integer, Set<Integer>>> m :
snpAff.entrySet()) {
+ if (m.getKey().equals(ctx.localNodeId()))
+ continue;
+
+ ctx.cache().context().snapshotMgr()
+ .requestRemoteSnapshot(m.getKey(),
+ opCtx0.snpName,
+ m.getValue(),
+ opCtx0.stopChecker,
+ (snpFile, t) -> {
+ if (opCtx0.stopChecker.getAsBoolean())
+                            throw new TransmissionCancelledException("Snapshot remote operation request cancelled.");
Review comment:
This type of exception looks strange here. The `preload` method knows nothing about the internals of `requestRemoteSnapshot`, but it looks like this exception is required by some workflow inside `requestRemoteSnapshot`.
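One way to avoid the leak (just a sketch; whether it fits the transmission workflow needs checking): let `requestRemoteSnapshot` own the cancellation contract by wrapping the caller's handler, so `preload` never has to throw a transmission-specific exception:
```java
// Inside requestRemoteSnapshot(...), before registering the handler:
BiConsumer<File, Throwable> wrapped = (file, err) -> {
    // The transport-specific exception stays inside the snapshot manager.
    if (stopChecker.getAsBoolean())
        throw new TransmissionCancelledException("Snapshot remote operation request cancelled.");

    partHnd.accept(file, err); // the caller's handler remains transport-agnostic
};
```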
##########
File path: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotRestoreProcess.java
##########
@@ -1007,17 +1414,96 @@ private void finishRollback(UUID reqId, Map<UUID, Boolean> res, Map<UUID, Except
/**
* @param req Request to prepare cache group restore from the snapshot.
- * @param dirs List of cache group names to restore from the snapshot.
* @param cfgs Cache ID to configuration mapping.
*/
-        protected SnapshotRestoreContext(SnapshotOperationRequest req, Collection<File> dirs,
- Map<Integer, StoredCacheData> cfgs) {
+ protected SnapshotRestoreContext(
+ SnapshotOperationRequest req,
+ DiscoCache discoCache,
+ Map<Integer, StoredCacheData> cfgs,
+ UUID locNodeId,
+ List<SnapshotMetadata> locMetas
+ ) {
reqId = req.requestId();
snpName = req.snapshotName();
- nodes = new HashSet<>(req.nodes());
+ opNodeId = req.operationalNodeId();
+ this.discoCache = discoCache;
- this.dirs = dirs;
this.cfgs = cfgs;
+
+            metasPerNode.computeIfAbsent(locNodeId, id -> new ArrayList<>()).addAll(locMetas);
+ }
+
+ /**
+         * @return Required baseline nodeIds that must be alive to complete restore operation.
+ */
+ public Collection<UUID> nodes() {
+ return F.transform(discoCache.aliveBaselineNodes(), F.node2id());
+ }
+ }
+
+ /** */
+    private static class RestoreCacheStartException extends IgniteCheckedException {
+ /** Serial version uid. */
+ private static final long serialVersionUID = 0L;
+
+ /** @param cause Error. */
+ public RestoreCacheStartException(Throwable cause) {
+ super(cause);
+ }
+ }
+
+ /** Snapshot operation prepare response. */
+    private static class SnapshotRestoreOperationResponse implements Serializable {
+ /** Serial version uid. */
+ private static final long serialVersionUID = 0L;
+
+ /** Cache configurations on local node. */
+ private ArrayList<StoredCacheData> ccfgs;
Review comment:
`ArrayList` -> `List`, final
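I.e.:
```java
/** Cache configurations on local node. */
private final List<StoredCacheData> ccfgs;
```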
##########
File path: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotRestoreProcess.java
##########
@@ -315,28 +327,44 @@ protected void cleanup() throws IgniteCheckedException {
}
/**
-     * Check if the cache or group with the specified name is currently being restored from the snapshot.
- *
- * @param cacheName Cache name.
- * @param grpName Cache group name.
+ * @param opCtx Restoring context.
+ * @return The request id of restoring snapshot operation.
+ */
+    private @Nullable UUID restoringId(@Nullable SnapshotRestoreContext opCtx) {
Review comment:
Not used
##########
File path: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java
##########
@@ -1440,13 +1504,46 @@ public static boolean isSnapshotOperation(DiscoveryEvent evt) {
}
}
+ /**
+     * @param parts Collection of pairs group and appropriate cache partition to be snapshot.
+ * @param rmtNodeId The remote node to connect to.
+ * @param partHnd Received partition handler.
+ */
+ public IgniteInternalFuture<Void> requestRemoteSnapshot(
Review comment:
I don't like the naming. This method does not request the creation of a remote snapshot; it requests remote snapshot files that already exist. RemoteSnapshotHandler does not handle remote snapshots; it receives remote snapshot files. SnapshotRequestMessage does not request the creation of a snapshot; it requests already existing files. SnapshotResponseMessage is sent only in case of an error when sending files, so it's better to have this information in the name (SnapshotFilesFailureResponse, for example). Etc.
##########
File path: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
##########
@@ -3925,16 +3925,16 @@ private void finishExchangeOnCoordinator(@Nullable Collection<ClusterNode> sndRe
if (discoveryCustomMessage instanceof DynamicCacheChangeBatch)
{
if (exchActions != null) {
-                        Set<String> caches = exchActions.cachesToResetLostPartitions();
+                            Set<String> caches = exchActions.cachesToResetLostPartitions();
Review comment:
Wrong indent. Nothing changed in this class except indents.
##########
File path: modules/core/src/test/java/org/apache/ignite/platform/PlatformAddArgEntryProcessorBinarizable.java
##########
@@ -19,7 +19,6 @@
import javax.cache.processor.EntryProcessorException;
import javax.cache.processor.MutableEntry;
-
Review comment:
Not related to this fix
##########
File path: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java
##########
@@ -1579,52 +1676,41 @@ SnapshotFutureTask registerSnapshotTask(
boolean withMetaStorage,
SnapshotSender snpSndr
) {
+        return (SnapshotFutureTask)registerSnapshotTask(snpName, new SnapshotFutureTask(cctx, srcNodeId, snpName, tmpWorkDir,
Review comment:
The method returns `SnapshotFinishedFutureTask` in some cases, but the result is force-cast to `SnapshotFutureTask`.
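A sketch of one way around it (`registerSnapshotTask` may return a `SnapshotFinishedFutureTask`, i.e. a future already completed with an error, so the unconditional cast can throw `ClassCastException`):
```java
// Possible shape: keep AbstractSnapshotFutureTask<?> as the return type and drop the cast.
AbstractSnapshotFutureTask<?> task = registerSnapshotTask(snpName,
    new SnapshotFutureTask(cctx, srcNodeId, snpName, tmpWorkDir,
        ioFactory, snpSndr, parts, withMetaStorage, locBuff));

return task; // callers would then accept AbstractSnapshotFutureTask<?> instead of SnapshotFutureTask
```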
##########
File path: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java
##########
@@ -1579,52 +1676,41 @@ SnapshotFutureTask registerSnapshotTask(
boolean withMetaStorage,
SnapshotSender snpSndr
) {
+        return (SnapshotFutureTask)registerSnapshotTask(snpName, new SnapshotFutureTask(cctx, srcNodeId, snpName, tmpWorkDir,
+ ioFactory, snpSndr, parts, withMetaStorage, locBuff));
+ }
+
+ /**
+ * @param task Snapshot operation task to be executed.
+     * @return Snapshot operation task which should be registered on checkpoint to run.
+ */
+    private AbstractSnapshotFutureTask<?> registerSnapshotTask(String rqId, AbstractSnapshotFutureTask<?> task) {
if (!busyLock.enterBusy()) {
- return new SnapshotFutureTask(
-                new IgniteCheckedException("Snapshot manager is stopping [locNodeId=" + cctx.localNodeId() + ']'));
+            return new SnapshotFinishedFutureTask(new IgniteCheckedException("Snapshot manager is stopping [locNodeId=" +
+ cctx.localNodeId() + ']'));
}
try {
- if (locSnpTasks.containsKey(snpName))
-                return new SnapshotFutureTask(new IgniteCheckedException("Snapshot with requested name is already scheduled: " + snpName));
-
- SnapshotFutureTask snpFutTask;
+ if (locSnpTasks.containsKey(rqId)) {
+                return new SnapshotFinishedFutureTask(new IgniteCheckedException("Snapshot with requested name is already scheduled: " +
+ rqId));
+ }
- SnapshotFutureTask prev = locSnpTasks.putIfAbsent(snpName,
- snpFutTask = new SnapshotFutureTask(cctx,
- srcNodeId,
- snpName,
- tmpWorkDir,
- ioFactory,
- snpSndr,
- parts,
- withMetaStorage,
- locBuff));
+            AbstractSnapshotFutureTask<?> prev = locSnpTasks.putIfAbsent(rqId, task);
if (prev != null)
-                return new SnapshotFutureTask(new IgniteCheckedException("Snapshot with requested name is already scheduled: " + snpName));
-
- if (!withMetaStorage) {
Review comment:
Why was this check removed?
##########
File path: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java
##########
@@ -2115,6 +2235,609 @@ private boolean readPageFromStore(long pageId, ByteBuffer buff) throws IgniteChe
}
}
+ /** Remote snapshot future which tracks remote snapshot transmission
result. */
+ private static class RemoteSnapshotHandler extends GridFutureAdapter<Void>
implements Runnable {
+ /** Snapshot name to create. */
+ private final String reqId = RMT_SNAPSHOT_PREFIX +
U.maskForFileName(UUID.randomUUID().toString());
+
+ /** Ignite snapshot manager. */
+ private final IgniteSnapshotManager snpMgr;
+
+ /** Initial message to send request. */
+ private final SnapshotRequestMessage initMsg;
+
+ /** Remote node id to request snapshot from. */
+ private final UUID rmtNodeId;
+
+ /** Process interrupt checker. */
+ private final BooleanSupplier stopChecker;
+
+ /** Partition handler given by request initiator. */
+ private final BiConsumer<File, Throwable> partHnd;
+
+ /** Temporary working directory for consuming partitions. */
+ private final Path dir;
+
+ /** Counter which show how many partitions left to be received. */
+ private final AtomicInteger partsLeft = new AtomicInteger(-1);
+
+ /**
+ * @param snpMgr Ignite snapshot manager.
+ * @param rmtNodeId Remote node to request snapshot from.
+ * @param snpName Snapshot name to request.
+ * @param parts Cache group and partitions to request.
+ * @param stopChecker Process interrupt checker.
+ * @param partHnd Partition handler.
+ */
+ public RemoteSnapshotHandler(
+ IgniteSnapshotManager snpMgr,
+ UUID rmtNodeId,
+ String snpName,
+ Map<Integer, Set<Integer>> parts,
+ BooleanSupplier stopChecker,
+ BiConsumer<@Nullable File, @Nullable Throwable> partHnd
+ ) {
+ dir = Paths.get(snpMgr.tmpWorkDir.getAbsolutePath(), reqId);
+ initMsg = new SnapshotRequestMessage(reqId, snpName, parts);
+
+ this.snpMgr = snpMgr;
+ this.rmtNodeId = rmtNodeId;
+ this.stopChecker = stopChecker;
+ this.partHnd = partHnd;
+ }
+
+ /** {@inheritDoc} */
+ @Override public synchronized void run() {
+ if (isDone())
+ return;
+
+ try {
+ ClusterNode rmtNode = snpMgr.cctx.discovery().node(rmtNodeId);
+
+ if (rmtNode == null) {
+ throw new ClusterTopologyCheckedException("Snapshot remote
request cannot be performed. " +
+ "Remote node left the grid [rmtNodeId=" + rmtNodeId +
']');
+ }
+
+ snpMgr.cctx.gridIO().sendOrderedMessage(rmtNode,
+ DFLT_INITIAL_SNAPSHOT_TOPIC,
+ initMsg,
+ SYSTEM_POOL,
+ Long.MAX_VALUE,
+ true);
+
+ if (snpMgr.log.isInfoEnabled()) {
+ snpMgr.log.info("Snapshot request is sent to the remote
node [rmtNodeId=" + rmtNodeId +
+ ", snpName=" + initMsg.snapshotName() + ", rqId=" +
reqId + ']');
+ }
+ }
+ catch (Throwable t) {
+ onDone(t);
+ }
+ }
+
+ /**
+ * @param ex Exception occurred during receiving files.
+ */
+ public synchronized void acceptException(Throwable ex) {
+ if (isDone())
+ return;
+
+ try {
+ partHnd.accept(null, ex);
+ }
+ catch (Throwable t) {
+ ex.addSuppressed(t);
+ }
+
+ onDone(ex);
+ }
+
+ /**
+ * @param part Received file which needs to be handled.
+ */
+ public synchronized void acceptFile(File part) {
+ if (isDone())
+ return;
+
+ if (stopChecker.getAsBoolean())
+ throw new TransmissionCancelledException("Future cancelled
prior to the all requested partitions processed.");
+
+ partHnd.accept(part, null);
+ partsLeft.decrementAndGet();
+ }
+
+ /** {@inheritDoc} */
+ @Override protected synchronized boolean onDone(@Nullable Void res,
@Nullable Throwable err, boolean cancel) {
+ U.delete(dir);
+
+ return super.onDone(res, err, cancel);
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean equals(Object o) {
+ if (this == o)
+ return true;
+
+ if (o == null || getClass() != o.getClass())
+ return false;
+
+ RemoteSnapshotHandler future = (RemoteSnapshotHandler)o;
+
+ return Objects.equals(reqId, future.reqId);
+ }
+
+ /** {@inheritDoc} */
+ @Override public int hashCode() {
+ return reqId.hashCode();
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(RemoteSnapshotHandler.class, this);
+ }
+ }
+
+ /**
+ * This manager is responsible for requesting and handling snapshots from
a remote node. Each snapshot request
+ * processed asynchronously but strictly one by one.
+ */
+ private class SequentialRemoteSnapshotManager implements
TransmissionHandler, GridMessageListener {
+ /** A task currently being executed and must be explicitly finished. */
+ private volatile RemoteSnapshotHandler active;
+
+ /** Queue of asynchronous tasks to execute. */
+ private final Queue<RemoteSnapshotHandler> queue = new
ConcurrentLinkedDeque<>();
+
+ /** {@code true} if the node is stopping. */
+ private volatile boolean stopping;
+
+ /**
+ * @param next New task for scheduling. May produce <tt>null</tt>
value if the queue is empty.
+ */
+ public synchronized void submit(@Nullable RemoteSnapshotHandler next) {
+ RemoteSnapshotHandler curr = active;
+
+ if (curr == null || curr.isDone()) {
+ if (next == null)
+ return;
+
+ next.listen(f -> submit(queue.poll()));
Review comment:
The race is still here: when a task is done, its listener first polls the queue (possibly getting a null result) and then passes the result to the synchronized `submit` method; at the same time, another thread can already be inside the synchronized `submit` method, before it offers the next task to the queue.
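One way to close the race (a sketch only): do the whole hand-off under the monitor, so the `poll()` happens under the same lock as the `offer()`:
```java
public synchronized void submit(@Nullable RemoteSnapshotHandler next) {
    if (next != null)
        queue.offer(next);

    RemoteSnapshotHandler curr = active;

    // Another task is still running; it will re-enter submit(null) from its listener when it completes.
    if (curr != null && !curr.isDone())
        return;

    RemoteSnapshotHandler head = queue.poll();

    if (head == null)
        return;

    active = head;

    head.listen(f -> submit(null));

    if (stopping)
        head.acceptException(new IgniteException(SNP_NODE_STOPPING_ERR_MSG));
    else
        head.run();
}
```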
##########
File path: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotRestoreProcess.java
##########
@@ -787,30 +711,351 @@ private void finishPrepare(UUID reqId, Map<UUID, ArrayList<StoredCacheData>> res
}
if (failure == null)
- failure = checkNodeLeft(opCtx0.nodes, res.keySet());
+ failure = checkNodeLeft(opCtx0.nodes(), res.keySet());
// Context has been created - should rollback changes cluster-wide.
if (failure != null) {
- opCtx0.err.compareAndSet(null, failure);
-
- if (U.isLocalNodeCoordinator(ctx.discovery()))
- rollbackRestoreProc.start(reqId, reqId);
+ opCtx0.errHnd.accept(failure);
return;
}
Map<Integer, StoredCacheData> globalCfgs = new HashMap<>();
- for (List<StoredCacheData> storedCfgs : res.values()) {
- if (storedCfgs == null)
- continue;
+ for (Map.Entry<UUID, SnapshotRestoreOperationResponse> e :
res.entrySet()) {
+ if (e.getValue().ccfgs != null) {
+ for (StoredCacheData cacheData : e.getValue().ccfgs) {
+ globalCfgs.put(CU.cacheId(cacheData.config().getName()),
cacheData);
- for (StoredCacheData cacheData : storedCfgs)
- globalCfgs.put(CU.cacheId(cacheData.config().getName()),
cacheData);
+
opCtx0.dirs.add(((FilePageStoreManager)ctx.cache().context().pageStore()).cacheWorkDir(cacheData.config()));
+ }
+ }
+
+ opCtx0.metasPerNode.computeIfAbsent(e.getKey(), id -> new
ArrayList<>())
+ .addAll(e.getValue().metas);
}
opCtx0.cfgs = globalCfgs;
+ if (U.isLocalNodeCoordinator(ctx.discovery()))
+ preloadProc.start(reqId, reqId);
+ }
+
+ /**
+ * @param ccfg Cache configuration.
+ * @param cache Discovery cache.
+ * @return Map of affinity per each cache group.
+ */
+ private static GridAffinityAssignmentCache calculateAffinity(
+ GridKernalContext ctx,
+ CacheConfiguration<?, ?> ccfg,
+ DiscoCache cache
+ ) {
+ GridAffinityAssignmentCache affCache =
GridAffinityAssignmentCache.create(ctx, ccfg.getAffinity(), ccfg);
+
+ affCache.calculate(cache.version(), null, cache);
+
+ return affCache;
+ }
+
+ /**
+ * @param metas List of snapshot metadata to check.
+ * @param grpId Group id.
+ * @param parts Set of partitions to search for.
+ * @return Snapshot metadata which contains a full set of given partitions
or {@code null} the otherwise.
+ */
+ private static @Nullable SnapshotMetadata findMetadataWithSamePartitions(
+ List<SnapshotMetadata> metas,
+ int grpId,
+ Set<Integer> parts
+ ) {
+ assert !F.isEmpty(parts) && !parts.contains(INDEX_PARTITION) : parts;
+
+ // Try to copy everything right from the single snapshot part.
+ for (SnapshotMetadata meta : metas) {
+ Set<Integer> grpParts = meta.partitions().get(grpId);
+ Set<Integer> grpWoIndex = grpParts == null ?
Collections.emptySet() : new HashSet<>(grpParts);
+
+ grpWoIndex.remove(INDEX_PARTITION);
+
+ if (grpWoIndex.equals(parts))
+ return meta;
+ }
+
+ return null;
+ }
+
+ /**
+ * @param reqId Request id.
+ * @return Future which will be completed when the preload ends.
+ */
+ private IgniteInternalFuture<Boolean> preload(UUID reqId) {
+ if (ctx.clientNode())
+ return new GridFinishedFuture<>();
+
+ SnapshotRestoreContext opCtx0 = opCtx;
+ GridFutureAdapter<Boolean> retFut = new GridFutureAdapter<>();
+
+ if (opCtx0 == null)
+ return new GridFinishedFuture<>(new
IgniteCheckedException("Snapshot restore process has incorrect restore state: "
+ reqId));
+
+ if (opCtx0.dirs.isEmpty())
+ return new GridFinishedFuture<>();
+
+ try {
+ if (ctx.isStopping())
+ throw new NodeStoppingException("Node is stopping: " +
ctx.localNodeId());
+
+ IgniteSnapshotManager snpMgr = ctx.cache().context().snapshotMgr();
+
+ synchronized (this) {
+ opCtx0.stopFut = new IgniteFutureImpl<>(retFut.chain(f ->
null));
+ }
+
+ if (log.isInfoEnabled()) {
+ log.info("Starting snapshot preload operation to restore cache
groups" +
+ "[snapshot=" + opCtx0.snpName +
+ ", caches=" + F.transform(opCtx0.dirs, File::getName) +
']');
+ }
+
+ CompletableFuture<Void> metaFut =
ctx.localNodeId().equals(opCtx0.opNodeId) ?
+ CompletableFuture.runAsync(
+ () -> {
+ try {
+ SnapshotMetadata meta =
F.first(opCtx0.metasPerNode.get(opCtx0.opNodeId));
+
+ File binDir =
binaryWorkDir(snpMgr.snapshotLocalDir(opCtx0.snpName).getAbsolutePath(),
+ meta.folderName());
+
+ ctx.cacheObjects().updateMetadata(binDir,
opCtx0.stopChecker);
+ }
+ catch (Throwable t) {
+ log.error("Unable to perform metadata update
operation for the cache groups restore process", t);
+
+ opCtx0.errHnd.accept(t);
+ }
+ }, snpMgr.snapshotExecutorService()) :
CompletableFuture.completedFuture(null);
+
+ for (StoredCacheData data : opCtx0.cfgs.values()) {
+
opCtx0.affCache.computeIfAbsent(CU.cacheOrGroupName(data.config()),
+ grp -> calculateAffinity(ctx, data.config(),
opCtx0.discoCache));
+ }
+
+ // First preload everything from the local node.
+ List<SnapshotMetadata> locMetas =
opCtx0.metasPerNode.get(ctx.localNodeId());
+
+ Map<Integer, Set<PartitionRestoreFuture>> rmtLoadParts = new
HashMap<>();
+ ClusterNode locNode = ctx.cache().context().localNode();
+
+ for (File dir : opCtx0.dirs) {
+ String cacheOrGrpName = cacheGroupName(dir);
+ int grpId = CU.cacheId(cacheOrGrpName);
+
+ File tmpCacheDir = formatTmpDirName(dir);
+ tmpCacheDir.mkdir();
+
+ Set<PartitionRestoreFuture> leftParts;
+
+ opCtx0.locProgress.put(grpId,
+
nodeAffinityPartitions(opCtx0.affCache.get(cacheOrGrpName), locNode,
PartitionRestoreFuture::new));
+
+ rmtLoadParts.put(grpId, leftParts = new
HashSet<>(opCtx0.locProgress.get(grpId)));
+
+ if (leftParts.isEmpty())
+ continue;
+
+ SnapshotMetadata full =
findMetadataWithSamePartitions(locMetas,
+ grpId,
+ leftParts.stream().map(p ->
p.partId).collect(Collectors.toSet()));
+
+ for (SnapshotMetadata meta : full == null ? locMetas :
Collections.singleton(full)) {
+ if (leftParts.isEmpty())
+ break;
+
+ File snpCacheDir = new
File(ctx.cache().context().snapshotMgr().snapshotLocalDir(opCtx.snpName),
+ Paths.get(databaseRelativePath(meta.folderName()),
dir.getName()).toString());
+
+ leftParts.removeIf(partFut -> {
+ boolean doCopy =
ofNullable(meta.partitions().get(grpId))
+ .orElse(Collections.emptySet())
+ .contains(partFut.partId);
+
+ if (doCopy) {
+ copyLocalAsync(ctx.cache().context().snapshotMgr(),
+ opCtx,
+ snpCacheDir,
+ tmpCacheDir,
+ partFut);
+ }
+
+ return doCopy;
+ });
+
+ if (meta == full) {
+ assert leftParts.isEmpty() : leftParts;
+
+ if (log.isInfoEnabled()) {
+ log.info("The snapshot was taken on the same
cluster topology. The index will be copied to " +
+ "restoring cache group if necessary [snpName="
+ opCtx0.snpName + ", dir=" + dir.getName() + ']');
+ }
+
+ File idxFile = new File(snpCacheDir,
FilePageStoreManager.getPartitionFileName(INDEX_PARTITION));
+
+ if (idxFile.exists()) {
+ PartitionRestoreFuture idxFut;
+
+ opCtx0.locProgress.computeIfAbsent(grpId, g -> new
HashSet<>())
+ .add(idxFut = new
PartitionRestoreFuture(INDEX_PARTITION));
+
+ copyLocalAsync(ctx.cache().context().snapshotMgr(),
+ opCtx,
+ snpCacheDir,
+ tmpCacheDir,
+ idxFut);
+ }
+ }
+ }
+ }
+
+ // Load other partitions from remote nodes.
+ // This is necessary for sending only one partitions request per
each cluster node.
+ Map<UUID, Map<Integer, Set<Integer>>> snpAff =
snapshotAffinity(opCtx0.metasPerNode,
+ (grpId, partId) -> rmtLoadParts.get(grpId) != null &&
+ rmtLoadParts.get(grpId).contains(new
PartitionRestoreFuture(partId)));
+ Map<Integer, File> grpToDir = opCtx0.dirs.stream()
+ .collect(Collectors.toMap(d ->
CU.cacheId(FilePageStoreManager.cacheGroupName(d)),
+ d -> d));
+
+ try {
+ if (log.isInfoEnabled() && !snpAff.isEmpty()) {
+ log.info("Trying to request partitions from remote nodes" +
+ "[snapshot=" + opCtx0.snpName +
+ ", map=" +
snpAff.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey,
+ e -> partitionsMapToCompactString(e.getValue()))) +
']');
+ }
+
+                for (Map.Entry<UUID, Map<Integer, Set<Integer>>> m : snpAff.entrySet()) {
+ if (m.getKey().equals(ctx.localNodeId()))
+ continue;
Review comment:
Is this possible? It looks like only remote nodes should be in `snpAff`.
##########
File path: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotRestoreProcess.java
##########
@@ -787,30 +711,351 @@ private void finishPrepare(UUID reqId, Map<UUID, ArrayList<StoredCacheData>> res
}
if (failure == null)
- failure = checkNodeLeft(opCtx0.nodes, res.keySet());
+ failure = checkNodeLeft(opCtx0.nodes(), res.keySet());
// Context has been created - should rollback changes cluster-wide.
if (failure != null) {
- opCtx0.err.compareAndSet(null, failure);
-
- if (U.isLocalNodeCoordinator(ctx.discovery()))
- rollbackRestoreProc.start(reqId, reqId);
+ opCtx0.errHnd.accept(failure);
return;
}
Map<Integer, StoredCacheData> globalCfgs = new HashMap<>();
- for (List<StoredCacheData> storedCfgs : res.values()) {
- if (storedCfgs == null)
- continue;
+ for (Map.Entry<UUID, SnapshotRestoreOperationResponse> e :
res.entrySet()) {
+ if (e.getValue().ccfgs != null) {
+ for (StoredCacheData cacheData : e.getValue().ccfgs) {
+ globalCfgs.put(CU.cacheId(cacheData.config().getName()),
cacheData);
- for (StoredCacheData cacheData : storedCfgs)
- globalCfgs.put(CU.cacheId(cacheData.config().getName()),
cacheData);
+
opCtx0.dirs.add(((FilePageStoreManager)ctx.cache().context().pageStore()).cacheWorkDir(cacheData.config()));
+ }
+ }
+
+ opCtx0.metasPerNode.computeIfAbsent(e.getKey(), id -> new
ArrayList<>())
+ .addAll(e.getValue().metas);
}
opCtx0.cfgs = globalCfgs;
+ if (U.isLocalNodeCoordinator(ctx.discovery()))
+ preloadProc.start(reqId, reqId);
+ }
+
+ /**
+ * @param ccfg Cache configuration.
+ * @param cache Discovery cache.
+ * @return Map of affinity per each cache group.
+ */
+ private static GridAffinityAssignmentCache calculateAffinity(
+ GridKernalContext ctx,
+ CacheConfiguration<?, ?> ccfg,
+ DiscoCache cache
+ ) {
+ GridAffinityAssignmentCache affCache =
GridAffinityAssignmentCache.create(ctx, ccfg.getAffinity(), ccfg);
+
+ affCache.calculate(cache.version(), null, cache);
+
+ return affCache;
+ }
+
+ /**
+ * @param metas List of snapshot metadata to check.
+ * @param grpId Group id.
+ * @param parts Set of partitions to search for.
+ * @return Snapshot metadata which contains a full set of given partitions
or {@code null} the otherwise.
+ */
+ private static @Nullable SnapshotMetadata findMetadataWithSamePartitions(
+ List<SnapshotMetadata> metas,
+ int grpId,
+ Set<Integer> parts
+ ) {
+ assert !F.isEmpty(parts) && !parts.contains(INDEX_PARTITION) : parts;
+
+ // Try to copy everything right from the single snapshot part.
+ for (SnapshotMetadata meta : metas) {
+ Set<Integer> grpParts = meta.partitions().get(grpId);
+ Set<Integer> grpWoIndex = grpParts == null ?
Collections.emptySet() : new HashSet<>(grpParts);
+
+ grpWoIndex.remove(INDEX_PARTITION);
+
+ if (grpWoIndex.equals(parts))
+ return meta;
+ }
+
+ return null;
+ }
+
+ /**
+ * @param reqId Request id.
+ * @return Future which will be completed when the preload ends.
+ */
+ private IgniteInternalFuture<Boolean> preload(UUID reqId) {
+ if (ctx.clientNode())
+ return new GridFinishedFuture<>();
+
+ SnapshotRestoreContext opCtx0 = opCtx;
+ GridFutureAdapter<Boolean> retFut = new GridFutureAdapter<>();
+
+ if (opCtx0 == null)
+ return new GridFinishedFuture<>(new
IgniteCheckedException("Snapshot restore process has incorrect restore state: "
+ reqId));
+
+ if (opCtx0.dirs.isEmpty())
+ return new GridFinishedFuture<>();
+
+ try {
+ if (ctx.isStopping())
+ throw new NodeStoppingException("Node is stopping: " +
ctx.localNodeId());
+
+ IgniteSnapshotManager snpMgr = ctx.cache().context().snapshotMgr();
+
+ synchronized (this) {
+ opCtx0.stopFut = new IgniteFutureImpl<>(retFut.chain(f ->
null));
+ }
+
+ if (log.isInfoEnabled()) {
+ log.info("Starting snapshot preload operation to restore cache
groups" +
+ "[snapshot=" + opCtx0.snpName +
+ ", caches=" + F.transform(opCtx0.dirs, File::getName) +
']');
+ }
+
+ CompletableFuture<Void> metaFut =
ctx.localNodeId().equals(opCtx0.opNodeId) ?
+ CompletableFuture.runAsync(
+ () -> {
+ try {
+ SnapshotMetadata meta =
F.first(opCtx0.metasPerNode.get(opCtx0.opNodeId));
+
+ File binDir =
binaryWorkDir(snpMgr.snapshotLocalDir(opCtx0.snpName).getAbsolutePath(),
+ meta.folderName());
+
+ ctx.cacheObjects().updateMetadata(binDir,
opCtx0.stopChecker);
+ }
+ catch (Throwable t) {
+ log.error("Unable to perform metadata update
operation for the cache groups restore process", t);
+
+ opCtx0.errHnd.accept(t);
+ }
+ }, snpMgr.snapshotExecutorService()) :
CompletableFuture.completedFuture(null);
+
+ for (StoredCacheData data : opCtx0.cfgs.values()) {
+
opCtx0.affCache.computeIfAbsent(CU.cacheOrGroupName(data.config()),
+ grp -> calculateAffinity(ctx, data.config(),
opCtx0.discoCache));
+ }
+
+ // First preload everything from the local node.
+ List<SnapshotMetadata> locMetas =
opCtx0.metasPerNode.get(ctx.localNodeId());
+
+ Map<Integer, Set<PartitionRestoreFuture>> rmtLoadParts = new
HashMap<>();
+ ClusterNode locNode = ctx.cache().context().localNode();
+
+ for (File dir : opCtx0.dirs) {
+ String cacheOrGrpName = cacheGroupName(dir);
+ int grpId = CU.cacheId(cacheOrGrpName);
+
+ File tmpCacheDir = formatTmpDirName(dir);
+ tmpCacheDir.mkdir();
+
+ Set<PartitionRestoreFuture> leftParts;
+
+ opCtx0.locProgress.put(grpId,
+
nodeAffinityPartitions(opCtx0.affCache.get(cacheOrGrpName), locNode,
PartitionRestoreFuture::new));
+
+ rmtLoadParts.put(grpId, leftParts = new
HashSet<>(opCtx0.locProgress.get(grpId)));
+
+ if (leftParts.isEmpty())
+ continue;
+
+ SnapshotMetadata full =
findMetadataWithSamePartitions(locMetas,
+ grpId,
+ leftParts.stream().map(p ->
p.partId).collect(Collectors.toSet()));
+
+ for (SnapshotMetadata meta : full == null ? locMetas :
Collections.singleton(full)) {
+ if (leftParts.isEmpty())
+ break;
+
+ File snpCacheDir = new
File(ctx.cache().context().snapshotMgr().snapshotLocalDir(opCtx.snpName),
+ Paths.get(databaseRelativePath(meta.folderName()),
dir.getName()).toString());
+
+ leftParts.removeIf(partFut -> {
+ boolean doCopy =
ofNullable(meta.partitions().get(grpId))
+ .orElse(Collections.emptySet())
+ .contains(partFut.partId);
+
+ if (doCopy) {
+ copyLocalAsync(ctx.cache().context().snapshotMgr(),
+ opCtx,
+ snpCacheDir,
+ tmpCacheDir,
+ partFut);
+ }
+
+ return doCopy;
+ });
+
+ if (meta == full) {
+ assert leftParts.isEmpty() : leftParts;
+
+ if (log.isInfoEnabled()) {
+ log.info("The snapshot was taken on the same
cluster topology. The index will be copied to " +
+ "restoring cache group if necessary [snpName="
+ opCtx0.snpName + ", dir=" + dir.getName() + ']');
+ }
+
+ File idxFile = new File(snpCacheDir,
FilePageStoreManager.getPartitionFileName(INDEX_PARTITION));
+
+ if (idxFile.exists()) {
+ PartitionRestoreFuture idxFut;
+
+ opCtx0.locProgress.computeIfAbsent(grpId, g -> new
HashSet<>())
+ .add(idxFut = new
PartitionRestoreFuture(INDEX_PARTITION));
+
+ copyLocalAsync(ctx.cache().context().snapshotMgr(),
+ opCtx,
+ snpCacheDir,
+ tmpCacheDir,
+ idxFut);
+ }
+ }
+ }
+ }
+
+ // Load other partitions from remote nodes.
+ // This is necessary for sending only one partitions request per
each cluster node.
+ Map<UUID, Map<Integer, Set<Integer>>> snpAff =
snapshotAffinity(opCtx0.metasPerNode,
+ (grpId, partId) -> rmtLoadParts.get(grpId) != null &&
+ rmtLoadParts.get(grpId).contains(new
PartitionRestoreFuture(partId)));
+ Map<Integer, File> grpToDir = opCtx0.dirs.stream()
+ .collect(Collectors.toMap(d ->
CU.cacheId(FilePageStoreManager.cacheGroupName(d)),
+ d -> d));
+
+ try {
+ if (log.isInfoEnabled() && !snpAff.isEmpty()) {
+ log.info("Trying to request partitions from remote nodes" +
+ "[snapshot=" + opCtx0.snpName +
+ ", map=" +
snpAff.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey,
+ e -> partitionsMapToCompactString(e.getValue()))) +
']');
+ }
+
+                for (Map.Entry<UUID, Map<Integer, Set<Integer>>> m : snpAff.entrySet()) {
Review comment:
As far as I understand, `snpAff` contains the remote partitions required by the current node. But there can be duplicates (two nodes can contain the same partitions), and it looks like if two nodes contain the same partition, we request it from both of them.
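A rough sketch of a de-duplication (assuming `rmtLoadParts` is not needed intact afterwards): make the predicate remove the partition from the "still needed" set, so the first node that has it wins and the same partition is not requested from other nodes:
```java
Map<UUID, Map<Integer, Set<Integer>>> snpAff = snapshotAffinity(opCtx0.metasPerNode,
    (grpId, partId) -> rmtLoadParts.get(grpId) != null &&
        rmtLoadParts.get(grpId).remove(new PartitionRestoreFuture(partId)));
```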
##########
File path: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotRestoreProcess.java
##########
@@ -990,15 +1369,43 @@ private void finishRollback(UUID reqId, Map<UUID, Boolean> res, Map<UUID, Except
/** Snapshot name. */
private final String snpName;
- /** Baseline node IDs that must be alive to complete the operation. */
- private final Set<UUID> nodes;
+        /** Baseline discovery cache for node IDs that must be alive to complete the operation.*/
+ private final DiscoCache discoCache;
- /** List of restored cache group directories. */
- private final Collection<File> dirs;
+ /** Operational node id. */
+ private final UUID opNodeId;
+
+ /**
+         * Set of restored cache groups path on local node. Collected when all cache configurations received
+ * from the <tt>prepare</tt> distributed process.
+ */
+ private final Set<File> dirs = new HashSet<>();
/** The exception that led to the interruption of the process. */
private final AtomicReference<Throwable> err = new AtomicReference<>();
+ /** Distribution of snapshot metadata files across the cluster. */
+        private final Map<UUID, ArrayList<SnapshotMetadata>> metasPerNode = new HashMap<>();
Review comment:
`ArrayList` -> `List`?
##########
File path: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotRestoreProcess.java
##########
@@ -872,12 +1118,49 @@ private void finishCacheStart(UUID reqId, Map<UUID, Boolean> res, Map<UUID, Exce
rollbackRestoreProc.start(reqId, reqId);
}
+ /**
+ * @param metas Map of snapshot metadata distribution across the cluster.
+ * @return Map of cache partitions per each node.
+ */
+ private static Map<UUID, Map<Integer, Set<Integer>>> snapshotAffinity(
+ Map<UUID, ArrayList<SnapshotMetadata>> metas,
+ BiPredicate<Integer, Integer> filter
+ ) {
+ Map<UUID, Map<Integer, Set<Integer>>> nodeToSnp = new HashMap<>();
+
+        for (Map.Entry<UUID, ArrayList<SnapshotMetadata>> e : metas.entrySet()) {
+ UUID nodeId = e.getKey();
+
+            for (SnapshotMetadata meta : ofNullable(e.getValue()).orElse(new ArrayList<>())) {
Review comment:
`new ArrayList<>()` -> `Collections.emptyList()`
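I.e. (this assumes the map value type is also relaxed from `ArrayList` to `List`, otherwise `orElse` will not accept it):
```java
for (SnapshotMetadata meta : ofNullable(e.getValue()).orElse(Collections.emptyList())) {
```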
##########
File path: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java
##########
@@ -2115,6 +2235,609 @@ private boolean readPageFromStore(long pageId, ByteBuffer buff) throws IgniteChe
}
}
+ /** Remote snapshot future which tracks remote snapshot transmission
result. */
+ private static class RemoteSnapshotHandler extends GridFutureAdapter<Void>
implements Runnable {
+ /** Snapshot name to create. */
+ private final String reqId = RMT_SNAPSHOT_PREFIX +
U.maskForFileName(UUID.randomUUID().toString());
+
+ /** Ignite snapshot manager. */
+ private final IgniteSnapshotManager snpMgr;
+
+ /** Initial message to send request. */
+ private final SnapshotRequestMessage initMsg;
+
+ /** Remote node id to request snapshot from. */
+ private final UUID rmtNodeId;
+
+ /** Process interrupt checker. */
+ private final BooleanSupplier stopChecker;
+
+ /** Partition handler given by request initiator. */
+ private final BiConsumer<File, Throwable> partHnd;
+
+ /** Temporary working directory for consuming partitions. */
+ private final Path dir;
+
+ /** Counter which show how many partitions left to be received. */
+ private final AtomicInteger partsLeft = new AtomicInteger(-1);
+
+ /**
+ * @param snpMgr Ignite snapshot manager.
+ * @param rmtNodeId Remote node to request snapshot from.
+ * @param snpName Snapshot name to request.
+ * @param parts Cache group and partitions to request.
+ * @param stopChecker Process interrupt checker.
+ * @param partHnd Partition handler.
+ */
+ public RemoteSnapshotHandler(
+ IgniteSnapshotManager snpMgr,
+ UUID rmtNodeId,
+ String snpName,
+ Map<Integer, Set<Integer>> parts,
+ BooleanSupplier stopChecker,
+ BiConsumer<@Nullable File, @Nullable Throwable> partHnd
+ ) {
+ dir = Paths.get(snpMgr.tmpWorkDir.getAbsolutePath(), reqId);
+ initMsg = new SnapshotRequestMessage(reqId, snpName, parts);
+
+ this.snpMgr = snpMgr;
+ this.rmtNodeId = rmtNodeId;
+ this.stopChecker = stopChecker;
+ this.partHnd = partHnd;
+ }
+
+ /** {@inheritDoc} */
+ @Override public synchronized void run() {
+ if (isDone())
+ return;
+
+ try {
+ ClusterNode rmtNode = snpMgr.cctx.discovery().node(rmtNodeId);
+
+ if (rmtNode == null) {
+ throw new ClusterTopologyCheckedException("Snapshot remote
request cannot be performed. " +
+ "Remote node left the grid [rmtNodeId=" + rmtNodeId +
']');
+ }
+
+ snpMgr.cctx.gridIO().sendOrderedMessage(rmtNode,
+ DFLT_INITIAL_SNAPSHOT_TOPIC,
+ initMsg,
+ SYSTEM_POOL,
+ Long.MAX_VALUE,
+ true);
+
+ if (snpMgr.log.isInfoEnabled()) {
+ snpMgr.log.info("Snapshot request is sent to the remote
node [rmtNodeId=" + rmtNodeId +
+ ", snpName=" + initMsg.snapshotName() + ", rqId=" +
reqId + ']');
+ }
+ }
+ catch (Throwable t) {
+ onDone(t);
+ }
+ }
+
+ /**
+ * @param ex Exception occurred during receiving files.
+ */
+ public synchronized void acceptException(Throwable ex) {
+ if (isDone())
+ return;
+
+ try {
+ partHnd.accept(null, ex);
+ }
+ catch (Throwable t) {
+ ex.addSuppressed(t);
+ }
+
+ onDone(ex);
+ }
+
+ /**
+ * @param part Received file which needs to be handled.
+ */
+ public synchronized void acceptFile(File part) {
+ if (isDone())
+ return;
+
+ if (stopChecker.getAsBoolean())
+ throw new TransmissionCancelledException("Future cancelled
prior to the all requested partitions processed.");
+
+ partHnd.accept(part, null);
+ partsLeft.decrementAndGet();
+ }
+
+ /** {@inheritDoc} */
+ @Override protected synchronized boolean onDone(@Nullable Void res,
@Nullable Throwable err, boolean cancel) {
+ U.delete(dir);
+
+ return super.onDone(res, err, cancel);
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean equals(Object o) {
+ if (this == o)
+ return true;
+
+ if (o == null || getClass() != o.getClass())
+ return false;
+
+ RemoteSnapshotHandler future = (RemoteSnapshotHandler)o;
+
+ return Objects.equals(reqId, future.reqId);
+ }
+
+ /** {@inheritDoc} */
+ @Override public int hashCode() {
+ return reqId.hashCode();
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(RemoteSnapshotHandler.class, this);
+ }
+ }
+
+ /**
+ * This manager is responsible for requesting and handling snapshots from
a remote node. Each snapshot request
+ * processed asynchronously but strictly one by one.
+ */
+ private class SequentialRemoteSnapshotManager implements
TransmissionHandler, GridMessageListener {
+ /** A task currently being executed and must be explicitly finished. */
+ private volatile RemoteSnapshotHandler active;
+
+ /** Queue of asynchronous tasks to execute. */
+ private final Queue<RemoteSnapshotHandler> queue = new
ConcurrentLinkedDeque<>();
+
+ /** {@code true} if the node is stopping. */
+ private volatile boolean stopping;
+
+ /**
+ * @param next New task for scheduling. May produce <tt>null</tt>
value if the queue is empty.
+ */
+ public synchronized void submit(@Nullable RemoteSnapshotHandler next) {
+ RemoteSnapshotHandler curr = active;
+
+ if (curr == null || curr.isDone()) {
+ if (next == null)
+ return;
+
+ next.listen(f -> submit(queue.poll()));
+
+ active = next;
+
+ if (stopping)
+ next.acceptException(new
IgniteException(SNP_NODE_STOPPING_ERR_MSG));
+ else
+ next.run();
+ }
+ else if (next != null)
+ queue.offer(next);
+ }
+
+ /** Stopping handler. */
+ public void stop() {
+ stopping = true;
+
+ Set<RemoteSnapshotHandler> futs = activeTasks();
+ GridCompoundFuture<Void, Void> stopFut = new
GridCompoundFuture<>();
+
+ try {
+ for (IgniteInternalFuture<Void> fut : futs)
+ stopFut.add(fut);
+
+ stopFut.markInitialized().get();
+ }
+ catch (IgniteCheckedException e) {
+ throw new IgniteException(e);
+ }
+ }
+
+ /**
+ * @param nodeId A node left the cluster.
+ */
+ public void onNodeLeft(UUID nodeId) {
+ Set<RemoteSnapshotHandler> futs = activeTasks();
+ ClusterTopologyCheckedException ex = new
ClusterTopologyCheckedException("The node from which a snapshot has been " +
+ "requested left the grid");
+
+ futs.forEach(t -> {
+ if (t.rmtNodeId.equals(nodeId))
+ t.acceptException(ex);
+ });
+ }
+
+ /**
+ * @return The set of currently scheduled tasks, some of them may be
already completed.
+ */
+ private Set<RemoteSnapshotHandler> activeTasks() {
+ Set<RemoteSnapshotHandler> futs = new HashSet<>();
+
+ RemoteSnapshotHandler curr = active;
+ RemoteSnapshotHandler changed;
+
+ do {
+ futs.addAll(queue);
+ futs.add(curr);
+
+ changed = curr;
+ } while ((curr = active) != changed);
+
+ return futs.stream()
+ .filter(Objects::nonNull)
+ .collect(Collectors.toSet());
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onMessage(UUID nodeId, Object msg, byte plc) {
+ if (!busyLock.enterBusy())
+ return;
+
+ try {
+ if (msg instanceof SnapshotRequestMessage) {
+ SnapshotRequestMessage reqMsg0 =
(SnapshotRequestMessage)msg;
+ String rqId = reqMsg0.requestId();
+ String snpName = reqMsg0.snapshotName();
+
+ synchronized (this) {
+ AbstractSnapshotFutureTask<?> task =
lastScheduledSnapshotResponseRemoteTask(nodeId);
+
+ if (task != null) {
+ // Task will also be removed from the local map by the listener invoked on future completion.
+ task.cancel();
+
+ log.info("Snapshot request has been cancelled due to another request received " +
+ "[prevSnpResp=" + task + ", msg0=" + reqMsg0 + ']');
+ }
+ }
+
+ AbstractSnapshotFutureTask<?> task = registerSnapshotTask(rqId,
+ new SnapshotResponseRemoteFutureTask(cctx,
+ nodeId,
+ snpName,
+ tmpWorkDir,
+ ioFactory,
+ rmtSndrFactory.apply(rqId, nodeId),
+ reqMsg0.parts()));
+
+ task.listen(f -> {
+ if (f.error() == null)
+ return;
+
+ U.error(log, "Failed to process request of creating a
snapshot " +
+ "[from=" + nodeId + ", msg=" + reqMsg0 + ']',
f.error());
+
+ try {
+ cctx.gridIO().sendToCustomTopic(nodeId,
+ DFLT_INITIAL_SNAPSHOT_TOPIC,
+ new SnapshotResponseMessage(reqMsg0.requestId(), f.error().getMessage()),
+ SYSTEM_POOL);
+ }
+ catch (IgniteCheckedException ex0) {
+ U.error(log, "Fail to send the response message
with processing snapshot request " +
+ "error [request=" + reqMsg0 + ", nodeId=" +
nodeId + ']', ex0);
+ }
+ });
+
+ task.start();
+ }
+ else if (msg instanceof SnapshotResponseMessage) {
+ SnapshotResponseMessage respMsg0 = (SnapshotResponseMessage)msg;
+
+ RemoteSnapshotHandler task = active;
+
+ if (task == null || !task.reqId.equals(respMsg0.requestId())) {
+ if (log.isInfoEnabled()) {
+ log.info("A stale snapshot response message has been received. Will be ignored " +
+ "[fromNodeId=" + nodeId + ", response=" + respMsg0 + ']');
+ }
+
+ return;
+ }
+
+ if (respMsg0.errorMessage() != null) {
+ task.acceptException(new IgniteCheckedException("Request cancelled. The snapshot operation stopped " +
+ "on the remote node with an error: " + respMsg0.errorMessage()));
+ }
+ }
+ }
+ catch (Throwable e) {
+ U.error(log, "Processing snapshot request from remote node
fails with an error", e);
+
+ cctx.kernalContext().failure().process(new
FailureContext(FailureType.CRITICAL_ERROR, e));
+ }
+ finally {
+ busyLock.leaveBusy();
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onEnd(UUID nodeId) {
+ RemoteSnapshotHandler task = active;
+
+ if (task == null)
+ return;
+
+ assert task.partsLeft.get() == 0 : task;
+ assert task.rmtNodeId.equals(nodeId);
+
+ log.info("Requested snapshot from remote node has been fully
received " +
Review comment:
isInfoEnabled check is missing; the `log.info()` call above should be guarded with it.
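
    For reference, a minimal sketch of the guarded-logging pattern the comment refers to (the tail of
    the message is cut off in the hunk above, so the bracketed part here is only a placeholder):

        if (log.isInfoEnabled()) {
            log.info("Requested snapshot from remote node has been fully received " +
                /* remainder of the message as in the PR */ "[...]");
        }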
##########
File path:
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotRestoreProcess.java
##########
@@ -907,8 +1190,9 @@ private Exception checkNodeLeft(Set<UUID> reqNodes, Set<UUID> respNodes) {
synchronized (this) {
opCtx0.stopFut = new IgniteFutureImpl<>(retFut.chain(f -> null));
+ }
- try {
+ try {
ctx.cache().context().snapshotMgr().snapshotExecutorService().execute(() -> {
Review comment:
Wrong indent
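
    For clarity, a sketch of the indentation this appears to ask for (context reproduced from the
    hunk above; only whitespace changes):

        synchronized (this) {
            opCtx0.stopFut = new IgniteFutureImpl<>(retFut.chain(f -> null));
        }

        try {
            ctx.cache().context().snapshotMgr().snapshotExecutorService().execute(() -> {
                // ...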
##########
File path:
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotRestoreProcess.java
##########
@@ -1007,17 +1414,96 @@ private void finishRollback(UUID reqId, Map<UUID, Boolean> res, Map<UUID, Except
/**
* @param req Request to prepare cache group restore from the snapshot.
- * @param dirs List of cache group names to restore from the snapshot.
* @param cfgs Cache ID to configuration mapping.
*/
- protected SnapshotRestoreContext(SnapshotOperationRequest req, Collection<File> dirs,
- Map<Integer, StoredCacheData> cfgs) {
+ protected SnapshotRestoreContext(
+ SnapshotOperationRequest req,
+ DiscoCache discoCache,
+ Map<Integer, StoredCacheData> cfgs,
+ UUID locNodeId,
+ List<SnapshotMetadata> locMetas
+ ) {
reqId = req.requestId();
snpName = req.snapshotName();
- nodes = new HashSet<>(req.nodes());
+ opNodeId = req.operationalNodeId();
+ this.discoCache = discoCache;
- this.dirs = dirs;
this.cfgs = cfgs;
+
+ metasPerNode.computeIfAbsent(locNodeId, id -> new ArrayList<>()).addAll(locMetas);
+ }
+
+ /**
+ * @return Required baseline node IDs that must be alive to complete the restore operation.
+ */
+ public Collection<UUID> nodes() {
+ return F.transform(discoCache.aliveBaselineNodes(), F.node2id());
+ }
+ }
+
+ /** */
+ private static class RestoreCacheStartException extends IgniteCheckedException {
Review comment:
Not used
##########
File path:
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotRestoreProcess.java
##########
@@ -980,6 +1263,102 @@ private void finishRollback(UUID reqId, Map<UUID, Boolean> res, Map<UUID, Except
finishProcess(reqId, opCtx0.err.get());
}
+ /**
+ * @param mgr Ignite snapshot manager.
+ * @param opCtx Snapshot operation context.
+ * @param srcDir Snapshot directory to copy from.
+ * @param targetDir Destination directory to copy to.
+ * @param partFut Partition restore future completed when the partition file has been copied.
+ */
+ private static void copyLocalAsync(
+ IgniteSnapshotManager mgr,
+ SnapshotRestoreContext opCtx,
+ File srcDir,
+ File targetDir,
+ PartitionRestoreFuture partFut
+ ) {
+ File snpFile = new File(srcDir, FilePageStoreManager.getPartitionFileName(partFut.partId));
+ Path partFile = Paths.get(targetDir.getAbsolutePath(), FilePageStoreManager.getPartitionFileName(partFut.partId));
+
+ CompletableFuture.supplyAsync(() -> {
+ if (opCtx.stopChecker.getAsBoolean())
+ throw new IgniteInterruptedException("The operation has
been stopped on copy file: " + snpFile.getAbsolutePath());
+
+ if (Thread.interrupted())
+ throw new IgniteInterruptedException("Thread has been
interrupted: " + Thread.currentThread().getName());
+
+ if (!snpFile.exists()) {
+ throw new IgniteException("Partition snapshot file doesn't
exist [snpName=" + opCtx.snpName +
+ ", snpDir=" + snpFile.getAbsolutePath() + ", name=" +
snpFile.getName() + ']');
+ }
+
+ IgniteSnapshotManager.copy(mgr.ioFactory(), snpFile,
partFile.toFile(), snpFile.length());
+
+ return partFile;
+ }, mgr.snapshotExecutorService())
+ .whenComplete((r, t) -> opCtx.errHnd.accept(t))
+ .whenComplete((r, t) -> {
+ if (t == null)
+ partFut.complete(partFile);
+ else
+ partFut.completeExceptionally(t);
+ });
+ }
+
+ /**
+ * @param affCache Affinity cache.
+ * @param node Cluster node to get assigned partitions.
+ * @param factory Factory mapping a partition id to the resulting element type.
+ * @return The set of partitions assigned to the given node.
+ */
+ private static <T> Set<T> nodeAffinityPartitions(
+ GridAffinityAssignmentCache affCache,
+ ClusterNode node,
+ IntFunction<T> factory
+ ) {
+ return IntStream.range(0, affCache.partitions())
+ .filter(p -> affCache.idealAssignment().assignment().get(p).contains(node))
+ .mapToObj(factory)
+ .collect(Collectors.toSet());
+ }
+
+ /**
+ * @param col Collection of sets to complete.
+ * @param ex Exception to set.
+ */
+ private static void completeListExceptionally(Collection<Set<PartitionRestoreFuture>> col, Throwable ex) {
+ col.stream()
+ .flatMap(Collection::stream)
+ .forEach(f -> f.completeExceptionally(ex));
+ }
+
+ /**
+ * @param cacheStartFut The cache start future whose exception should be wrapped, if any.
+ * @param <T> Result future type.
+ * @return Future which completes with the wrapped exception if one occurred.
+ */
+ private static <T> IgniteInternalFuture<T> chainCacheStartException(IgniteInternalFuture<T> cacheStartFut) {
Review comment:
Not used
##########
File path:
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotRestoreProcess.java
##########
@@ -1007,17 +1414,96 @@ private void finishRollback(UUID reqId, Map<UUID, Boolean> res, Map<UUID, Except
/**
* @param req Request to prepare cache group restore from the snapshot.
- * @param dirs List of cache group names to restore from the snapshot.
* @param cfgs Cache ID to configuration mapping.
*/
- protected SnapshotRestoreContext(SnapshotOperationRequest req, Collection<File> dirs,
- Map<Integer, StoredCacheData> cfgs) {
+ protected SnapshotRestoreContext(
+ SnapshotOperationRequest req,
+ DiscoCache discoCache,
+ Map<Integer, StoredCacheData> cfgs,
+ UUID locNodeId,
+ List<SnapshotMetadata> locMetas
+ ) {
reqId = req.requestId();
snpName = req.snapshotName();
- nodes = new HashSet<>(req.nodes());
+ opNodeId = req.operationalNodeId();
+ this.discoCache = discoCache;
- this.dirs = dirs;
this.cfgs = cfgs;
+
+ metasPerNode.computeIfAbsent(locNodeId, id -> new ArrayList<>()).addAll(locMetas);
+ }
+
+ /**
+ * @return Required baseline node IDs that must be alive to complete the restore operation.
+ */
+ public Collection<UUID> nodes() {
+ return F.transform(discoCache.aliveBaselineNodes(), F.node2id());
+ }
+ }
+
+ /** */
+ private static class RestoreCacheStartException extends IgniteCheckedException {
+ /** Serial version uid. */
+ private static final long serialVersionUID = 0L;
+
+ /** @param cause Error. */
+ public RestoreCacheStartException(Throwable cause) {
+ super(cause);
+ }
+ }
+
+ /** Snapshot operation prepare response. */
+ private static class SnapshotRestoreOperationResponse implements Serializable {
+ /** Serial version uid. */
+ private static final long serialVersionUID = 0L;
+
+ /** Cache configurations on local node. */
+ private ArrayList<StoredCacheData> ccfgs;
+
+ /** Snapshot metadata files on local node. */
+ private ArrayList<SnapshotMetadata> metas;
Review comment:
`ArrayList` -> `List`, final
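
    A sketch of the suggested declarations, keeping the field names from the hunk (assumes
    `java.util.List` is imported; the constructor is an assumption about how the final fields would
    then be initialized):

        /** Cache configurations on local node. */
        private final List<StoredCacheData> ccfgs;

        /** Snapshot metadata files on local node. */
        private final List<SnapshotMetadata> metas;

        /** @param ccfgs Cache configurations. @param metas Snapshot metadata files. */
        SnapshotRestoreOperationResponse(List<StoredCacheData> ccfgs, List<SnapshotMetadata> metas) {
            this.ccfgs = ccfgs;
            this.metas = metas;
        }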
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]