Repository: ignite Updated Branches: refs/heads/ignite-6083 cbf65cb44 -> 6e92fffca
IGNITE-7933 Checkpoing file markers should be written atomically - Fixes #3633. Signed-off-by: Alexey Goncharuk <alexey.goncha...@gmail.com> Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/4a0695ce Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/4a0695ce Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/4a0695ce Branch: refs/heads/ignite-6083 Commit: 4a0695ceae2f99c4841e8382e723daff4580ea3d Parents: a064702 Author: Pavel Kovalenko <jokse...@gmail.com> Authored: Fri Apr 6 10:35:17 2018 +0300 Committer: Alexey Goncharuk <alexey.goncha...@gmail.com> Committed: Fri Apr 6 10:35:17 2018 +0300 ---------------------------------------------------------------------- .../org/apache/ignite/internal/IgnitionEx.java | 2 +- .../GridCacheDatabaseSharedManager.java | 145 ++++++++---- .../cache/persistence/file/AsyncFileIO.java | 9 +- .../cache/persistence/file/FileIO.java | 20 +- .../cache/persistence/file/FileIODecorator.java | 9 +- .../persistence/file/RandomAccessFileIO.java | 13 +- .../cache/persistence/file/UnzipFileIO.java | 7 +- .../file/IgnitePdsDiskErrorsRecoveringTest.java | 231 +++++++++++++++---- .../db/wal/IgniteWalFlushFailoverTest.java | 4 +- ...lFlushMultiNodeFailoverAbstractSelfTest.java | 4 +- .../pagemem/PagesWriteThrottleSmokeTest.java | 4 +- .../file/AlignedBuffersDirectFileIO.java | 7 +- 12 files changed, 353 insertions(+), 102 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/4a0695ce/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java b/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java index 8073faa..4708dd3 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java +++ 
b/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java @@ -410,7 +410,7 @@ public class IgnitionEx { " milliseconds. Killing node..."); // We are not able to kill only one grid so whole JVM will be stopped. - System.exit(Ignition.KILL_EXIT_CODE); + Runtime.getRuntime().halt(Ignition.KILL_EXIT_CODE); } } }, timeoutMs, TimeUnit.MILLISECONDS); http://git-wip-us.apache.org/repos/asf/ignite/blob/4a0695ce/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java index 71f3baa..70fc688 100755 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java @@ -27,6 +27,7 @@ import java.nio.ByteOrder; import java.nio.channels.FileChannel; import java.nio.channels.FileLock; import java.nio.channels.OverlappingFileLockException; +import java.nio.file.DirectoryStream; import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.Paths; @@ -116,8 +117,10 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalP import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture; import org.apache.ignite.internal.processors.cache.persistence.file.FileIO; +import org.apache.ignite.internal.processors.cache.persistence.file.FileIOFactory; import org.apache.ignite.internal.processors.cache.persistence.file.FilePageStore; import 
org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager; +import org.apache.ignite.internal.processors.cache.persistence.file.PersistentStorageIOException; import org.apache.ignite.internal.processors.cache.persistence.metastorage.MetaStorage; import org.apache.ignite.internal.processors.cache.persistence.metastorage.MetastorageLifecycleListener; import org.apache.ignite.internal.processors.cache.persistence.pagemem.CheckpointMetricsTracker; @@ -211,11 +214,14 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan /** Checkpoint file name pattern. */ private static final Pattern CP_FILE_NAME_PATTERN = Pattern.compile("(\\d+)-(.*)-(START|END)\\.bin"); + /** Checkpoint file temporary suffix. Markers are first written to a temporary file with this suffix and then renamed to their final name. */ + public static final String FILE_TMP_SUFFIX = ".tmp"; + /** Node started file patter. */ private static final Pattern NODE_STARTED_FILE_NAME_PATTERN = Pattern.compile("(\\d+)-node-started\\.bin"); /** Node started file suffix. */ - private static final String NODE_STARTED_FILE_NAME_SUFFIX = "-node-started.bin"; + public static final String NODE_STARTED_FILE_NAME_SUFFIX = "-node-started.bin"; /** */ private static final FileFilter CP_FILE_FILTER = new FileFilter() { @@ -378,6 +384,9 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan /** Initially disabled cache groups. */ private Collection<Integer> initiallyWalDisabledGrps; + /** File I/O factory for reading and writing checkpoint and node-started markers. */ + private final FileIOFactory ioFactory; + /** * @param ctx Kernal context. 
*/ @@ -402,6 +411,8 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan maxCpHistMemSize = Math.min(persistenceCfg.getWalHistorySize(), IgniteSystemProperties.getInteger(IGNITE_PDS_MAX_CHECKPOINT_MEMORY_HISTORY_SIZE, 100)); + + ioFactory = persistenceCfg.getFileIOFactory(); } /** */ @@ -494,6 +505,8 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan if (!U.mkdirs(cpDir)) throw new IgniteCheckedException("Could not create directory for checkpoint metadata: " + cpDir); + cleanupCheckpointDirectory(); + final FileLockHolder preLocked = kernalCtx.pdsFolderResolver() .resolveFolders() .getLockedFileLockHolder(); @@ -508,6 +521,26 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan } /** + * Deletes all files in the checkpoint directory whose names end with {@link #FILE_TMP_SUFFIX}; the name is compared as a string because {@code Path.endsWith(String)} matches whole name elements only. + */ + private void cleanupCheckpointDirectory() throws IgniteCheckedException { + try { + try (DirectoryStream<Path> files = Files.newDirectoryStream(cpDir.toPath(), new DirectoryStream.Filter<Path>() { + @Override + public boolean accept(Path path) throws IOException { + return path.getFileName().toString().endsWith(FILE_TMP_SUFFIX); + } + })) { + for (Path path : files) + Files.delete(path); + } + } + catch (IOException e) { + throw new IgniteCheckedException("Failed to cleanup checkpoint directory: " + cpDir, e); + } + } + + /** * */ private void initDataBase() { @@ -749,7 +782,7 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan notifyMetastorageReadyForReadWrite(); } - catch (StorageException e) { + catch (StorageException | PersistentStorageIOException e) { cctx.kernalContext().failure().process(new FailureContext(FailureType.CRITICAL_ERROR, e)); throw new IgniteCheckedException(e); @@ -760,41 +793,52 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan } /** + * Creates file with current timestamp and specific "node-started.bin" suffix + * and writes into memory 
recovery pointer. + * * @param ptr Memory recovery wal pointer. */ private void nodeStart(WALPointer ptr) throws IgniteCheckedException { FileWALPointer p = (FileWALPointer)ptr; - String fileName = U.currentTimeMillis() + "-node-started.bin"; + String fileName = U.currentTimeMillis() + NODE_STARTED_FILE_NAME_SUFFIX; + String tmpFileName = fileName + FILE_TMP_SUFFIX; ByteBuffer buf = ByteBuffer.allocate(20); buf.order(ByteOrder.nativeOrder()); - try (FileChannel ch = FileChannel.open( - Paths.get(cpDir.getAbsolutePath(), fileName), - StandardOpenOption.CREATE_NEW, StandardOpenOption.APPEND) - ) { - buf.putLong(p.index()); + try { + try (FileIO io = ioFactory.create(Paths.get(cpDir.getAbsolutePath(), tmpFileName).toFile(), + StandardOpenOption.CREATE_NEW, StandardOpenOption.WRITE)) { + buf.putLong(p.index()); - buf.putInt(p.fileOffset()); + buf.putInt(p.fileOffset()); - buf.putInt(p.length()); + buf.putInt(p.length()); - buf.flip(); + buf.flip(); - ch.write(buf); + io.write(buf); - buf.clear(); + buf.clear(); + + io.force(true); + } - ch.force(true); + Files.move(Paths.get(cpDir.getAbsolutePath(), tmpFileName), Paths.get(cpDir.getAbsolutePath(), fileName)); } catch (IOException e) { - throw new IgniteCheckedException(e); + throw new PersistentStorageIOException("Failed to write node start marker: " + ptr, e); } } /** + * Collects memory recovery pointers from node started files. See {@link #nodeStart(WALPointer)}. + * Each pointer associated with timestamp extracted from file. + * Tuples are sorted by timestamp. * + * @return Sorted list of tuples (node started timestamp, memory recovery pointer). 
+ * @throws IgniteCheckedException If collecting the node started pointers fails. */ public List<T2<Long, WALPointer>> nodeStartedPointers() throws IgniteCheckedException { List<T2<Long, WALPointer>> res = new ArrayList<>(); @@ -806,15 +850,10 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan String n1 = o1.getName(); String n2 = o2.getName(); - Long ts1 = Long.valueOf(n1.substring(0, n1.length() - NODE_STARTED_FILE_NAME_SUFFIX.length())); - Long ts2 = Long.valueOf(n2.substring(0, n2.length() - NODE_STARTED_FILE_NAME_SUFFIX.length())); + long ts1 = Long.parseLong(n1.substring(0, n1.length() - NODE_STARTED_FILE_NAME_SUFFIX.length())); + long ts2 = Long.parseLong(n2.substring(0, n2.length() - NODE_STARTED_FILE_NAME_SUFFIX.length())); - if (ts1 == ts2) - return 0; - else if (ts1 < ts2) - return -1; - else - return 1; + return Long.compare(ts1, ts2); } }); @@ -826,8 +865,8 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan Long ts = Long.valueOf(name.substring(0, name.length() - NODE_STARTED_FILE_NAME_SUFFIX.length())); - try (FileChannel ch = FileChannel.open(f.toPath(), READ)) { - ch.read(buf); + try (FileIO io = ioFactory.create(f, READ)) { + io.read(buf); buf.flip(); @@ -1869,8 +1908,8 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan private WALPointer readPointer(File cpMarkerFile, ByteBuffer buf) throws IgniteCheckedException { buf.position(0); - try (FileChannel ch = FileChannel.open(cpMarkerFile.toPath(), READ)) { - ch.read(buf); + try (FileIO io = ioFactory.create(cpMarkerFile, READ)) { + io.read(buf); buf.flip(); @@ -2584,6 +2623,8 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan } /** + * Writes into specified file checkpoint entry containing WAL pointer to checkpoint record. + * * @param cpId Checkpoint ID. * @param ptr Wal pointer of current checkpoint. 
*/ @@ -2600,31 +2641,40 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan FileWALPointer filePtr = (FileWALPointer)ptr; String fileName = checkpointFileName(cpTs, cpId, type); + String tmpFileName = fileName + FILE_TMP_SUFFIX; - try (FileChannel ch = FileChannel.open(Paths.get(cpDir.getAbsolutePath(), fileName), - StandardOpenOption.CREATE_NEW, StandardOpenOption.APPEND)) { + try { + try (FileIO io = ioFactory.create(Paths.get(cpDir.getAbsolutePath(), skipSync ? fileName : tmpFileName).toFile(), + StandardOpenOption.CREATE_NEW, StandardOpenOption.WRITE)) { - tmpWriteBuf.rewind(); + tmpWriteBuf.rewind(); - tmpWriteBuf.putLong(filePtr.index()); + tmpWriteBuf.putLong(filePtr.index()); - tmpWriteBuf.putInt(filePtr.fileOffset()); + tmpWriteBuf.putInt(filePtr.fileOffset()); - tmpWriteBuf.putInt(filePtr.length()); + tmpWriteBuf.putInt(filePtr.length()); - tmpWriteBuf.flip(); + tmpWriteBuf.flip(); - ch.write(tmpWriteBuf); + io.write(tmpWriteBuf); - tmpWriteBuf.clear(); + tmpWriteBuf.clear(); + + if (!skipSync) + io.force(true); + } if (!skipSync) - ch.force(true); + Files.move(Paths.get(cpDir.getAbsolutePath(), tmpFileName), Paths.get(cpDir.getAbsolutePath(), fileName)); return createCheckPointEntry(cpTs, ptr, cpId, rec, type); } catch (IOException e) { - throw new IgniteCheckedException(e); + throw new PersistentStorageIOException("Failed to write checkpoint entry [ptr=" + filePtr + + ", cpTs=" + cpTs + + ", cpId=" + cpId + + ", type=" + type + "]", e); } } @@ -2691,8 +2741,6 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan if (type != CheckpointEntryType.START) return null; - CheckpointEntry entry; - Map<Integer, CacheState> cacheGrpStates = null; // Create lazy checkpoint entry. 
@@ -2827,7 +2875,20 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan try { CheckpointMetricsTracker tracker = new CheckpointMetricsTracker(); - Checkpoint chp = markCheckpointBegin(tracker); + Checkpoint chp; + + try { + chp = markCheckpointBegin(tracker); + } + catch (IgniteCheckedException e) { + if (curCpProgress != null) + curCpProgress.cpFinishFut.onDone(e); + + // In case of checkpoint initialization error node should be invalidated and stopped. + cctx.kernalContext().failure().process(new FailureContext(FailureType.CRITICAL_ERROR, e)); + + return; + } currCheckpointPagesCnt = chp.pagesSize; @@ -2885,7 +2946,7 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan } catch (IgniteCheckedException e) { chp.progress.cpFinishFut.onDone(e); - // In case of writing error node should be invalidated and stopped. + // In case of checkpoint writing error node should be invalidated and stopped. cctx.kernalContext().failure().process(new FailureContext(FailureType.CRITICAL_ERROR, e)); return; http://git-wip-us.apache.org/repos/asf/ignite/blob/4a0695ce/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/AsyncFileIO.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/AsyncFileIO.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/AsyncFileIO.java index b1db79d..799a78c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/AsyncFileIO.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/AsyncFileIO.java @@ -169,13 +169,18 @@ public class AsyncFileIO implements FileIO { } /** {@inheritDoc} */ - @Override public MappedByteBuffer map(int maxWalSegmentSize) throws IOException { + @Override public MappedByteBuffer map(int 
sizeBytes) throws IOException { throw new UnsupportedOperationException("AsynchronousFileChannel doesn't support mmap."); } /** {@inheritDoc} */ @Override public void force() throws IOException { - ch.force(false); + force(false); + } + + /** {@inheritDoc} */ + @Override public void force(boolean withMetadata) throws IOException { + ch.force(withMetadata); } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/4a0695ce/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FileIO.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FileIO.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FileIO.java index 73e44b0..822bd66 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FileIO.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FileIO.java @@ -124,7 +124,16 @@ public interface FileIO extends AutoCloseable { */ public void write(byte[] buf, int off, int len) throws IOException; - public MappedByteBuffer map(int maxWalSegmentSize) throws IOException; + /** + * Allocates memory mapped buffer for this file with given size. + * + * @param sizeBytes Size of buffer. + * + * @return Instance of mapped byte buffer. + * + * @throws IOException If some I/O error occurs. + */ + public MappedByteBuffer map(int sizeBytes) throws IOException; /** * Forces any updates of this file to be written to the storage @@ -135,6 +144,15 @@ public interface FileIO extends AutoCloseable { public void force() throws IOException; /** + * Forces any updates of this file to be written to the storage + * device that contains it. + * + * @param withMetadata If {@code true} force also file metadata. + * @throws IOException If some I/O error occurs. 
+ */ + public void force(boolean withMetadata) throws IOException; + + /** * Returns current file size in bytes. * * @return File size. http://git-wip-us.apache.org/repos/asf/ignite/blob/4a0695ce/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FileIODecorator.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FileIODecorator.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FileIODecorator.java index dd563f2..683845b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FileIODecorator.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FileIODecorator.java @@ -77,8 +77,8 @@ public class FileIODecorator implements FileIO { } /** {@inheritDoc} */ - @Override public MappedByteBuffer map(int maxWalSegmentSize) throws IOException { - return delegate.map(maxWalSegmentSize); + @Override public MappedByteBuffer map(int sizeBytes) throws IOException { + return delegate.map(sizeBytes); } /** {@inheritDoc} */ @@ -87,6 +87,11 @@ public class FileIODecorator implements FileIO { } /** {@inheritDoc} */ + @Override public void force(boolean withMetadata) throws IOException { + delegate.force(withMetadata); + } + + /** {@inheritDoc} */ @Override public long size() throws IOException { return delegate.size(); } http://git-wip-us.apache.org/repos/asf/ignite/blob/4a0695ce/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/RandomAccessFileIO.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/RandomAccessFileIO.java 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/RandomAccessFileIO.java index 23d6ebf..8f7454d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/RandomAccessFileIO.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/RandomAccessFileIO.java @@ -84,8 +84,8 @@ public class RandomAccessFileIO implements FileIO { } /** {@inheritDoc} */ - @Override public void force() throws IOException { - ch.force(false); + @Override public void force(boolean withMetadata) throws IOException { + ch.force(withMetadata); } /** {@inheritDoc} */ @@ -104,7 +104,12 @@ public class RandomAccessFileIO implements FileIO { } /** {@inheritDoc} */ - @Override public MappedByteBuffer map(int maxWalSegmentSize) throws IOException { - return ch.map(FileChannel.MapMode.READ_WRITE, 0, maxWalSegmentSize); + @Override public MappedByteBuffer map(int sizeBytes) throws IOException { + return ch.map(FileChannel.MapMode.READ_WRITE, 0, sizeBytes); + } + + /** {@inheritDoc} */ + @Override public void force() throws IOException { + force(false); } } http://git-wip-us.apache.org/repos/asf/ignite/blob/4a0695ce/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/UnzipFileIO.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/UnzipFileIO.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/UnzipFileIO.java index 83ff91b..469cf3e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/UnzipFileIO.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/UnzipFileIO.java @@ -116,6 +116,11 @@ public class UnzipFileIO implements FileIO { /** {@inheritDoc} */ @Override public void force() throws 
IOException { + force(false); + } + + /** {@inheritDoc} */ + @Override public void force(boolean withMetadata) throws IOException { throw new UnsupportedOperationException(); } @@ -130,7 +135,7 @@ public class UnzipFileIO implements FileIO { } /** {@inheritDoc} */ - @Override public MappedByteBuffer map(int maxWalSegmentSize) throws IOException { + @Override public MappedByteBuffer map(int sizeBytes) throws IOException { throw new UnsupportedOperationException(); } http://git-wip-us.apache.org/repos/asf/ignite/blob/4a0695ce/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/file/IgnitePdsDiskErrorsRecoveringTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/file/IgnitePdsDiskErrorsRecoveringTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/file/IgnitePdsDiskErrorsRecoveringTest.java index 3e85c77..c902879 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/file/IgnitePdsDiskErrorsRecoveringTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/file/IgnitePdsDiskErrorsRecoveringTest.java @@ -25,6 +25,7 @@ import java.nio.file.OpenOption; import java.util.Arrays; import java.util.concurrent.atomic.AtomicLong; import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.IgniteException; import org.apache.ignite.cache.CacheAtomicityMode; import org.apache.ignite.cache.CacheRebalanceMode; import org.apache.ignite.cache.CacheWriteSynchronizationMode; @@ -39,15 +40,20 @@ import org.apache.ignite.failure.StopNodeFailureHandler; import org.apache.ignite.internal.GridKernalState; import org.apache.ignite.internal.IgniteEx; import org.apache.ignite.internal.IgniteInterruptedCheckedException; +import 
org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager; import org.apache.ignite.internal.processors.cache.persistence.file.FileIO; import org.apache.ignite.internal.processors.cache.persistence.file.FileIODecorator; import org.apache.ignite.internal.processors.cache.persistence.file.FileIOFactory; +import org.apache.ignite.internal.processors.cache.persistence.file.RandomAccessFileIO; import org.apache.ignite.internal.processors.cache.persistence.file.RandomAccessFileIOFactory; import org.apache.ignite.internal.processors.cache.persistence.wal.FileWriteAheadLogManager; import org.apache.ignite.testframework.GridTestUtils; import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; import org.junit.Assert; +import static java.nio.file.StandardOpenOption.CREATE; +import static java.nio.file.StandardOpenOption.READ; +import static java.nio.file.StandardOpenOption.WRITE; import static org.apache.ignite.IgniteSystemProperties.IGNITE_WAL_MMAP; /** @@ -61,19 +67,13 @@ public class IgnitePdsDiskErrorsRecoveringTest extends GridCommonAbstractTest { private static final int WAL_SEGMENT_SIZE = 1024 * PAGE_SIZE; /** */ - private static final long DFLT_DISK_SPACE_BYTES = Long.MAX_VALUE; - - /** */ private static final long STOP_TIMEOUT_MS = 30 * 1000; /** */ private static final String CACHE_NAME = "cache"; - /** */ - private boolean failPageStoreDiskOperations = false; - - /** */ - private long diskSpaceBytes = DFLT_DISK_SPACE_BYTES; + /** Specified i/o factory for particular test. 
*/ + private FileIOFactory ioFactory; /** {@inheritDoc} */ @Override protected void beforeTest() throws Exception { @@ -88,8 +88,7 @@ public class IgnitePdsDiskErrorsRecoveringTest extends GridCommonAbstractTest { cleanPersistenceDir(); - failPageStoreDiskOperations = false; - diskSpaceBytes = DFLT_DISK_SPACE_BYTES; + ioFactory = null; System.clearProperty(IGNITE_WAL_MMAP); } @@ -103,10 +102,11 @@ public class IgnitePdsDiskErrorsRecoveringTest extends GridCommonAbstractTest { .setWalMode(WALMode.LOG_ONLY) .setWalCompactionEnabled(false) .setWalSegmentSize(WAL_SEGMENT_SIZE) + .setCheckpointFrequency(240 * 60 * 1000) .setConcurrencyLevel(Runtime.getRuntime().availableProcessors() * 4); - if (failPageStoreDiskOperations) - dsCfg.setFileIOFactory(new LimitedSizeFileIOFactory(new RandomAccessFileIOFactory(), diskSpaceBytes)); + if (ioFactory != null) + dsCfg.setFileIOFactory(ioFactory); cfg.setDataStorageConfiguration(dsCfg); @@ -122,19 +122,17 @@ public class IgnitePdsDiskErrorsRecoveringTest extends GridCommonAbstractTest { } /** - * + * Test node stopping & recovering on cache initialization fail. */ - public void testRecoveringOnCacheInitError() throws Exception { - failPageStoreDiskOperations = true; - - // Two pages is enough to initialize MetaStorage. - diskSpaceBytes = 2 * PAGE_SIZE; + public void testRecoveringOnCacheInitFail() throws Exception { + // Fail to initialize page store. 2 extra pages is needed for MetaStorage. + ioFactory = new FilteringFileIOFactory(".bin", new LimitedSizeFileIOFactory(new RandomAccessFileIOFactory(), 2 * PAGE_SIZE)); final IgniteEx grid = startGrid(0); boolean failed = false; try { - grid.active(true); + grid.cluster().active(true); } catch (Exception expected) { log.warning("Expected cache error", expected); @@ -147,21 +145,128 @@ public class IgnitePdsDiskErrorsRecoveringTest extends GridCommonAbstractTest { awaitStop(grid); // Grid should be successfully recovered after stopping. 
- failPageStoreDiskOperations = false; + ioFactory = null; IgniteEx recoveredGrid = startGrid(0); recoveredGrid.active(true); } /** + * Test node stopping & recovering on start marker writing fail during activation. * + * @throws Exception If test failed. */ - public void testRecoveringOnCheckpointWritingError() throws Exception { - failPageStoreDiskOperations = true; - diskSpaceBytes = 1024 * PAGE_SIZE; + public void testRecoveringOnNodeStartMarkerWriteFail() throws Exception { + // Fail to write node start marker tmp file at the second checkpoint. Pass only initial checkpoint. + ioFactory = new FilteringFileIOFactory("started.bin" + GridCacheDatabaseSharedManager.FILE_TMP_SUFFIX, new LimitedSizeFileIOFactory(new RandomAccessFileIOFactory(), 20)); + + IgniteEx grid = startGrid(0); + grid.cluster().active(true); + + for (int i = 0; i < 1000; i++) { + byte payload = (byte) i; + byte[] data = new byte[2048]; + Arrays.fill(data, payload); + + grid.cache(CACHE_NAME).put(i, data); + } + + stopAllGrids(); + + boolean activationFailed = false; + try { + grid = startGrid(0); + grid.cluster().active(true); + } + catch (IgniteException e) { + log.warning("Activation test exception", e); + + activationFailed = true; + } + + Assert.assertTrue("Activation must be failed", activationFailed); + + // Grid should be automatically stopped after checkpoint fail. + awaitStop(grid); + + // Grid should be successfully recovered after stopping. + ioFactory = null; + + IgniteEx recoveredGrid = startGrid(0); + recoveredGrid.cluster().active(true); + + for (int i = 0; i < 1000; i++) { + byte payload = (byte) i; + byte[] data = new byte[2048]; + Arrays.fill(data, payload); + + byte[] actualData = (byte[]) recoveredGrid.cache(CACHE_NAME).get(i); + Assert.assertArrayEquals(data, actualData); + } + } + + + /** + * Test node stopping & recovering on checkpoint begin fail. + * + * @throws Exception If test failed. 
+ */ + public void testRecoveringOnCheckpointBeginFail() throws Exception { + // Fail to write checkpoint start marker tmp file at the second checkpoint. Pass only initial checkpoint. + ioFactory = new FilteringFileIOFactory("START.bin" + GridCacheDatabaseSharedManager.FILE_TMP_SUFFIX, new LimitedSizeFileIOFactory(new RandomAccessFileIOFactory(), 20)); final IgniteEx grid = startGrid(0); - grid.active(true); + grid.cluster().active(true); + + for (int i = 0; i < 1000; i++) { + byte payload = (byte) i; + byte[] data = new byte[2048]; + Arrays.fill(data, payload); + + grid.cache(CACHE_NAME).put(i, data); + } + + String errMsg = "Failed to write checkpoint entry"; + + boolean checkpointFailed = false; + try { + forceCheckpoint(); + } + catch (IgniteCheckedException e) { + if (e.getMessage().contains(errMsg)) + checkpointFailed = true; + } + + Assert.assertTrue("Checkpoint must be failed by IgniteCheckedException: " + errMsg, checkpointFailed); + + // Grid should be automatically stopped after checkpoint fail. + awaitStop(grid); + + // Grid should be successfully recovered after stopping. + ioFactory = null; + + IgniteEx recoveredGrid = startGrid(0); + recoveredGrid.cluster().active(true); + + for (int i = 0; i < 1000; i++) { + byte payload = (byte) i; + byte[] data = new byte[2048]; + Arrays.fill(data, payload); + + byte[] actualData = (byte[]) recoveredGrid.cache(CACHE_NAME).get(i); + Assert.assertArrayEquals(data, actualData); + } + } + + /** + * Test node stopping & recovering on checkpoint pages write fail. + */ + public void testRecoveringOnCheckpointWriteFail() throws Exception { + // Fail write partition and index files at the second checkpoint. Pass only initial checkpoint. 
+ ioFactory = new FilteringFileIOFactory(".bin", new LimitedSizeFileIOFactory(new RandomAccessFileIOFactory(), 128 * PAGE_SIZE)); + + final IgniteEx grid = startGrid(0); + grid.cluster().active(true); for (int i = 0; i < 1000; i++) { byte payload = (byte) i; @@ -187,10 +292,10 @@ public class IgnitePdsDiskErrorsRecoveringTest extends GridCommonAbstractTest { awaitStop(grid); // Grid should be successfully recovered after stopping. - failPageStoreDiskOperations = false; + ioFactory = null; IgniteEx recoveredGrid = startGrid(0); - recoveredGrid.active(true); + recoveredGrid.cluster().active(true); for (int i = 0; i < 1000; i++) { byte payload = (byte) i; @@ -203,33 +308,35 @@ public class IgnitePdsDiskErrorsRecoveringTest extends GridCommonAbstractTest { } /** - * + * Test node stopping & recovering on WAL writing fail with enabled MMAP (Batch allocation for WAL segments). */ - public void testRecoveringOnWALErrorWithMmap() throws Exception { - diskSpaceBytes = WAL_SEGMENT_SIZE; + public void testRecoveringOnWALWritingFail1() throws Exception { + // Allow to allocate only 1 wal segment, fail on write to second. + ioFactory = new FilteringFileIOFactory(".wal", new LimitedSizeFileIOFactory(new RandomAccessFileIOFactory(), WAL_SEGMENT_SIZE)); System.setProperty(IGNITE_WAL_MMAP, "true"); - emulateRecoveringOnWALWritingError(); + doTestRecoveringOnWALWritingFail(); } /** - * + * Test node stopping & recovering on WAL writing fail with disabled MMAP. */ - public void testRecoveringOnWALErrorWithoutMmap() throws Exception { - diskSpaceBytes = 2 * WAL_SEGMENT_SIZE; + public void testRecoveringOnWALWritingFail2() throws Exception { + // Fail somewhere on the second wal segment. 
+ ioFactory = new FilteringFileIOFactory(".wal", new LimitedSizeFileIOFactory(new RandomAccessFileIOFactory(), (long) (1.5 * WAL_SEGMENT_SIZE))); System.setProperty(IGNITE_WAL_MMAP, "false"); - emulateRecoveringOnWALWritingError(); + doTestRecoveringOnWALWritingFail(); } /** - * + * Test node stopping & recovery on WAL writing fail. */ - private void emulateRecoveringOnWALWritingError() throws Exception { + private void doTestRecoveringOnWALWritingFail() throws Exception { final IgniteEx grid = startGrid(0); FileWriteAheadLogManager wal = (FileWriteAheadLogManager)grid.context().cache().context().wal(); - wal.setFileIOFactory(new LimitedSizeFileIOFactory(new RandomAccessFileIOFactory(), diskSpaceBytes)); + wal.setFileIOFactory(ioFactory); - grid.active(true); + grid.cluster().active(true); int failedPosition = -1; @@ -254,9 +361,11 @@ public class IgnitePdsDiskErrorsRecoveringTest extends GridCommonAbstractTest { // Grid should be automatically stopped after WAL fail. awaitStop(grid); + ioFactory = null; + // Grid should be successfully recovered after stopping. IgniteEx recoveredGrid = startGrid(0); - recoveredGrid.active(true); + recoveredGrid.cluster().active(true); for (int i = 0; i < failedPosition; i++) { byte payload = (byte) i; @@ -328,11 +437,49 @@ public class IgnitePdsDiskErrorsRecoveringTest extends GridCommonAbstractTest { } /** {@inheritDoc} */ - @Override public MappedByteBuffer map(int maxWalSegmentSize) throws IOException { - availableSpaceBytes.addAndGet(-maxWalSegmentSize); + @Override public MappedByteBuffer map(int sizeBytes) throws IOException { + availableSpaceBytes.addAndGet(-sizeBytes); if (availableSpaceBytes.get() < 0) throw new IOException("Not enough space!"); - return super.map(maxWalSegmentSize); + return super.map(sizeBytes); + } + } + + /** + * Factory to provide custom File I/O interfaces only for files with specified suffix. + * For other files {@link RandomAccessFileIO} will be used. 
+ */ + private static class FilteringFileIOFactory implements FileIOFactory { + /** Serial version uid. */ + private static final long serialVersionUID = 0L; + + /** Delegate. */ + private final FileIOFactory delegate; + + /** File suffix pattern. */ + private final String pattern; + + /** + * Constructor. + * + * @param pattern File suffix pattern. + * @param delegate I/O Factory delegate. + */ + FilteringFileIOFactory(String pattern, FileIOFactory delegate) { + this.delegate = delegate; + this.pattern = pattern; + } + + /** {@inheritDoc} */ + @Override public FileIO create(File file) throws IOException { + return create(file, CREATE, WRITE, READ); + } + + /** {@inheritDoc} */ + @Override public FileIO create(File file, OpenOption... modes) throws IOException { + if (file.getName().endsWith(pattern)) + return delegate.create(file, modes); + return new RandomAccessFileIO(file, modes); } } http://git-wip-us.apache.org/repos/asf/ignite/blob/4a0695ce/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalFlushFailoverTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalFlushFailoverTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalFlushFailoverTest.java index 946b4e8..042a447 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalFlushFailoverTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalFlushFailoverTest.java @@ -206,8 +206,8 @@ public class IgniteWalFlushFailoverTest extends GridCommonAbstractTest { } /** {@inheritDoc} */ - @Override public MappedByteBuffer map(int maxWalSegmentSize) throws IOException { - return delegate.map(maxWalSegmentSize); + @Override public MappedByteBuffer map(int sizeBytes) 
throws IOException { + return delegate.map(sizeBytes); } }; } http://git-wip-us.apache.org/repos/asf/ignite/blob/4a0695ce/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalFlushMultiNodeFailoverAbstractSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalFlushMultiNodeFailoverAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalFlushMultiNodeFailoverAbstractSelfTest.java index 1259c3c..fe16328 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalFlushMultiNodeFailoverAbstractSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalFlushMultiNodeFailoverAbstractSelfTest.java @@ -250,8 +250,8 @@ public abstract class IgniteWalFlushMultiNodeFailoverAbstractSelfTest extends Gr } /** {@inheritDoc} */ - @Override public MappedByteBuffer map(int maxWalSegmentSize) throws IOException { - return delegate.map(maxWalSegmentSize); + @Override public MappedByteBuffer map(int sizeBytes) throws IOException { + return delegate.map(sizeBytes); } }; } http://git-wip-us.apache.org/repos/asf/ignite/blob/4a0695ce/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PagesWriteThrottleSmokeTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PagesWriteThrottleSmokeTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PagesWriteThrottleSmokeTest.java index 9f1342f..249718b 100644 --- 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PagesWriteThrottleSmokeTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PagesWriteThrottleSmokeTest.java @@ -319,8 +319,8 @@ public class PagesWriteThrottleSmokeTest extends GridCommonAbstractTest { } /** {@inheritDoc} */ - @Override public MappedByteBuffer map(int maxWalSegmentSize) throws IOException { - return delegate.map(maxWalSegmentSize); + @Override public MappedByteBuffer map(int sizeBytes) throws IOException { + return delegate.map(sizeBytes); } }; } http://git-wip-us.apache.org/repos/asf/ignite/blob/4a0695ce/modules/direct-io/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/AlignedBuffersDirectFileIO.java ---------------------------------------------------------------------- diff --git a/modules/direct-io/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/AlignedBuffersDirectFileIO.java b/modules/direct-io/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/AlignedBuffersDirectFileIO.java index 3cb4886..681426c 100644 --- a/modules/direct-io/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/AlignedBuffersDirectFileIO.java +++ b/modules/direct-io/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/AlignedBuffersDirectFileIO.java @@ -460,12 +460,17 @@ public class AlignedBuffersDirectFileIO implements FileIO { } /** {@inheritDoc} */ - @Override public MappedByteBuffer map(int maxWalSegmentSize) throws IOException { + @Override public MappedByteBuffer map(int sizeBytes) throws IOException { throw new UnsupportedOperationException("AsynchronousFileChannel doesn't support mmap."); } /** {@inheritDoc} */ @Override public void force() throws IOException { + force(false); + } + + /** {@inheritDoc} */ + @Override public void force(boolean withMetadata) throws IOException { if 
(IgniteNativeIoLib.fsync(fdCheckOpened()) < 0) throw new IOException(String.format("Error fsync()'ing %s, got %s", file, getLastError())); }