nizhikov commented on code in PR #10506:
URL: https://github.com/apache/ignite/pull/10506#discussion_r1095846011
##########
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotRestoreProcess.java:
##########
@@ -1227,6 +1273,379 @@ private void finishCacheStart(UUID reqId, Map<UUID,
Boolean> res, Map<UUID, Exce
rollbackRestoreProc.start(reqId, reqId);
}
+ /**
+ * @param reqId Request ID.
+ * @return Result future.
+ */
+ private IgniteInternalFuture<Boolean> cacheStop(UUID reqId) {
+ if (!U.isLocalNodeCoordinator(ctx.discovery()))
+ return new GridFinishedFuture<>();
+
+ SnapshotRestoreContext opCtx0 = opCtx;
+
+ Collection<String> stopCaches = opCtx0.cfgs.values()
+ .stream()
+ .map(c -> c.config().getName())
+ .collect(Collectors.toSet());
+
+ if (log.isInfoEnabled())
+ log.info("Stopping caches [reqId=" + opCtx0.reqId + ", snp=" +
opCtx0.snpName + ", caches=" + stopCaches + ']');
+
+ // Skip deleting cache files as they will be removed during rollback.
+ return ctx.cache().dynamicDestroyCaches(stopCaches, false, false)
+ .chain(fut -> {
+ if (fut.error() != null)
+ throw F.wrap(fut.error());
+ else
+ return true;
+ });
+ }
+
+ /**
+ * @param reqId Request ID.
+ * @param res Results.
+ * @param errs Errors.
+ */
+ private void finishCacheStop(UUID reqId, Map<UUID, Boolean> res, Map<UUID,
Exception> errs) {
+ if (ctx.clientNode())
+ return;
+
+ if (!errs.isEmpty()) {
+ SnapshotRestoreContext opCtx0 = opCtx;
+
+ log.error("Failed to stop caches during a snapshot rollback
routine " +
+ "[reqId=" + opCtx0.reqId + ", snapshot=" + opCtx0.snpName + ",
err=" + errs + ']');
+ }
+
+ if (U.isLocalNodeCoordinator(ctx.discovery()))
+ rollbackRestoreProc.start(reqId, reqId);
+ }
+
+ /**
+ * Inits restoring incremental snapshot.
+ *
+ * @param reqId Request ID.
+ * @return Result future.
+ */
+ private IgniteInternalFuture<Boolean> incrementalSnapshotRestore(UUID
reqId) {
+ if (ctx.clientNode())
+ return new GridFinishedFuture<>();
+
+ SnapshotRestoreContext opCtx0 = opCtx;
+
+ if (log.isInfoEnabled()) {
+ log.info("Starting incremental snapshot restore operation " +
+ "[reqId=" + opCtx0.reqId + ", snapshot=" + opCtx0.snpName + ",
incIdx=" + opCtx0.incIdx +
+ ", caches=" + F.viewReadOnly(opCtx0.cfgs, c ->
c.config().getName()) + ']');
+ }
+
+ GridFutureAdapter<Boolean> res = new GridFutureAdapter<>();
+
+ ctx.pools().getSnapshotExecutorService().submit(() -> {
+ try {
+ Set<Integer> cacheIds = opCtx0.cfgs.keySet();
+
+ walEnabled(false, cacheIds);
+
+ restoreIncrementalSnapshot(opCtx0.snpName, opCtx0.snpPath,
cacheIds, opCtx0.incIdx);
+
+ walEnabled(true, cacheIds);
+
+ CheckpointProgress cp = ctx.cache().context().database()
+ .forceNewCheckpoint("Incremental snapshot restored.",
(fut) -> {
+ if (fut.error() != null)
+ res.onDone(fut.error());
+ else
+ res.onDone(true);
+ });
+
+ if (cp == null)
+ res.onDone(new IgniteCheckedException("Node is
stopping."));
+ }
+ catch (Throwable e) {
+ res.onDone(e);
+ }
+ });
+
+ return res;
+ }
+
+ /**
+ * Restore incremental snapshot.
+ *
+ * @param snpName Base snapshot name.
+ * @param snpPath Base snapshot path.
+ * @param cacheIds Restoring cache IDs.
+ * @param incIdx Index of incremental snapshot.
+ */
+ private void restoreIncrementalSnapshot(
+ String snpName,
+ String snpPath,
+ Set<Integer> cacheIds,
+ int incIdx
+ ) throws IgniteCheckedException, IOException {
+ File[] segments = walSegments(snpName, snpPath, incIdx);
+
+ UUID incSnpId = ctx.cache().context().snapshotMgr()
+ .readIncrementalSnapshotMetadata(snpName, snpPath, incIdx)
+ .requestId();
+
+ IncrementalSnapshotFinishRecord incSnpFinRec =
readFinishRecord(segments[segments.length - 1], incSnpId);
+
+ if (incSnpFinRec == null)
+ throw new IgniteCheckedException("System WAL records for
incremental snapshot wasn't found [id=" + incSnpId + ']');
+
+ CacheStripedExecutor exec = new
CacheStripedExecutor(ctx.pools().getStripedExecutorService());
+
+ UUID prevIncSnpId = incIdx > 1
+ ?
ctx.cache().context().snapshotMgr().readIncrementalSnapshotMetadata(snpName,
snpPath, incIdx - 1).requestId()
+ : null;
+
+ long start = U.currentTimeMillis();
+
+ LongAdder applied = new LongAdder();
+
+ try (WALIterator it = walIter(log, segments)) {
+ IgnitePredicate<GridCacheVersion> txVerFilter = null;
+
+ // Skips applying WAL until base snapshot record.
+ while (it.hasNext()) {
Review Comment:
Looks like we can search for a specific WAL record with the record type
filter - `IgniteWalIteratorFactory#IteratorParametersBuilder#addFilter`.
Filter allow do not read whole record to skip it.
So, basically iteration algorithm can be like following:
1. Create `walIterator1` with the filter only `CLUSTER_SNAPSHOT` record.
2. iterate it while find desired record -> snapRec
3. Create `walIterator2` from `WALPointer#next` to iterate further.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]