alex-plekhanov commented on a change in pull request #6554: IGNITE-11073: 
Backup page store manager, initial
URL: https://github.com/apache/ignite/pull/6554#discussion_r380079876
 
 

 ##########
 File path: 
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotFutureTask.java
 ##########
 @@ -205,186 +265,93 @@ public UUID sourceNodeId() {
         return parts;
     }
 
-    /**
-     * @return Future which will be completed when snapshot operation ends.
-     */
-    public IgniteInternalFuture<Boolean> snapshotFuture() {
-        return snpFut;
-    }
-
-    /**
-     * @param state A new snapshot state to set.
-     * @return {@code true} if given state has been set by this call.
-     */
-    public boolean state(SnapshotState state) {
-        if (this.state == state)
-            return false;
-
-        synchronized (this) {
-            if (this.state == state)
-                return false;
-
-            if (state == SnapshotState.STOPPING) {
-                this.state = SnapshotState.STOPPING;
-
-                return true;
-            }
-
-            if (state.ordinal() > this.state.ordinal()) {
-                this.state = state;
-
-                if (state == SnapshotState.STARTED)
-                    startedFut.onDone();
-
-                return true;
-            }
-            else
-                return false;
-        }
-    }
-
     /**
      * @param th An exception which occurred during snapshot processing.
      */
     public void acceptException(Throwable th) {
-        assert th != null;
+        if (th == null)
+            return;
 
-        if (state(SnapshotState.STOPPING)) {
-            lastTh = th;
+        if (err.compareAndSet(null, th))
+            closeAsync();
 
-            startedFut.onDone(th);
-        }
+        startedFut.onDone(th);
 
         log.error("Exception occurred during snapshot operation", th);
     }
 
     /**
-     * @param th Occurred exception during processing or {@code null} if not.
+     * Close snapshot operation and release resources being used.
      */
-    public void close(Throwable th) {
-        if (state(SnapshotState.STOPPED)) {
-            if (lastTh == null)
-                lastTh = th;
+    private void close() {
+        if (isDone())
+            return;
 
-            Throwable lastTh0 = lastTh;
+        Throwable err0 = err.get();
 
+        if (onDone(true, err0, cancelled)) {
             for (PageStoreSerialWriter writer : partDeltaWriters.values())
                 U.closeQuiet(writer);
 
-            snpSndr.close(lastTh0);
+            snpSndr.close(err0);
 
             if (tmpSnpDir != null)
                 U.delete(tmpSnpDir);
 
             // Delete snapshot directory if no other files exists.
             try {
-                if (U.fileCount(tmpTaskWorkDir.toPath()) == 0 || lastTh0 != 
null)
+                if (U.fileCount(tmpTaskWorkDir.toPath()) == 0 || err0 != null)
                     U.delete(tmpTaskWorkDir.toPath());
             }
             catch (IOException e) {
                 log.error("Snapshot directory doesn't exist [snpName=" + 
snpName + ", dir=" + tmpTaskWorkDir + ']');
             }
 
-            if (lastTh0 != null)
-                startedFut.onDone(lastTh0);
-
-            snpFut.onDone(true, lastTh0, cancelled);
+            if (err0 != null)
+                startedFut.onDone(err0);
         }
     }
 
     /**
-     * @return Future which will be completed on snapshot start.
+     * @throws IgniteCheckedException If fails.
      */
-    public IgniteInternalFuture<Void> submit() {
-        try {
-            tmpSnpDir = 
U.resolveWorkDirectory(tmpTaskWorkDir.getAbsolutePath(),
-                
relativeNodePath(cctx.kernalContext().pdsFolderResolver().resolveFolders()),
-                false);
-
-            snpSndr.init();
-
-            Set<Integer> grps = parts.stream()
-                .map(GroupPartitionId::getGroupId)
-                .collect(Collectors.toSet());
-
-            Map<Integer, File> dirs = new HashMap<>();
-
-            for (Integer grpId : grps) {
-                CacheGroupContext gctx = cctx.cache().cacheGroup(grpId);
-
-                if (gctx == null)
-                    throw new IgniteCheckedException("Cache group context has 
not found. Cache group is stopped: " + grpId);
-
-                if (!CU.isPersistentCache(gctx.config(), 
cctx.kernalContext().config().getDataStorageConfiguration()))
-                    throw new IgniteCheckedException("In-memory cache groups 
are not allowed to be snapshotted: " + grpId);
-
-                if (gctx.config().isEncryptionEnabled())
-                    throw new IgniteCheckedException("Encrypted cache groups 
are note allowed to be snapshotted: " + grpId);
-
-                // Create cache snapshot directory if not.
-                File grpDir = 
U.resolveWorkDirectory(tmpSnpDir.getAbsolutePath(),
-                    cacheDirName(gctx.config()), false);
-
-                U.ensureDirectory(grpDir,
-                    "snapshot directory for cache group: " + gctx.groupId(),
-                    null);
-
-                dirs.put(grpId, grpDir);
-            }
-
-            CompletableFuture<Boolean> cpEndFut0 = cpEndFut;
+    public void awaitStarted() throws IgniteCheckedException {
+        startedFut.get();
+    }
 
-            for (GroupPartitionId pair : parts) {
-                PageStore store = 
((FilePageStoreManager)cctx.pageStore()).getStore(pair.getGroupId(),
-                    pair.getPartitionId());
+    /**
+     * Initiates snapshot taks.
 
 Review comment:
   taks -> task

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

Reply via email to