sk0x50 commented on a change in pull request #5779: IGNITE-10508 Support the 
new checkpoint feature not wait for the previous operation to complete
URL: https://github.com/apache/ignite/pull/5779#discussion_r253418723
 
 

 ##########
 File path: 
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
 ##########
 @@ -3218,139 +3363,57 @@ public IgniteInternalFuture 
wakeupForSnapshotCreation(SnapshotOperation snapshot
          *
          */
         private void doCheckpoint() {
-            Checkpoint chp = null;
+            resetLastCheckpointTimeStamp();
 
-            try {
-                CheckpointMetricsTracker tracker = new 
CheckpointMetricsTracker();
+            Checkpoint chp = new Checkpoint(lastCpTs, memoryRecoveryRecordPtr);
 
-                try {
-                    chp = markCheckpointBegin(tracker);
-                }
-                catch (IgniteCheckedException e) {
-                    if (curCpProgress != null)
-                        curCpProgress.cpFinishFut.onDone(e);
+            memoryRecoveryRecordPtr = null;
 
-                    // In case of checkpoint initialization error node should 
be invalidated and stopped.
-                    cctx.kernalContext().failure().process(new 
FailureContext(FailureType.CRITICAL_ERROR, e));
+            try {
+                markCheckpointBegin(chp);
 
-                    throw new IgniteException(e); // Re-throw as unchecked 
exception to force stopping checkpoint thread.
-                }
+                currCheckpointPagesCnt = chp.pages();
 
                 updateHeartbeat();
 
-                currCheckpointPagesCnt = chp.pagesSize;
-
                 writtenPagesCntr = new AtomicInteger();
                 syncedPagesCntr = new AtomicInteger();
                 evictedPagesCntr = new AtomicInteger();
 
                 boolean success = false;
 
-                int destroyedPartitionsCnt;
-
                 try {
-                    if (chp.hasDelta()) {
-                        // Identity stores set.
-                        ConcurrentLinkedHashMap<PageStore, LongAdder> 
updStores = new ConcurrentLinkedHashMap<>();
-
-                        CountDownFuture doneWriteFut = new CountDownFuture(
-                            asyncRunner == null ? 1 : 
chp.cpPages.collectionsSize());
-
-                        tracker.onPagesWriteStart();
-
-                        final int totalPagesToWriteCnt = chp.cpPages.size();
-
-                        if (asyncRunner != null) {
-                            for (int i = 0; i < chp.cpPages.collectionsSize(); 
i++) {
-                                Runnable write = new WriteCheckpointPages(
-                                    tracker,
-                                    chp.cpPages.innerCollection(i),
-                                    updStores,
-                                    doneWriteFut,
-                                    totalPagesToWriteCnt,
-                                    new Runnable() {
-                                        @Override public void run() {
-                                            updateHeartbeat();
-                                        }
-                                    },
-                                    asyncRunner
-                                );
-
-                                try {
-                                    asyncRunner.execute(write);
-                                }
-                                catch (RejectedExecutionException ignore) {
-                                    // Run the task synchronously.
-                                    updateHeartbeat();
-
-                                    write.run();
-                                }
-                            }
-                        }
-                        else {
-                            // Single-threaded checkpoint.
-                            updateHeartbeat();
-
-                            Runnable write = new WriteCheckpointPages(
-                                tracker,
-                                chp.cpPages,
-                                updStores,
-                                doneWriteFut,
-                                totalPagesToWriteCnt,
-                                new Runnable() {
-                                    @Override public void run() {
-                                        updateHeartbeat();
-                                    }
-                                },
-                                null);
-
-                            write.run();
-                        }
-
-                        updateHeartbeat();
-
-                        // Wait and check for errors.
-                        doneWriteFut.get();
-
-                        // Must re-check shutdown flag here because threads 
may have skipped some pages.
-                        // If so, we should not put finish checkpoint mark.
-                        if (shutdownNow) {
-                            chp.progress.cpFinishFut.onDone(new 
NodeStoppingException("Node is stopping."));
-
+                    // Write dirty pages and fsync stores.
+                    if (chp.hasPages()) {
+                        if (checkCancel.apply(chp))
                             return;
-                        }
 
-                        tracker.onFsyncStart();
+                        // Write pages to stores.
+                        writePages(chp);
 
-                        if (!skipSync) {
-                            for (Map.Entry<PageStore, LongAdder> updStoreEntry 
: updStores.entrySet()) {
-                                if (shutdownNow) {
-                                    chp.progress.cpFinishFut.onDone(new 
NodeStoppingException("Node is stopping."));
-
-                                    return;
-                                }
+                        if (checkCancel.apply(chp))
+                            return;
 
-                                blockingSectionBegin();
+                        // Fsync page stores.
+                        fsyncStores(chp);
 
-                                try {
-                                    updStoreEntry.getKey().sync();
-                                }
-                                finally {
-                                    blockingSectionEnd();
-                                }
-
-                                
syncedPagesCntr.addAndGet(updStoreEntry.getValue().intValue());
-                            }
-                        }
+                        if (checkCancel.apply(chp))
+                            return;
                     }
                     else {
-                        tracker.onPagesWriteStart();
-                        tracker.onFsyncStart();
+                        chp.metrics.onPagesWriteStart();
+
+                        chp.metrics.onFsyncStart();
                     }
 
+                    // Notify snapshot manager that all pages was written and 
fsync completed.
 
 Review comment:
   `Notify snapshot manager that all pages were written and fsync completed.`

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

Reply via email to