This is an automated email from the ASF dual-hosted git repository. agoncharuk pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push: new 6afec64 IGNITE-12128 Fix potential PDS corruption when a node fails during checkpoint - Fixes #6851. 6afec64 is described below commit 6afec64c208a5a6e52e8a18576aa14a748ad745b Author: Anton Kalashnikov <kaa....@yandex.ru> AuthorDate: Mon Sep 9 15:48:49 2019 +0300 IGNITE-12128 Fix potential PDS corruption when a node fails during checkpoint - Fixes #6851. Signed-off-by: Alexey Goncharuk <alexey.goncha...@gmail.com> --- .../cache/persistence/CheckpointFuture.java | 5 - .../GridCacheDatabaseSharedManager.java | 112 +++++++++--- .../cache/persistence/PageStoreWriter.java | 3 +- .../cache/persistence/pagemem/CheckpointPages.java | 88 ++++++++++ .../cache/persistence/pagemem/PageMemoryEx.java | 5 +- .../cache/persistence/pagemem/PageMemoryImpl.java | 45 ++--- .../ignite/internal/util/StripedExecutor.java | 3 +- .../IgnitePdsRecoveryAfterFileCorruptionTest.java | 3 +- .../CheckpointFailBeforeWriteMarkTest.java | 190 +++++++++++++++++++++ ...CheckpointSimulationWithRealCpDisabledTest.java | 5 +- .../IgnitePageMemReplaceDelayedWriteUnitTest.java | 5 +- .../persistence/pagemem/PageMemoryImplTest.java | 13 +- .../ignite/testsuites/IgnitePdsTestSuite2.java | 3 + 13 files changed, 420 insertions(+), 60 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/CheckpointFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/CheckpointFuture.java index 23287f1..381bb55 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/CheckpointFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/CheckpointFuture.java @@ -32,9 +32,4 @@ public interface CheckpointFuture { * @return Finish future. */ public GridFutureAdapter<Object> finishFuture(); - - /** - * @return Checkpoint was already started. 
- */ - public boolean started(); } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java index 82bb555..a738f2d 100755 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java @@ -162,6 +162,7 @@ import org.apache.ignite.internal.util.StripedExecutor; import org.apache.ignite.internal.util.TimeBag; import org.apache.ignite.internal.util.future.CountDownFuture; import org.apache.ignite.internal.util.future.GridCompoundFuture; +import org.apache.ignite.internal.util.future.GridFinishedFuture; import org.apache.ignite.internal.util.future.GridFutureAdapter; import org.apache.ignite.internal.util.lang.GridInClosure3X; import org.apache.ignite.internal.util.tostring.GridToStringInclude; @@ -205,6 +206,10 @@ import static org.apache.ignite.internal.pagemem.PageIdUtils.partId; import static org.apache.ignite.internal.pagemem.wal.record.WALRecord.RecordType.CHECKPOINT_RECORD; import static org.apache.ignite.internal.pagemem.wal.record.WALRecord.RecordType.METASTORE_DATA_RECORD; import static org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState.fromOrdinal; +import static org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager.CheckpointProgress.State.FINISHED; +import static org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager.CheckpointProgress.State.LOCK_RELEASED; +import static org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager.CheckpointProgress.State.LOCK_TAKEN; +import static 
org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager.CheckpointProgress.State.MARKER_STORED_TO_DISK; import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.TMP_FILE_MATCHER; import static org.apache.ignite.internal.processors.cache.persistence.tree.io.PageIO.getType; import static org.apache.ignite.internal.processors.cache.persistence.tree.io.PageIO.getVersion; @@ -1622,7 +1627,7 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan throw new IgniteException(new NodeStoppingException("Failed to perform cache update: node is stopping.")); } - if (checkpointLock.getReadHoldCount() > 1 || safeToUpdatePageMemories()) + if (checkpointLock.getReadHoldCount() > 1 || safeToUpdatePageMemories() || checkpointerThread == null) break; else { checkpointLock.readLock().unlock(); @@ -3032,11 +3037,13 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan int pagesNum = 0; + GridFinishedFuture finishedFuture = new GridFinishedFuture(); + // Collect collection of dirty pages from all regions. for (DataRegion memPlc : regions) { if (memPlc.config().isPersistenceEnabled()){ GridMultiCollectionWrapper<FullPageId> nextCpPagesCol = - ((PageMemoryEx)memPlc.pageMemory()).beginCheckpoint(); + ((PageMemoryEx)memPlc.pageMemory()).beginCheckpoint(finishedFuture); pagesNum += nextCpPagesCol.size(); @@ -4111,7 +4118,8 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan fillCacheGroupState(cpRec); - cpPagesTuple = beginAllCheckpoints(); + //There are allowable to replace pages only after checkpoint entry was stored to disk. 
+ cpPagesTuple = beginAllCheckpoints(curr.cpMarkerStored); hasPages = hasPageForWrite(cpPagesTuple.get1()); @@ -4175,6 +4183,8 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan writeCheckpointEntry(tmpWriteBuf, cp, CheckpointEntryType.START); + curr.cpMarkerStored.onDone(); + GridMultiCollectionWrapper<FullPageId> cpPages = splitAndSortCpPagesIfNeeded( cpPagesTuple, persistenceCfg.getCheckpointThreads()); @@ -4351,7 +4361,7 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan synchronized (this) { curr = scheduledCp; - curr.started = true; + curr.state(LOCK_TAKEN); if (curr.reason == null) curr.reason = "timeout"; @@ -4416,10 +4426,13 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan } /** - * @return tuple with collections of FullPageIds obtained from each PageMemory and overall number of dirty - * pages. + * @return tuple with collections of FullPageIds obtained from each PageMemory, overall number of dirty + * pages, and flag defines at least one user page became a dirty since last checkpoint. + * @param allowToReplace The sign which allows to replace pages from a checkpoint by page replacer. 
*/ - private IgniteBiTuple<Collection<GridMultiCollectionWrapper<FullPageId>>, Integer> beginAllCheckpoints() { + private IgniteBiTuple<Collection<GridMultiCollectionWrapper<FullPageId>>, Integer> beginAllCheckpoints( + IgniteInternalFuture allowToReplace + ) { Collection<GridMultiCollectionWrapper<FullPageId>> res = new ArrayList(dataRegions().size()); int pagesNum = 0; @@ -4428,7 +4441,8 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan if (!memPlc.config().isPersistenceEnabled()) continue; - GridMultiCollectionWrapper<FullPageId> nextCpPagesCol = ((PageMemoryEx)memPlc.pageMemory()).beginCheckpoint(); + GridMultiCollectionWrapper<FullPageId> nextCpPagesCol = ((PageMemoryEx)memPlc.pageMemory()) + .beginCheckpoint(allowToReplace); pagesNum += nextCpPagesCol.size(); @@ -4994,15 +5008,35 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan /** Scheduled time of checkpoint. */ private volatile long nextCpNanos; - /** Checkpoint begin phase future. */ - private GridFutureAdapter cpBeginFut = new GridFutureAdapter<>(); + /** Checkpoint begin phase future. TODO it should be encapsulated. */ + private GridFutureAdapter cpBeginFut = new GridFutureAdapter<Void>() { + @Override public boolean onDone(@Nullable Void res, @Nullable Throwable err, boolean cancel) { + CheckpointProgress.this.state(LOCK_RELEASED); + + return super.onDone(res, err, cancel); + } + }; + + /** Checkpoint marker stored to disk phase future. TODO it should be encapsulated. */ + private GridFutureAdapter cpMarkerStored = new GridFutureAdapter<Void>() { + @Override public boolean onDone(@Nullable Void res, @Nullable Throwable err, boolean cancel) { + CheckpointProgress.this.state(MARKER_STORED_TO_DISK); + + return super.onDone(res, err, cancel); + } + }; - /** Checkpoint finish phase future. */ + /** Checkpoint finish phase future. TODO it should be encapsulated. 
*/ private GridFutureAdapter cpFinishFut = new GridFutureAdapter<Void>() { @Override protected boolean onDone(@Nullable Void res, @Nullable Throwable err, boolean cancel) { if (err != null && !cpBeginFut.isDone()) cpBeginFut.onDone(err); + if (err != null && !cpMarkerStored.isDone()) + cpMarkerStored.onDone(err); + + CheckpointProgress.this.state(FINISHED); + return super.onDone(res, err, cancel); } }; @@ -5010,8 +5044,8 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan /** Flag indicates that snapshot operation will be performed after checkpoint. */ private volatile boolean nextSnapshot; - /** Flag indicates that checkpoint is started. */ - private volatile boolean started; + /** Current checkpoint state. */ + private volatile AtomicReference<State> state = new AtomicReference(State.SCHEDULED); /** Snapshot operation that should be performed if {@link #nextSnapshot} set to true. */ private volatile SnapshotOperation snapshotOperation; @@ -5029,6 +5063,15 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan this.nextCpNanos = System.nanoTime() + U.millisToNanos(cpFreq); } + /** + * @return {@code true} if checkpoint is stated. + * @deprecated For legacy reason. + */ + @Deprecated + public boolean inProgress() { + return state.get().ordinal() >= State.LOCK_TAKEN.ordinal(); + } + /** */ public boolean started() { return cpBeginFut.isDone(); @@ -5038,6 +5081,42 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan public boolean finished() { return cpFinishFut.isDone(); } + + /** + * @param expectedState Expected state. + * @return {@code true} if current state equal to given state. + */ + public boolean atLeastState(State expectedState) { + return state.get().ordinal() >= expectedState.ordinal(); + } + + /** + * Changing checkpoint state if order of state is correct. + * + * @param newState New checkpoint state. 
+ */ + public void state(@NotNull State newState) { + State state = this.state.get(); + + if (state.ordinal() < newState.ordinal()) + this.state.compareAndSet(state, newState); + } + + /** + * Possible checkpoint states. Ordinal is important. Every next state follows the previous one. + */ + enum State { + /** Checkpoint is waiting to execution. **/ + SCHEDULED, + /** Checkpoint was awakened and it is preparing to start. **/ + LOCK_TAKEN, + /** Checkpoint counted the pages and write lock was released. **/ + LOCK_RELEASED, + /** Checkpoint marker was stored to disk. **/ + MARKER_STORED_TO_DISK, + /** Checkpoint was finished. **/ + FINISHED + } } /** @@ -5055,7 +5134,7 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan /** */ CheckpointProgressSnapshot(CheckpointProgress cpProgress) { - started = cpProgress.started; + started = cpProgress.inProgress(); cpBeginFut = cpProgress.cpBeginFut; cpFinishFut = cpProgress.cpFinishFut; } @@ -5069,11 +5148,6 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan @Override public GridFutureAdapter<Object> finishFuture() { return cpFinishFut; } - - /** {@inheritDoc} */ - @Override public boolean started() { - return started; - } } /** diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/PageStoreWriter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/PageStoreWriter.java index 18f6d04..0771010 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/PageStoreWriter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/PageStoreWriter.java @@ -19,6 +19,7 @@ package org.apache.ignite.internal.processors.cache.persistence; import java.nio.ByteBuffer; import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.pagemem.FullPageId; 
import org.apache.ignite.internal.pagemem.store.PageStore; import org.apache.ignite.internal.processors.cache.persistence.pagemem.PageMemoryEx; @@ -31,7 +32,7 @@ public interface PageStoreWriter { * Callback for write page. {@link PageMemoryEx} will copy page content to buffer before call. * * @param fullPageId Page ID to get byte buffer for. The page ID must be present in the collection returned by - * the {@link PageMemoryEx#beginCheckpoint()} method call. + * the {@link PageMemoryEx#beginCheckpoint(IgniteInternalFuture)} method call. * @param buf Temporary buffer to write changes into. * @param tag {@code Partition generation} if data was read, {@code null} otherwise (data already saved to storage). * @throws IgniteCheckedException If write page failed. diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/CheckpointPages.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/CheckpointPages.java new file mode 100644 index 0000000..46540f9 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/CheckpointPages.java @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.persistence.pagemem; + +import java.util.Collection; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.internal.IgniteInternalFuture; +import org.apache.ignite.internal.pagemem.FullPageId; + +/** + * View of pages which should be stored during current checkpoint. + */ +class CheckpointPages { + /** */ + private final Collection<FullPageId> segCheckpointPages; + + /** The sign which allows to replace pages from a checkpoint by page replacer. */ + private final IgniteInternalFuture allowToReplace; + + /** + * @param pages Pages which would be stored to disk in current checkpoint. + * @param replaceFuture The sign which allows to replace pages from a checkpoint by page replacer. + */ + CheckpointPages(Collection<FullPageId> pages, IgniteInternalFuture replaceFuture) { + segCheckpointPages = pages; + allowToReplace = replaceFuture; + } + + /** + * @param fullPageId Page id for checking. + * @return {@code true} If fullPageId is allowable to store to disk. + */ + public boolean allowToSave(FullPageId fullPageId) throws IgniteCheckedException { + Collection<FullPageId> checkpointPages = segCheckpointPages; + + if (checkpointPages == null || allowToReplace == null) + return false; + + //Uninterruptibly is important because otherwise in case of interrupt of client thread node would be stopped. + allowToReplace.getUninterruptibly(); + + return checkpointPages.contains(fullPageId); + } + + /** + * @param fullPageId Page id for checking. + * @return {@code true} If fullPageId is candidate to stored to disk by current checkpoint. 
+ */ + public boolean contains(FullPageId fullPageId) { + Collection<FullPageId> checkpointPages = segCheckpointPages; + + return checkpointPages != null && checkpointPages.contains(fullPageId); + } + + /** + * @param fullPageId Page id which should be marked as saved to disk. + * @return {@code true} if is marking was successful. + */ + public boolean markAsSaved(FullPageId fullPageId) { + Collection<FullPageId> checkpointPages = segCheckpointPages; + + return checkpointPages != null && checkpointPages.remove(fullPageId); + } + + /** + * @return Size of all pages in current checkpoint. + */ + public int size() { + Collection<FullPageId> checkpointPages = segCheckpointPages; + + return checkpointPages == null ? 0 : checkpointPages.size(); + } +} diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryEx.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryEx.java index 2c17431..f9fdb0d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryEx.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryEx.java @@ -114,8 +114,9 @@ public interface PageMemoryEx extends PageMemory { * * @return Collection of dirty page IDs. * @throws IgniteException If checkpoint has been already started and was not finished. + * @param allowToReplace The sign which allows to replace pages from a checkpoint by page replacer. */ - public GridMultiCollectionWrapper<FullPageId> beginCheckpoint() throws IgniteException; + public GridMultiCollectionWrapper<FullPageId> beginCheckpoint(IgniteInternalFuture allowToReplace) throws IgniteException; /** * Finishes checkpoint operation. @@ -127,7 +128,7 @@ public interface PageMemoryEx extends PageMemory { *{@link PageStoreWriter} will be called when the page will be ready to write. 
* * @param pageId Page ID to get byte buffer for. The page ID must be present in the collection returned by - * the {@link #beginCheckpoint()} method call. + * the {@link #beginCheckpoint(IgniteInternalFuture)} method call. * @param buf Temporary buffer to write changes into. * @param pageWriter Checkpoint page write context. * @param tracker Checkpoint metrics tracker. diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImpl.java index e2bb433..fcc6efe 100755 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImpl.java @@ -634,11 +634,12 @@ public class PageMemoryImpl implements PageMemoryEx { } finally { seg.writeLock().unlock(); - - if (delayedWriter != null) - delayedWriter.finishReplacement(); } + //Finish replacement only when an exception wasn't thrown otherwise it possible to corrupt B+Tree. + if (delayedWriter != null) + delayedWriter.finishReplacement(); + //we have allocated 'tracking' page, we need to allocate regular one return isTrackingPage ? 
allocatePage(grpId, partId, flags) : pageId; } @@ -933,10 +934,10 @@ public class PageMemoryImpl implements PageMemoryEx { if (rmv) seg.loadedPages.remove(grpId, PageIdUtils.effectivePageId(pageId)); - Collection<FullPageId> cpPages = seg.segCheckpointPages; + CheckpointPages cpPages = seg.checkpointPages; if (cpPages != null) - cpPages.remove(new FullPageId(pageId, grpId)); + cpPages.markAsSaved(new FullPageId(pageId, grpId)); Collection<FullPageId> dirtyPages = seg.dirtyPages; @@ -1125,7 +1126,9 @@ public class PageMemoryImpl implements PageMemoryEx { } /** {@inheritDoc} */ - @Override public GridMultiCollectionWrapper<FullPageId> beginCheckpoint() throws IgniteException { + @Override public GridMultiCollectionWrapper<FullPageId> beginCheckpoint( + IgniteInternalFuture allowToReplace + ) throws IgniteException { if (segments == null) return new GridMultiCollectionWrapper<>(Collections.<FullPageId>emptyList()); @@ -1134,10 +1137,13 @@ public class PageMemoryImpl implements PageMemoryEx { for (int i = 0; i < segments.length; i++) { Segment seg = segments[i]; - if (seg.segCheckpointPages != null) + if (seg.checkpointPages != null) throw new IgniteException("Failed to begin checkpoint (it is already in progress)."); - collections[i] = seg.segCheckpointPages = seg.dirtyPages; + Collection<FullPageId> dirtyPages = seg.dirtyPages; + collections[i] = dirtyPages; + + seg.checkpointPages = new CheckpointPages(dirtyPages, allowToReplace); seg.dirtyPages = new GridConcurrentHashSet<>(); } @@ -1163,7 +1169,7 @@ public class PageMemoryImpl implements PageMemoryEx { return; for (Segment seg : segments) - seg.segCheckpointPages = null; + seg.checkpointPages = null; if (throttlingPlc != ThrottlingPolicy.DISABLED) writeThrottle.onFinishCheckpoint(); @@ -1203,7 +1209,7 @@ public class PageMemoryImpl implements PageMemoryEx { if (relPtr != OUTDATED_REL_PTR) { absPtr = seg.absolute(relPtr); - // Pin the page until page will not be copied. 
+ // Pin the page until page will not be copied. This helpful to prevent page replacement of this page. if (PageHeader.tempBufferPointer(absPtr) == INVALID_REL_PTR) PageHeader.acquirePage(absPtr); else @@ -1754,7 +1760,7 @@ public class PageMemoryImpl implements PageMemoryEx { boolean isInCheckpoint(FullPageId pageId) { Segment seg = segment(pageId.groupId(), pageId.pageId()); - Collection<FullPageId> pages0 = seg.segCheckpointPages; + CheckpointPages pages0 = seg.checkpointPages; return pages0 != null && pages0.contains(pageId); } @@ -1766,11 +1772,11 @@ public class PageMemoryImpl implements PageMemoryEx { boolean clearCheckpoint(FullPageId fullPageId) { Segment seg = segment(fullPageId.groupId(), fullPageId.pageId()); - Collection<FullPageId> pages0 = seg.segCheckpointPages; + CheckpointPages pages0 = seg.checkpointPages; assert pages0 != null; - return pages0.remove(fullPageId); + return pages0.markAsSaved(fullPageId); } /** @@ -2106,8 +2112,8 @@ public class PageMemoryImpl implements PageMemoryEx { /** Pages marked as dirty since the last checkpoint. */ private volatile Collection<FullPageId> dirtyPages = new GridConcurrentHashSet<>(); - /** */ - private volatile Collection<FullPageId> segCheckpointPages; + /** Wrapper of pages of current checkpoint. */ + private volatile CheckpointPages checkpointPages; /** */ private final int maxDirtyPages; @@ -2256,14 +2262,13 @@ public class PageMemoryImpl implements PageMemoryEx { if (PageHeader.isAcquired(absPtr)) return false; - Collection<FullPageId> cpPages = segCheckpointPages; - clearRowCache(fullPageId, absPtr); if (isDirty(absPtr)) { + CheckpointPages checkpointPages = this.checkpointPages; // Can evict a dirty page only if should be written by a checkpoint. // These pages does not have tmp buffer. 
- if (cpPages != null && cpPages.contains(fullPageId)) { + if (checkpointPages != null && checkpointPages.allowToSave(fullPageId)) { assert storeMgr != null; memMetrics.updatePageReplaceRate(U.currentTimeMillis() - PageHeader.readTimestamp(absPtr)); @@ -2279,7 +2284,7 @@ public class PageMemoryImpl implements PageMemoryEx { setDirty(fullPageId, absPtr, false, true); - cpPages.remove(fullPageId); + checkpointPages.markAsSaved(fullPageId); return true; } @@ -2566,7 +2571,7 @@ public class PageMemoryImpl implements PageMemoryEx { ", loaded=" + loadedPages.size() + ", maxDirtyPages=" + maxDirtyPages + ", dirtyPages=" + dirtyPages.size() + - ", cpPages=" + (segCheckpointPages == null ? 0 : segCheckpointPages.size()) + + ", cpPages=" + (checkpointPages == null ? 0 : checkpointPages.size()) + ", pinnedInSegment=" + pinnedCnt + ", failedToPrepare=" + failToPrepare + ']' + U.nl() + "Out of memory in data region [" + diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/StripedExecutor.java b/modules/core/src/main/java/org/apache/ignite/internal/util/StripedExecutor.java index 923cef4..0a5ec6a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/StripedExecutor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/StripedExecutor.java @@ -563,8 +563,7 @@ public class StripedExecutor implements ExecutorService { break; } catch (Throwable e) { - if (e instanceof OutOfMemoryError) - errHnd.apply(e); + errHnd.apply(e); U.error(log, "Failed to execute runnable.", e); } diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsRecoveryAfterFileCorruptionTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsRecoveryAfterFileCorruptionTest.java index 0cd040e..cc05658 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsRecoveryAfterFileCorruptionTest.java +++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsRecoveryAfterFileCorruptionTest.java @@ -49,6 +49,7 @@ import org.apache.ignite.internal.processors.cache.persistence.file.FilePageStor import org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager; import org.apache.ignite.internal.processors.cache.persistence.pagemem.PageMemoryImpl; import org.apache.ignite.internal.processors.cache.persistence.tree.io.PageIO; +import org.apache.ignite.internal.util.future.GridFinishedFuture; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.testframework.MvccFeatureChecker; import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; @@ -316,7 +317,7 @@ public class IgnitePdsRecoveryAfterFileCorruptionTest extends GridCommonAbstract } } - Collection<FullPageId> pageIds = mem.beginCheckpoint(); + Collection<FullPageId> pageIds = mem.beginCheckpoint(new GridFinishedFuture()); info("Acquired pages for checkpoint: " + pageIds.size()); diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/checkpoint/CheckpointFailBeforeWriteMarkTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/checkpoint/CheckpointFailBeforeWriteMarkTest.java new file mode 100644 index 0000000..2daac36 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/checkpoint/CheckpointFailBeforeWriteMarkTest.java @@ -0,0 +1,190 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.persistence.db.checkpoint; + +import java.io.File; +import java.io.IOException; +import java.nio.file.OpenOption; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Predicate; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.DataStorageConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.failure.StopNodeFailureHandler; +import org.apache.ignite.internal.IgniteEx; +import org.apache.ignite.internal.pagemem.PageMemory; +import org.apache.ignite.internal.processors.cache.persistence.file.AsyncFileIOFactory; +import org.apache.ignite.internal.processors.cache.persistence.file.FileIO; +import org.apache.ignite.internal.util.typedef.G; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.testframework.GridTestUtils; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; +import org.junit.Test; + +import static org.apache.ignite.testframework.GridTestUtils.waitForCondition; + +/** + * + */ +public class CheckpointFailBeforeWriteMarkTest extends GridCommonAbstractTest { + /** */ + private InterceptorIOFactory interceptorIOFactory = new InterceptorIOFactory(); + + /** {@inheritDoc} */ + @Override protected void beforeTest() throws Exception { + super.beforeTest(); + + stopAllGrids(); + + 
cleanPersistenceDir(); + } + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + stopAllGrids(); + + cleanPersistenceDir(); + + super.afterTest(); + } + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName); + + cfg.setConsistentId(igniteInstanceName); + + DataStorageConfiguration storageCfg = new DataStorageConfiguration(); + + storageCfg.setCheckpointThreads(2) + .setFileIOFactory(interceptorIOFactory) + .setWalSegmentSize(5 * 1024 * 1024) + .setWalSegments(3); + + storageCfg.getDefaultDataRegionConfiguration() + .setPersistenceEnabled(true) + .setMaxSize(10L * 1024 * 1024); + + cfg.setDataStorageConfiguration(storageCfg) + .setCacheConfiguration(new CacheConfiguration<>(DEFAULT_CACHE_NAME) + .setAffinity(new RendezvousAffinityFunction(false, 16))); + + cfg.setFailureHandler(new StopNodeFailureHandler()); + + return cfg; + } + + /** + * Test IO factory that can throw an {@link IOException} from {@link #create} for a file matching a custom predicate. + */ + private static class InterceptorIOFactory extends AsyncFileIOFactory { + /** Predicate that matches no file; used while no failure is requested. */ + private static final Predicate<File> DUMMY_PREDICATE = (f) -> false; + + /** Delay in milliseconds before a file whose name contains START.bin is created. It gives the page replacer time to work. */ + private static final long DELAY_TIME = 1000; + + /** Predicate that triggers the exception; it is reset to {@link #DUMMY_PREDICATE} once it has matched. */ + transient volatile Predicate<File> failPredicate = DUMMY_PREDICATE; + + /** {@inheritDoc} */ + @Override public FileIO create(File file, OpenOption... 
modes) throws IOException { + if(file.getName().contains("START.bin")) + sleep(); + + if (failPredicate.test(file)) { + failPredicate = DUMMY_PREDICATE; + + throw new IOException("Triggered test exception"); + } + + return super.create(file, modes); + } + + /** Sleeps for {@link #DELAY_TIME} milliseconds; an interrupt ends the wait early and is ignored. */ + private void sleep() { + try { + Thread.sleep(DELAY_TIME); + } + catch (InterruptedException ignore) { + } + } + + /** + * Sets the predicate that makes {@link #create} throw an {@link IOException} for a matching file. + * + * @param failPredicate Predicate selecting the file whose creation should fail. + */ + public void triggerIOException(Predicate<File> failPredicate) { + this.failPredicate = failPredicate; + } + } + + /** + * Checks that cache data can be read back after a restart that follows an injected {@link IOException} on creation of a file whose name contains START.bin. + * + * @throws Exception If failed. + */ + @Test + public void testCheckpointFailBeforeMarkEntityWrite() throws Exception { + //given: one node with persistence. + IgniteEx ignite0 = startGrid(0); + + ignite0.cluster().active(true); + + //Needed to detect when page replacement has started. + PageMemory pageMemory = ignite0.context().cache().context().database().dataRegion("default").pageMemory(); + + //when: Load a lot of data to cluster. + AtomicInteger lastKey = new AtomicInteger(); + GridTestUtils.runMultiThreadedAsync(() -> { + IgniteCache<Integer, Object> cache2 = ignite(0).cache(DEFAULT_CACHE_NAME); + + //Puts are expected to stop when the node fails. + for (int i = 0; i < Integer.MAX_VALUE; i++) { + cache2.put(i, i); + + lastKey.set(i); + + if (i % 1000 == 0) + log.info("WRITE : " + i); + } + }, 3, "LOAD-DATA"); + + //and: Page replacement was started. + assertTrue(waitForCondition(() -> U.field(pageMemory, "pageReplacementWarned"), 20_000)); + + //and: Node fails during checkpoint after the write lock was released and before the checkpoint marker was stored to disk. + interceptorIOFactory.triggerIOException((file) -> file.getName().contains("START.bin")); + + log.info("KILL NODE await to stop"); + + assertTrue(waitForCondition(() -> G.allGrids().isEmpty(), 20_000)); + + //then: Data recovery after node start should be successful. 
+ ignite0 = startGrid(0); + + ignite0.cluster().active(true); + + IgniteCache<Integer, Object> cache = ignite(0).cache(DEFAULT_CACHE_NAME); + + //WAL mode is 'default', so losing some of the most recent data (e.g. the last 100 keys) is acceptable. + for(int i = 0; i < lastKey.get() - 100; i++) + assertNotNull(cache.get(i)); + } +} diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/file/IgnitePdsCheckpointSimulationWithRealCpDisabledTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/file/IgnitePdsCheckpointSimulationWithRealCpDisabledTest.java index 30de694..8d98026 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/file/IgnitePdsCheckpointSimulationWithRealCpDisabledTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/file/IgnitePdsCheckpointSimulationWithRealCpDisabledTest.java @@ -77,6 +77,7 @@ import org.apache.ignite.internal.processors.cache.persistence.pagemem.PageMemor import org.apache.ignite.internal.processors.cache.persistence.tree.io.DataPageIO; import org.apache.ignite.internal.processors.cache.persistence.tree.io.PageIO; import org.apache.ignite.internal.processors.cache.persistence.tree.io.TrackingPageIO; +import org.apache.ignite.internal.util.future.GridFinishedFuture; import org.apache.ignite.internal.util.lang.GridCloseableIterator; import org.apache.ignite.internal.util.lang.GridFilteredClosableIterator; import org.apache.ignite.internal.util.typedef.F; @@ -614,7 +615,7 @@ public class IgnitePdsCheckpointSimulationWithRealCpDisabledTest extends GridCom ig.context().cache().context().database().checkpointReadUnlock(); } - Collection<FullPageId> cpPages = mem.beginCheckpoint(); + Collection<FullPageId> cpPages = mem.beginCheckpoint(new GridFinishedFuture()); ig.context().cache().context().database().checkpointReadLock(); @@ -944,7 +945,7 @@ public class 
IgnitePdsCheckpointSimulationWithRealCpDisabledTest extends GridCom try { snapshot = new HashMap<>(resMap); - pageIds = mem.beginCheckpoint(); + pageIds = mem.beginCheckpoint(new GridFinishedFuture()); checkpoints--; diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/IgnitePageMemReplaceDelayedWriteUnitTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/IgnitePageMemReplaceDelayedWriteUnitTest.java index c019ecc..703d359 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/IgnitePageMemReplaceDelayedWriteUnitTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/IgnitePageMemReplaceDelayedWriteUnitTest.java @@ -45,6 +45,7 @@ import org.apache.ignite.internal.processors.cache.persistence.PageStoreWriter; import org.apache.ignite.internal.processors.metric.GridMetricManager; import org.apache.ignite.internal.processors.subscription.GridInternalSubscriptionProcessor; import org.apache.ignite.internal.util.GridMultiCollectionWrapper; +import org.apache.ignite.internal.util.future.GridFinishedFuture; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.logger.NullLogger; import org.apache.ignite.spi.encryption.noop.NoopEncryptionSpi; @@ -117,7 +118,7 @@ public class IgnitePageMemReplaceDelayedWriteUnitTest { memory.releasePage(1, pageId, ptr); } - GridMultiCollectionWrapper<FullPageId> ids = memory.beginCheckpoint(); + GridMultiCollectionWrapper<FullPageId> ids = memory.beginCheckpoint(new GridFinishedFuture()); int cpPages = ids.size(); log.info("Started CP with [" + cpPages + "] pages in it, created [" + markDirty + "] pages"); @@ -180,7 +181,7 @@ public class IgnitePageMemReplaceDelayedWriteUnitTest { memory.releasePage(1, pageId, ptr); } - GridMultiCollectionWrapper<FullPageId> ids = memory.beginCheckpoint(); + 
GridMultiCollectionWrapper<FullPageId> ids = memory.beginCheckpoint(new GridFinishedFuture()); int cpPages = ids.size(); log.info("Started CP with [" + cpPages + "] pages in it, created [" + markDirty + "] pages"); diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImplTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImplTest.java index 9f38348..7c0a85d 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImplTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImplTest.java @@ -52,6 +52,7 @@ import org.apache.ignite.internal.processors.metric.GridMetricManager; import org.apache.ignite.internal.processors.plugin.IgnitePluginProcessor; import org.apache.ignite.internal.processors.subscription.GridInternalSubscriptionProcessor; import org.apache.ignite.internal.util.GridMultiCollectionWrapper; +import org.apache.ignite.internal.util.future.GridFinishedFuture; import org.apache.ignite.internal.util.lang.GridInClosure3X; import org.apache.ignite.plugin.PluginProvider; import org.apache.ignite.spi.encryption.noop.NoopEncryptionSpi; @@ -123,7 +124,7 @@ public class PageMemoryImplTest extends GridCommonAbstractTest { //Success } - memory.beginCheckpoint(); + memory.beginCheckpoint(new GridFinishedFuture()); final AtomicReference<FullPageId> lastPage = new AtomicReference<>(); @@ -208,14 +209,14 @@ public class PageMemoryImplTest extends GridCommonAbstractTest { writePage(memory, fullId, (byte)1); } - doCheckpoint(memory.beginCheckpoint(), memory, pageStoreMgr); + doCheckpoint(memory.beginCheckpoint(new GridFinishedFuture()), memory, pageStoreMgr); FullPageId cowPageId = allocated.get(0); // Mark some pages as dirty. 
writePage(memory, cowPageId, (byte)2); - GridMultiCollectionWrapper<FullPageId> cpPages = memory.beginCheckpoint(); + GridMultiCollectionWrapper<FullPageId> cpPages = memory.beginCheckpoint(new GridFinishedFuture()); assertEquals(1, cpPages.size()); @@ -286,7 +287,7 @@ public class PageMemoryImplTest extends GridCommonAbstractTest { } // CP Write lock. - memory.beginCheckpoint(); + memory.beginCheckpoint(new GridFinishedFuture()); // CP Write unlock. byte[] buf = new byte[PAGE_SIZE]; @@ -361,7 +362,7 @@ public class PageMemoryImplTest extends GridCommonAbstractTest { acquireAndReleaseWriteLock(memory, fullPageId); } - memory.beginCheckpoint(); + memory.beginCheckpoint(new GridFinishedFuture()); CheckpointMetricsTracker mockTracker = Mockito.mock(CheckpointMetricsTracker.class); @@ -383,7 +384,7 @@ public class PageMemoryImplTest extends GridCommonAbstractTest { acquireAndReleaseWriteLock(memory, fullPageId); } - memory.beginCheckpoint(); + memory.beginCheckpoint(new GridFinishedFuture()); Collections.shuffle(pages); // Mix pages in checkpoint with clean pages diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite2.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite2.java index d6ac566..d263dc0 100644 --- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite2.java +++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite2.java @@ -43,6 +43,7 @@ import org.apache.ignite.internal.processors.cache.persistence.db.IgnitePdsReser import org.apache.ignite.internal.processors.cache.persistence.db.IgnitePdsWholeClusterRestartTest; import org.apache.ignite.internal.processors.cache.persistence.db.IgniteShutdownOnSupplyMessageFailureTest; import org.apache.ignite.internal.processors.cache.persistence.db.SlowHistoricalRebalanceSmallHistoryTest; +import org.apache.ignite.internal.processors.cache.persistence.db.checkpoint.CheckpointFailBeforeWriteMarkTest; import 
org.apache.ignite.internal.processors.cache.persistence.db.checkpoint.CheckpointFreeListTest; import org.apache.ignite.internal.processors.cache.persistence.db.checkpoint.IgniteCheckpointDirtyPagesForLowLoadTest; import org.apache.ignite.internal.processors.cache.persistence.db.filename.IgniteUidAsConsistentIdMigrationTest; @@ -212,6 +213,8 @@ public class IgnitePdsTestSuite2 { GridTestUtils.addTestIfNeeded(suite, CheckpointFreeListTest.class, ignoredTests); + GridTestUtils.addTestIfNeeded(suite, CheckpointFailBeforeWriteMarkTest.class, ignoredTests); + GridTestUtils.addTestIfNeeded(suite, IgniteWalIteratorSwitchSegmentTest.class, ignoredTests); GridTestUtils.addTestIfNeeded(suite, IgniteWalIteratorExceptionDuringReadTest.class, ignoredTests);