This is an automated email from the ASF dual-hosted git repository.

agoncharuk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new 6afec64  IGNITE-12128 Fix potential PDS corruption when a node fails 
during checkpoint - Fixes #6851.
6afec64 is described below

commit 6afec64c208a5a6e52e8a18576aa14a748ad745b
Author: Anton Kalashnikov <kaa....@yandex.ru>
AuthorDate: Mon Sep 9 15:48:49 2019 +0300

    IGNITE-12128 Fix potential PDS corruption when a node fails during 
checkpoint - Fixes #6851.
    
    Signed-off-by: Alexey Goncharuk <alexey.goncha...@gmail.com>
---
 .../cache/persistence/CheckpointFuture.java        |   5 -
 .../GridCacheDatabaseSharedManager.java            | 112 +++++++++---
 .../cache/persistence/PageStoreWriter.java         |   3 +-
 .../cache/persistence/pagemem/CheckpointPages.java |  88 ++++++++++
 .../cache/persistence/pagemem/PageMemoryEx.java    |   5 +-
 .../cache/persistence/pagemem/PageMemoryImpl.java  |  45 ++---
 .../ignite/internal/util/StripedExecutor.java      |   3 +-
 .../IgnitePdsRecoveryAfterFileCorruptionTest.java  |   3 +-
 .../CheckpointFailBeforeWriteMarkTest.java         | 190 +++++++++++++++++++++
 ...CheckpointSimulationWithRealCpDisabledTest.java |   5 +-
 .../IgnitePageMemReplaceDelayedWriteUnitTest.java  |   5 +-
 .../persistence/pagemem/PageMemoryImplTest.java    |  13 +-
 .../ignite/testsuites/IgnitePdsTestSuite2.java     |   3 +
 13 files changed, 420 insertions(+), 60 deletions(-)

diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/CheckpointFuture.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/CheckpointFuture.java
index 23287f1..381bb55 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/CheckpointFuture.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/CheckpointFuture.java
@@ -32,9 +32,4 @@ public interface CheckpointFuture {
      * @return Finish future.
      */
     public GridFutureAdapter<Object> finishFuture();
-
-    /**
-     * @return Checkpoint was already started.
-     */
-    public boolean started();
 }
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
index 82bb555..a738f2d 100755
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
@@ -162,6 +162,7 @@ import org.apache.ignite.internal.util.StripedExecutor;
 import org.apache.ignite.internal.util.TimeBag;
 import org.apache.ignite.internal.util.future.CountDownFuture;
 import org.apache.ignite.internal.util.future.GridCompoundFuture;
+import org.apache.ignite.internal.util.future.GridFinishedFuture;
 import org.apache.ignite.internal.util.future.GridFutureAdapter;
 import org.apache.ignite.internal.util.lang.GridInClosure3X;
 import org.apache.ignite.internal.util.tostring.GridToStringInclude;
@@ -205,6 +206,10 @@ import static 
org.apache.ignite.internal.pagemem.PageIdUtils.partId;
 import static 
org.apache.ignite.internal.pagemem.wal.record.WALRecord.RecordType.CHECKPOINT_RECORD;
 import static 
org.apache.ignite.internal.pagemem.wal.record.WALRecord.RecordType.METASTORE_DATA_RECORD;
 import static 
org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState.fromOrdinal;
+import static 
org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager.CheckpointProgress.State.FINISHED;
+import static 
org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager.CheckpointProgress.State.LOCK_RELEASED;
+import static 
org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager.CheckpointProgress.State.LOCK_TAKEN;
+import static 
org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager.CheckpointProgress.State.MARKER_STORED_TO_DISK;
 import static 
org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.TMP_FILE_MATCHER;
 import static 
org.apache.ignite.internal.processors.cache.persistence.tree.io.PageIO.getType;
 import static 
org.apache.ignite.internal.processors.cache.persistence.tree.io.PageIO.getVersion;
@@ -1622,7 +1627,7 @@ public class GridCacheDatabaseSharedManager extends 
IgniteCacheDatabaseSharedMan
                         throw new IgniteException(new 
NodeStoppingException("Failed to perform cache update: node is stopping."));
                     }
 
-                    if (checkpointLock.getReadHoldCount() > 1 || 
safeToUpdatePageMemories())
+                    if (checkpointLock.getReadHoldCount() > 1 || 
safeToUpdatePageMemories() || checkpointerThread == null)
                         break;
                     else {
                         checkpointLock.readLock().unlock();
@@ -3032,11 +3037,13 @@ public class GridCacheDatabaseSharedManager extends 
IgniteCacheDatabaseSharedMan
 
         int pagesNum = 0;
 
+        GridFinishedFuture finishedFuture = new GridFinishedFuture();
+
         // Collect collection of dirty pages from all regions.
         for (DataRegion memPlc : regions) {
             if (memPlc.config().isPersistenceEnabled()){
                 GridMultiCollectionWrapper<FullPageId> nextCpPagesCol =
-                    ((PageMemoryEx)memPlc.pageMemory()).beginCheckpoint();
+                    
((PageMemoryEx)memPlc.pageMemory()).beginCheckpoint(finishedFuture);
 
                 pagesNum += nextCpPagesCol.size();
 
@@ -4111,7 +4118,8 @@ public class GridCacheDatabaseSharedManager extends 
IgniteCacheDatabaseSharedMan
 
                 fillCacheGroupState(cpRec);
 
-                cpPagesTuple = beginAllCheckpoints();
+                // Pages may be replaced only after the checkpoint 
entry has been stored to disk.
+                cpPagesTuple = beginAllCheckpoints(curr.cpMarkerStored);
 
                 hasPages = hasPageForWrite(cpPagesTuple.get1());
 
@@ -4175,6 +4183,8 @@ public class GridCacheDatabaseSharedManager extends 
IgniteCacheDatabaseSharedMan
 
                 writeCheckpointEntry(tmpWriteBuf, cp, 
CheckpointEntryType.START);
 
+                curr.cpMarkerStored.onDone();
+
                 GridMultiCollectionWrapper<FullPageId> cpPages = 
splitAndSortCpPagesIfNeeded(
                     cpPagesTuple, persistenceCfg.getCheckpointThreads());
 
@@ -4351,7 +4361,7 @@ public class GridCacheDatabaseSharedManager extends 
IgniteCacheDatabaseSharedMan
             synchronized (this) {
                 curr = scheduledCp;
 
-                curr.started = true;
+                curr.state(LOCK_TAKEN);
 
                 if (curr.reason == null)
                     curr.reason = "timeout";
@@ -4416,10 +4426,13 @@ public class GridCacheDatabaseSharedManager extends 
IgniteCacheDatabaseSharedMan
         }
 
         /**
-         * @return tuple with collections of FullPageIds obtained from each 
PageMemory and overall number of dirty
-         * pages.
+         * @return tuple with collections of FullPageIds obtained from each 
PageMemory, overall number of dirty
+         * pages, and flag defines at least one user page became a dirty since 
last checkpoint.
+         * @param allowToReplace The sign which allows to replace pages from a 
checkpoint by page replacer.
          */
-        private 
IgniteBiTuple<Collection<GridMultiCollectionWrapper<FullPageId>>, Integer> 
beginAllCheckpoints() {
+        private 
IgniteBiTuple<Collection<GridMultiCollectionWrapper<FullPageId>>, Integer> 
beginAllCheckpoints(
+            IgniteInternalFuture allowToReplace
+        ) {
             Collection<GridMultiCollectionWrapper<FullPageId>> res = new 
ArrayList(dataRegions().size());
 
             int pagesNum = 0;
@@ -4428,7 +4441,8 @@ public class GridCacheDatabaseSharedManager extends 
IgniteCacheDatabaseSharedMan
                 if (!memPlc.config().isPersistenceEnabled())
                     continue;
 
-                GridMultiCollectionWrapper<FullPageId> nextCpPagesCol = 
((PageMemoryEx)memPlc.pageMemory()).beginCheckpoint();
+                GridMultiCollectionWrapper<FullPageId> nextCpPagesCol = 
((PageMemoryEx)memPlc.pageMemory())
+                    .beginCheckpoint(allowToReplace);
 
                 pagesNum += nextCpPagesCol.size();
 
@@ -4994,15 +5008,35 @@ public class GridCacheDatabaseSharedManager extends 
IgniteCacheDatabaseSharedMan
         /** Scheduled time of checkpoint. */
         private volatile long nextCpNanos;
 
-        /** Checkpoint begin phase future. */
-        private GridFutureAdapter cpBeginFut = new GridFutureAdapter<>();
+        /** Checkpoint begin phase future. TODO it should be encapsulated. */
+        private GridFutureAdapter cpBeginFut = new GridFutureAdapter<Void>() {
+            @Override public boolean onDone(@Nullable Void res, @Nullable 
Throwable err, boolean cancel) {
+                CheckpointProgress.this.state(LOCK_RELEASED);
+
+                return super.onDone(res, err, cancel);
+            }
+        };
+
+        /** Checkpoint marker stored to disk phase future. TODO it should be 
encapsulated. */
+        private GridFutureAdapter cpMarkerStored = new 
GridFutureAdapter<Void>() {
+            @Override public boolean onDone(@Nullable Void res, @Nullable 
Throwable err, boolean cancel) {
+                CheckpointProgress.this.state(MARKER_STORED_TO_DISK);
+
+                return super.onDone(res, err, cancel);
+            }
+        };
 
-        /** Checkpoint finish phase future. */
+        /** Checkpoint finish phase future. TODO it should be encapsulated. */
         private GridFutureAdapter cpFinishFut = new GridFutureAdapter<Void>() {
             @Override protected boolean onDone(@Nullable Void res, @Nullable 
Throwable err, boolean cancel) {
                 if (err != null && !cpBeginFut.isDone())
                     cpBeginFut.onDone(err);
 
+                if (err != null && !cpMarkerStored.isDone())
+                    cpMarkerStored.onDone(err);
+
+                CheckpointProgress.this.state(FINISHED);
+
                 return super.onDone(res, err, cancel);
             }
         };
@@ -5010,8 +5044,8 @@ public class GridCacheDatabaseSharedManager extends 
IgniteCacheDatabaseSharedMan
         /** Flag indicates that snapshot operation will be performed after 
checkpoint. */
         private volatile boolean nextSnapshot;
 
-        /** Flag indicates that checkpoint is started. */
-        private volatile boolean started;
+        /** Current checkpoint state. */
+        private volatile AtomicReference<State> state = new 
AtomicReference(State.SCHEDULED);
 
         /** Snapshot operation that should be performed if {@link 
#nextSnapshot} set to true. */
         private volatile SnapshotOperation snapshotOperation;
@@ -5029,6 +5063,15 @@ public class GridCacheDatabaseSharedManager extends 
IgniteCacheDatabaseSharedMan
             this.nextCpNanos = System.nanoTime() + U.millisToNanos(cpFreq);
         }
 
+        /**
+         * @return {@code true} if checkpoint is started.
+         * @deprecated For legacy reason.
+         */
+        @Deprecated
+        public boolean inProgress() {
+            return state.get().ordinal() >= State.LOCK_TAKEN.ordinal();
+        }
+
         /** */
         public boolean started() {
             return cpBeginFut.isDone();
@@ -5038,6 +5081,42 @@ public class GridCacheDatabaseSharedManager extends 
IgniteCacheDatabaseSharedMan
         public boolean finished() {
             return cpFinishFut.isDone();
         }
+
+        /**
+         * @param expectedState Expected state.
+         * @return {@code true} if current state is equal to or later than the given state.
+         */
+        public boolean atLeastState(State expectedState) {
+            return state.get().ordinal() >= expectedState.ordinal();
+        }
+
+        /**
+         * Changing checkpoint state if order of state is correct.
+         *
+         * @param newState New checkpoint state.
+         */
+        public void state(@NotNull State newState) {
+            State state = this.state.get();
+
+            if (state.ordinal() < newState.ordinal())
+                this.state.compareAndSet(state, newState);
+        }
+
+        /**
+         * Possible checkpoint states. Ordinal is important. Every next state 
follows the previous one.
+         */
+        enum State {
+            /** Checkpoint is waiting for execution. **/
+            SCHEDULED,
+            /** Checkpoint was awakened and it is preparing to start. **/
+            LOCK_TAKEN,
+            /** Checkpoint counted the pages and write lock was released. **/
+            LOCK_RELEASED,
+            /** Checkpoint marker was stored to disk. **/
+            MARKER_STORED_TO_DISK,
+            /** Checkpoint was finished. **/
+            FINISHED
+        }
     }
 
     /**
@@ -5055,7 +5134,7 @@ public class GridCacheDatabaseSharedManager extends 
IgniteCacheDatabaseSharedMan
 
         /** */
         CheckpointProgressSnapshot(CheckpointProgress cpProgress) {
-            started = cpProgress.started;
+            started = cpProgress.inProgress();
             cpBeginFut = cpProgress.cpBeginFut;
             cpFinishFut = cpProgress.cpFinishFut;
         }
@@ -5069,11 +5148,6 @@ public class GridCacheDatabaseSharedManager extends 
IgniteCacheDatabaseSharedMan
         @Override public GridFutureAdapter<Object> finishFuture() {
             return cpFinishFut;
         }
-
-        /** {@inheritDoc} */
-        @Override public boolean started() {
-            return started;
-        }
     }
 
     /**
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/PageStoreWriter.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/PageStoreWriter.java
index 18f6d04..0771010 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/PageStoreWriter.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/PageStoreWriter.java
@@ -19,6 +19,7 @@ package 
org.apache.ignite.internal.processors.cache.persistence;
 
 import java.nio.ByteBuffer;
 import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.pagemem.FullPageId;
 import org.apache.ignite.internal.pagemem.store.PageStore;
 import 
org.apache.ignite.internal.processors.cache.persistence.pagemem.PageMemoryEx;
@@ -31,7 +32,7 @@ public interface PageStoreWriter {
      * Callback for write page. {@link PageMemoryEx} will copy page content to 
buffer before call.
      *
      * @param fullPageId Page ID to get byte buffer for. The page ID must be 
present in the collection returned by
-     *      the {@link PageMemoryEx#beginCheckpoint()} method call.
+     *      the {@link PageMemoryEx#beginCheckpoint(IgniteInternalFuture)} 
method call.
      * @param buf Temporary buffer to write changes into.
      * @param tag  {@code Partition generation} if data was read, {@code null} 
otherwise (data already saved to storage).
      * @throws IgniteCheckedException If write page failed.
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/CheckpointPages.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/CheckpointPages.java
new file mode 100644
index 0000000..46540f9
--- /dev/null
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/CheckpointPages.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence.pagemem;
+
+import java.util.Collection;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.pagemem.FullPageId;
+
+/**
+ * View of pages which should be stored during current checkpoint.
+ */
+class CheckpointPages {
+    /** */
+    private final Collection<FullPageId> segCheckpointPages;
+
+    /** The sign which allows to replace pages from a checkpoint by page 
replacer. */
+    private final IgniteInternalFuture allowToReplace;
+
+    /**
+     * @param pages Pages which would be stored to disk in current checkpoint.
+     * @param replaceFuture The sign which allows to replace pages from a 
checkpoint by page replacer.
+     */
+    CheckpointPages(Collection<FullPageId> pages, IgniteInternalFuture 
replaceFuture) {
+        segCheckpointPages = pages;
+        allowToReplace = replaceFuture;
+    }
+
+    /**
+     * @param fullPageId Page id for checking.
+     * @return {@code true} if fullPageId is allowed to be stored to disk.
+     */
+    public boolean allowToSave(FullPageId fullPageId) throws 
IgniteCheckedException {
+        Collection<FullPageId> checkpointPages = segCheckpointPages;
+
+        if (checkpointPages == null || allowToReplace == null)
+            return false;
+
+        //Uninterruptibly is important because otherwise in case of interrupt 
of client thread node would be stopped.
+        allowToReplace.getUninterruptibly();
+
+        return checkpointPages.contains(fullPageId);
+    }
+
+    /**
+     * @param fullPageId Page id for checking.
+     * @return {@code true} If fullPageId is candidate to stored to disk by 
current checkpoint.
+     */
+    public boolean contains(FullPageId fullPageId) {
+        Collection<FullPageId> checkpointPages = segCheckpointPages;
+
+        return checkpointPages != null && checkpointPages.contains(fullPageId);
+    }
+
+    /**
+     * @param fullPageId Page id which should be marked as saved to disk.
+     * @return {@code true} if marking was successful.
+     */
+    public boolean markAsSaved(FullPageId fullPageId) {
+        Collection<FullPageId> checkpointPages = segCheckpointPages;
+
+        return checkpointPages != null && checkpointPages.remove(fullPageId);
+    }
+
+    /**
+     * @return Size of all pages in current checkpoint.
+     */
+    public int size() {
+        Collection<FullPageId> checkpointPages = segCheckpointPages;
+
+        return checkpointPages == null ? 0 : checkpointPages.size();
+    }
+}
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryEx.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryEx.java
index 2c17431..f9fdb0d 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryEx.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryEx.java
@@ -114,8 +114,9 @@ public interface PageMemoryEx extends PageMemory {
      *
      * @return Collection of dirty page IDs.
      * @throws IgniteException If checkpoint has been already started and was 
not finished.
+     * @param allowToReplace The sign which allows to replace pages from a 
checkpoint by page replacer.
      */
-    public GridMultiCollectionWrapper<FullPageId> beginCheckpoint() throws 
IgniteException;
+    public GridMultiCollectionWrapper<FullPageId> 
beginCheckpoint(IgniteInternalFuture allowToReplace) throws IgniteException;
 
     /**
      * Finishes checkpoint operation.
@@ -127,7 +128,7 @@ public interface PageMemoryEx extends PageMemory {
      *{@link PageStoreWriter} will be called when the page will be ready to 
write.
      *
      * @param pageId Page ID to get byte buffer for. The page ID must be 
present in the collection returned by
-     *      the {@link #beginCheckpoint()} method call.
+     *      the {@link #beginCheckpoint(IgniteInternalFuture)} method call.
      * @param buf Temporary buffer to write changes into.
      * @param pageWriter Checkpoint page write context.
      * @param tracker Checkpoint metrics tracker.
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImpl.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImpl.java
index e2bb433..fcc6efe 100755
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImpl.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImpl.java
@@ -634,11 +634,12 @@ public class PageMemoryImpl implements PageMemoryEx {
         }
         finally {
             seg.writeLock().unlock();
-
-            if (delayedWriter != null)
-                delayedWriter.finishReplacement();
         }
 
+        // Finish replacement only when an exception wasn't thrown; otherwise it is 
possible to corrupt the B+Tree.
+        if (delayedWriter != null)
+            delayedWriter.finishReplacement();
+
         //we have allocated 'tracking' page, we need to allocate regular one
         return isTrackingPage ? allocatePage(grpId, partId, flags) : pageId;
     }
@@ -933,10 +934,10 @@ public class PageMemoryImpl implements PageMemoryEx {
         if (rmv)
             seg.loadedPages.remove(grpId, PageIdUtils.effectivePageId(pageId));
 
-        Collection<FullPageId> cpPages = seg.segCheckpointPages;
+        CheckpointPages cpPages = seg.checkpointPages;
 
         if (cpPages != null)
-            cpPages.remove(new FullPageId(pageId, grpId));
+            cpPages.markAsSaved(new FullPageId(pageId, grpId));
 
         Collection<FullPageId> dirtyPages = seg.dirtyPages;
 
@@ -1125,7 +1126,9 @@ public class PageMemoryImpl implements PageMemoryEx {
     }
 
     /** {@inheritDoc} */
-    @Override public GridMultiCollectionWrapper<FullPageId> beginCheckpoint() 
throws IgniteException {
+    @Override public GridMultiCollectionWrapper<FullPageId> beginCheckpoint(
+        IgniteInternalFuture allowToReplace
+    ) throws IgniteException {
         if (segments == null)
             return new 
GridMultiCollectionWrapper<>(Collections.<FullPageId>emptyList());
 
@@ -1134,10 +1137,13 @@ public class PageMemoryImpl implements PageMemoryEx {
         for (int i = 0; i < segments.length; i++) {
             Segment seg = segments[i];
 
-            if (seg.segCheckpointPages != null)
+            if (seg.checkpointPages != null)
                 throw new IgniteException("Failed to begin checkpoint (it is 
already in progress).");
 
-            collections[i] = seg.segCheckpointPages = seg.dirtyPages;
+            Collection<FullPageId> dirtyPages = seg.dirtyPages;
+            collections[i] = dirtyPages;
+
+            seg.checkpointPages = new CheckpointPages(dirtyPages, 
allowToReplace);
 
             seg.dirtyPages = new GridConcurrentHashSet<>();
         }
@@ -1163,7 +1169,7 @@ public class PageMemoryImpl implements PageMemoryEx {
             return;
 
         for (Segment seg : segments)
-            seg.segCheckpointPages = null;
+            seg.checkpointPages = null;
 
         if (throttlingPlc != ThrottlingPolicy.DISABLED)
             writeThrottle.onFinishCheckpoint();
@@ -1203,7 +1209,7 @@ public class PageMemoryImpl implements PageMemoryEx {
             if (relPtr != OUTDATED_REL_PTR) {
                 absPtr = seg.absolute(relPtr);
 
-                // Pin the page until page will not be copied.
+                // Pin the page until it has been copied. This helps to 
prevent replacement of this page.
                 if (PageHeader.tempBufferPointer(absPtr) == INVALID_REL_PTR)
                     PageHeader.acquirePage(absPtr);
                 else
@@ -1754,7 +1760,7 @@ public class PageMemoryImpl implements PageMemoryEx {
     boolean isInCheckpoint(FullPageId pageId) {
         Segment seg = segment(pageId.groupId(), pageId.pageId());
 
-        Collection<FullPageId> pages0 = seg.segCheckpointPages;
+        CheckpointPages pages0 = seg.checkpointPages;
 
         return pages0 != null && pages0.contains(pageId);
     }
@@ -1766,11 +1772,11 @@ public class PageMemoryImpl implements PageMemoryEx {
     boolean clearCheckpoint(FullPageId fullPageId) {
         Segment seg = segment(fullPageId.groupId(), fullPageId.pageId());
 
-        Collection<FullPageId> pages0 = seg.segCheckpointPages;
+        CheckpointPages pages0 = seg.checkpointPages;
 
         assert pages0 != null;
 
-        return pages0.remove(fullPageId);
+        return pages0.markAsSaved(fullPageId);
     }
 
     /**
@@ -2106,8 +2112,8 @@ public class PageMemoryImpl implements PageMemoryEx {
         /** Pages marked as dirty since the last checkpoint. */
         private volatile Collection<FullPageId> dirtyPages = new 
GridConcurrentHashSet<>();
 
-        /** */
-        private volatile Collection<FullPageId> segCheckpointPages;
+        /** Wrapper of pages of current checkpoint. */
+        private volatile CheckpointPages checkpointPages;
 
         /** */
         private final int maxDirtyPages;
@@ -2256,14 +2262,13 @@ public class PageMemoryImpl implements PageMemoryEx {
             if (PageHeader.isAcquired(absPtr))
                 return false;
 
-            Collection<FullPageId> cpPages = segCheckpointPages;
-
             clearRowCache(fullPageId, absPtr);
 
             if (isDirty(absPtr)) {
+                CheckpointPages checkpointPages = this.checkpointPages;
                 // Can evict a dirty page only if should be written by a 
checkpoint.
                 // These pages does not have tmp buffer.
-                if (cpPages != null && cpPages.contains(fullPageId)) {
+                if (checkpointPages != null && 
checkpointPages.allowToSave(fullPageId)) {
                     assert storeMgr != null;
 
                     memMetrics.updatePageReplaceRate(U.currentTimeMillis() - 
PageHeader.readTimestamp(absPtr));
@@ -2279,7 +2284,7 @@ public class PageMemoryImpl implements PageMemoryEx {
 
                     setDirty(fullPageId, absPtr, false, true);
 
-                    cpPages.remove(fullPageId);
+                    checkpointPages.markAsSaved(fullPageId);
 
                     return true;
                 }
@@ -2566,7 +2571,7 @@ public class PageMemoryImpl implements PageMemoryEx {
                 ", loaded=" + loadedPages.size() +
                 ", maxDirtyPages=" + maxDirtyPages +
                 ", dirtyPages=" + dirtyPages.size() +
-                ", cpPages=" + (segCheckpointPages == null ? 0 : 
segCheckpointPages.size()) +
+                ", cpPages=" + (checkpointPages == null ? 0 : 
checkpointPages.size()) +
                 ", pinnedInSegment=" + pinnedCnt +
                 ", failedToPrepare=" + failToPrepare +
                 ']' + U.nl() + "Out of memory in data region [" +
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/util/StripedExecutor.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/util/StripedExecutor.java
index 923cef4..0a5ec6a 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/util/StripedExecutor.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/util/StripedExecutor.java
@@ -563,8 +563,7 @@ public class StripedExecutor implements ExecutorService {
                     break;
                 }
                 catch (Throwable e) {
-                    if (e instanceof OutOfMemoryError)
-                        errHnd.apply(e);
+                    errHnd.apply(e);
 
                     U.error(log, "Failed to execute runnable.", e);
                 }
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsRecoveryAfterFileCorruptionTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsRecoveryAfterFileCorruptionTest.java
index 0cd040e..cc05658 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsRecoveryAfterFileCorruptionTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsRecoveryAfterFileCorruptionTest.java
@@ -49,6 +49,7 @@ import 
org.apache.ignite.internal.processors.cache.persistence.file.FilePageStor
 import 
org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager;
 import 
org.apache.ignite.internal.processors.cache.persistence.pagemem.PageMemoryImpl;
 import org.apache.ignite.internal.processors.cache.persistence.tree.io.PageIO;
+import org.apache.ignite.internal.util.future.GridFinishedFuture;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.testframework.MvccFeatureChecker;
 import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
@@ -316,7 +317,7 @@ public class IgnitePdsRecoveryAfterFileCorruptionTest 
extends GridCommonAbstract
             }
         }
 
-        Collection<FullPageId> pageIds = mem.beginCheckpoint();
+        Collection<FullPageId> pageIds = mem.beginCheckpoint(new 
GridFinishedFuture());
 
         info("Acquired pages for checkpoint: " + pageIds.size());
 
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/checkpoint/CheckpointFailBeforeWriteMarkTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/checkpoint/CheckpointFailBeforeWriteMarkTest.java
new file mode 100644
index 0000000..2daac36
--- /dev/null
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/checkpoint/CheckpointFailBeforeWriteMarkTest.java
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence.db.checkpoint;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.OpenOption;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Predicate;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.DataStorageConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.failure.StopNodeFailureHandler;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.pagemem.PageMemory;
+import 
org.apache.ignite.internal.processors.cache.persistence.file.AsyncFileIOFactory;
+import org.apache.ignite.internal.processors.cache.persistence.file.FileIO;
+import org.apache.ignite.internal.util.typedef.G;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.junit.Test;
+
+import static org.apache.ignite.testframework.GridTestUtils.waitForCondition;
+
+/**
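+ * Checks that data is recovered after a node fails during a checkpoint before the checkpoint marker is written.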
+ */
+public class CheckpointFailBeforeWriteMarkTest extends GridCommonAbstractTest {
+    /** */
+    private InterceptorIOFactory interceptorIOFactory = new 
InterceptorIOFactory();
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTest() throws Exception {
+        super.beforeTest();
+
+        stopAllGrids();
+
+        cleanPersistenceDir();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        stopAllGrids();
+
+        cleanPersistenceDir();
+
+        super.afterTest();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String 
igniteInstanceName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
+
+        cfg.setConsistentId(igniteInstanceName);
+
+        DataStorageConfiguration storageCfg = new DataStorageConfiguration();
+
+        storageCfg.setCheckpointThreads(2)
+            .setFileIOFactory(interceptorIOFactory)
+            .setWalSegmentSize(5 * 1024 * 1024)
+            .setWalSegments(3);
+
+        storageCfg.getDefaultDataRegionConfiguration()
+            .setPersistenceEnabled(true)
+            .setMaxSize(10L * 1024 * 1024);
+
+        cfg.setDataStorageConfiguration(storageCfg)
+            .setCacheConfiguration(new CacheConfiguration<>(DEFAULT_CACHE_NAME)
+                .setAffinity(new RendezvousAffinityFunction(false, 16)));
+
+        cfg.setFailureHandler(new StopNodeFailureHandler());
+
+        return cfg;
+    }
+
+    /**
+     * Test IO factory which gives an opportunity to throw an IO exception by a custom 
predicate.
+     */
+    private static class InterceptorIOFactory extends AsyncFileIOFactory {
+        /** */
+        private static final Predicate<File> DUMMY_PREDICATE = (f) -> false;
+
+        /** Time to wait before the exception is thrown. It gives the page replacer time 
to work. */
+        private static final long DELAY_TIME = 1000;
+
+        /** Predicate which is a trigger of throwing an exception. */
+        transient volatile Predicate<File> failPredicate = DUMMY_PREDICATE;
+
+        /** {@inheritDoc} */
+        @Override public FileIO create(File file, OpenOption... modes) throws 
IOException {
+            if(file.getName().contains("START.bin"))
+                sleep();
+
+            if (failPredicate.test(file)) {
+                failPredicate = DUMMY_PREDICATE;
+
+                throw new IOException("Triggered test exception");
+            }
+
+            return super.create(file, modes);
+        }
+
+        /** **/
+        private void sleep() {
+            try {
+                Thread.sleep(DELAY_TIME);
+            }
+            catch (InterruptedException ignore) {
+            }
+        }
+
+        /**
+         * Triggering exception by custom predicate.
+         *
+         * @param failPredicate Predicate for exception.
+         */
+        public void triggerIOException(Predicate<File> failPredicate) {
+            this.failPredicate = failPredicate;
+        }
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testCheckpointFailBeforeMarkEntityWrite() throws Exception {
+        //given: one node with persistence.
+        IgniteEx ignite0 = startGrid(0);
+
+        ignite0.cluster().active(true);
+
+        //It is necessary to understand when page replacement has been 
started.
+        PageMemory pageMemory = 
ignite0.context().cache().context().database().dataRegion("default").pageMemory();
+
+        //when: Load a lot of data to cluster.
+        AtomicInteger lastKey = new AtomicInteger();
+        GridTestUtils.runMultiThreadedAsync(() -> {
+            IgniteCache<Integer, Object> cache2 = 
ignite(0).cache(DEFAULT_CACHE_NAME);
+
+            //Putting data should stop when the node fails.
+            for (int i = 0; i < Integer.MAX_VALUE; i++) {
+                cache2.put(i, i);
+
+                lastKey.set(i);
+
+                if (i % 1000 == 0)
+                    log.info("WRITE : " + i);
+            }
+        }, 3, "LOAD-DATA");
+
+        //and: Page replacement was started.
+        assertTrue(waitForCondition(() -> U.field(pageMemory, 
"pageReplacementWarned"), 20_000));
+
+        //and: Node failed during checkpoint after the write lock was released 
and before the checkpoint marker was stored to disk.
+        interceptorIOFactory.triggerIOException((file) -> 
file.getName().contains("START.bin"));
+
+        log.info("KILL NODE await to stop");
+
+        assertTrue(waitForCondition(() -> G.allGrids().isEmpty(), 20_000));
+
+        //then: Data recovery after node start should be successful.
+        ignite0 = startGrid(0);
+
+        ignite0.cluster().active(true);
+
+        IgniteCache<Integer, Object> cache = 
ignite(0).cache(DEFAULT_CACHE_NAME);
+
+        //WAL mode is 'default' so it is allowable to lose some of the last data (e.g. 
last 100).
+        for(int i = 0; i < lastKey.get() - 100; i++)
+            assertNotNull(cache.get(i));
+    }
+}
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/file/IgnitePdsCheckpointSimulationWithRealCpDisabledTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/file/IgnitePdsCheckpointSimulationWithRealCpDisabledTest.java
index 30de694..8d98026 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/file/IgnitePdsCheckpointSimulationWithRealCpDisabledTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/file/IgnitePdsCheckpointSimulationWithRealCpDisabledTest.java
@@ -77,6 +77,7 @@ import 
org.apache.ignite.internal.processors.cache.persistence.pagemem.PageMemor
 import 
org.apache.ignite.internal.processors.cache.persistence.tree.io.DataPageIO;
 import org.apache.ignite.internal.processors.cache.persistence.tree.io.PageIO;
 import 
org.apache.ignite.internal.processors.cache.persistence.tree.io.TrackingPageIO;
+import org.apache.ignite.internal.util.future.GridFinishedFuture;
 import org.apache.ignite.internal.util.lang.GridCloseableIterator;
 import org.apache.ignite.internal.util.lang.GridFilteredClosableIterator;
 import org.apache.ignite.internal.util.typedef.F;
@@ -614,7 +615,7 @@ public class 
IgnitePdsCheckpointSimulationWithRealCpDisabledTest extends GridCom
             ig.context().cache().context().database().checkpointReadUnlock();
         }
 
-        Collection<FullPageId> cpPages = mem.beginCheckpoint();
+        Collection<FullPageId> cpPages = mem.beginCheckpoint(new 
GridFinishedFuture());
 
         ig.context().cache().context().database().checkpointReadLock();
 
@@ -944,7 +945,7 @@ public class 
IgnitePdsCheckpointSimulationWithRealCpDisabledTest extends GridCom
             try {
                 snapshot = new HashMap<>(resMap);
 
-                pageIds = mem.beginCheckpoint();
+                pageIds = mem.beginCheckpoint(new GridFinishedFuture());
 
                 checkpoints--;
 
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/IgnitePageMemReplaceDelayedWriteUnitTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/IgnitePageMemReplaceDelayedWriteUnitTest.java
index c019ecc..703d359 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/IgnitePageMemReplaceDelayedWriteUnitTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/IgnitePageMemReplaceDelayedWriteUnitTest.java
@@ -45,6 +45,7 @@ import 
org.apache.ignite.internal.processors.cache.persistence.PageStoreWriter;
 import org.apache.ignite.internal.processors.metric.GridMetricManager;
 import 
org.apache.ignite.internal.processors.subscription.GridInternalSubscriptionProcessor;
 import org.apache.ignite.internal.util.GridMultiCollectionWrapper;
+import org.apache.ignite.internal.util.future.GridFinishedFuture;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.logger.NullLogger;
 import org.apache.ignite.spi.encryption.noop.NoopEncryptionSpi;
@@ -117,7 +118,7 @@ public class IgnitePageMemReplaceDelayedWriteUnitTest {
             memory.releasePage(1, pageId, ptr);
         }
 
-        GridMultiCollectionWrapper<FullPageId> ids = memory.beginCheckpoint();
+        GridMultiCollectionWrapper<FullPageId> ids = 
memory.beginCheckpoint(new GridFinishedFuture());
         int cpPages = ids.size();
         log.info("Started CP with [" + cpPages + "] pages in it, created [" + 
markDirty + "] pages");
 
@@ -180,7 +181,7 @@ public class IgnitePageMemReplaceDelayedWriteUnitTest {
             memory.releasePage(1, pageId, ptr);
         }
 
-        GridMultiCollectionWrapper<FullPageId> ids = memory.beginCheckpoint();
+        GridMultiCollectionWrapper<FullPageId> ids = 
memory.beginCheckpoint(new GridFinishedFuture());
         int cpPages = ids.size();
         log.info("Started CP with [" + cpPages + "] pages in it, created [" + 
markDirty + "] pages");
 
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImplTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImplTest.java
index 9f38348..7c0a85d 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImplTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImplTest.java
@@ -52,6 +52,7 @@ import 
org.apache.ignite.internal.processors.metric.GridMetricManager;
 import org.apache.ignite.internal.processors.plugin.IgnitePluginProcessor;
 import 
org.apache.ignite.internal.processors.subscription.GridInternalSubscriptionProcessor;
 import org.apache.ignite.internal.util.GridMultiCollectionWrapper;
+import org.apache.ignite.internal.util.future.GridFinishedFuture;
 import org.apache.ignite.internal.util.lang.GridInClosure3X;
 import org.apache.ignite.plugin.PluginProvider;
 import org.apache.ignite.spi.encryption.noop.NoopEncryptionSpi;
@@ -123,7 +124,7 @@ public class PageMemoryImplTest extends 
GridCommonAbstractTest {
             //Success
         }
 
-        memory.beginCheckpoint();
+        memory.beginCheckpoint(new GridFinishedFuture());
 
         final AtomicReference<FullPageId> lastPage = new AtomicReference<>();
 
@@ -208,14 +209,14 @@ public class PageMemoryImplTest extends 
GridCommonAbstractTest {
             writePage(memory, fullId, (byte)1);
         }
 
-        doCheckpoint(memory.beginCheckpoint(), memory, pageStoreMgr);
+        doCheckpoint(memory.beginCheckpoint(new GridFinishedFuture()), memory, 
pageStoreMgr);
 
         FullPageId cowPageId = allocated.get(0);
 
         // Mark some pages as dirty.
         writePage(memory, cowPageId, (byte)2);
 
-        GridMultiCollectionWrapper<FullPageId> cpPages = 
memory.beginCheckpoint();
+        GridMultiCollectionWrapper<FullPageId> cpPages = 
memory.beginCheckpoint(new GridFinishedFuture());
 
         assertEquals(1, cpPages.size());
 
@@ -286,7 +287,7 @@ public class PageMemoryImplTest extends 
GridCommonAbstractTest {
         }
 
         // CP Write lock.
-        memory.beginCheckpoint();
+        memory.beginCheckpoint(new GridFinishedFuture());
         // CP Write unlock.
 
         byte[] buf = new byte[PAGE_SIZE];
@@ -361,7 +362,7 @@ public class PageMemoryImplTest extends 
GridCommonAbstractTest {
             acquireAndReleaseWriteLock(memory, fullPageId);
         }
 
-        memory.beginCheckpoint();
+        memory.beginCheckpoint(new GridFinishedFuture());
 
         CheckpointMetricsTracker mockTracker = 
Mockito.mock(CheckpointMetricsTracker.class);
 
@@ -383,7 +384,7 @@ public class PageMemoryImplTest extends 
GridCommonAbstractTest {
             acquireAndReleaseWriteLock(memory, fullPageId);
         }
 
-        memory.beginCheckpoint();
+        memory.beginCheckpoint(new GridFinishedFuture());
 
         Collections.shuffle(pages); // Mix pages in checkpoint with clean pages
 
diff --git 
a/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite2.java
 
b/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite2.java
index d6ac566..d263dc0 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite2.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite2.java
@@ -43,6 +43,7 @@ import 
org.apache.ignite.internal.processors.cache.persistence.db.IgnitePdsReser
 import 
org.apache.ignite.internal.processors.cache.persistence.db.IgnitePdsWholeClusterRestartTest;
 import 
org.apache.ignite.internal.processors.cache.persistence.db.IgniteShutdownOnSupplyMessageFailureTest;
 import 
org.apache.ignite.internal.processors.cache.persistence.db.SlowHistoricalRebalanceSmallHistoryTest;
+import 
org.apache.ignite.internal.processors.cache.persistence.db.checkpoint.CheckpointFailBeforeWriteMarkTest;
 import 
org.apache.ignite.internal.processors.cache.persistence.db.checkpoint.CheckpointFreeListTest;
 import 
org.apache.ignite.internal.processors.cache.persistence.db.checkpoint.IgniteCheckpointDirtyPagesForLowLoadTest;
 import 
org.apache.ignite.internal.processors.cache.persistence.db.filename.IgniteUidAsConsistentIdMigrationTest;
@@ -212,6 +213,8 @@ public class IgnitePdsTestSuite2 {
 
         GridTestUtils.addTestIfNeeded(suite, CheckpointFreeListTest.class, 
ignoredTests);
 
+        GridTestUtils.addTestIfNeeded(suite, 
CheckpointFailBeforeWriteMarkTest.class, ignoredTests);
+
         GridTestUtils.addTestIfNeeded(suite, 
IgniteWalIteratorSwitchSegmentTest.class, ignoredTests);
 
         GridTestUtils.addTestIfNeeded(suite, 
IgniteWalIteratorExceptionDuringReadTest.class, ignoredTests);

Reply via email to