Repository: ignite
Updated Branches:
  refs/heads/ignite-6083 cbf65cb44 -> 6e92fffca


IGNITE-7933 Checkpoint file markers should be written atomically - Fixes #3633.

Signed-off-by: Alexey Goncharuk <alexey.goncha...@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/4a0695ce
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/4a0695ce
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/4a0695ce

Branch: refs/heads/ignite-6083
Commit: 4a0695ceae2f99c4841e8382e723daff4580ea3d
Parents: a064702
Author: Pavel Kovalenko <jokse...@gmail.com>
Authored: Fri Apr 6 10:35:17 2018 +0300
Committer: Alexey Goncharuk <alexey.goncha...@gmail.com>
Committed: Fri Apr 6 10:35:17 2018 +0300

----------------------------------------------------------------------
 .../org/apache/ignite/internal/IgnitionEx.java  |   2 +-
 .../GridCacheDatabaseSharedManager.java         | 145 ++++++++----
 .../cache/persistence/file/AsyncFileIO.java     |   9 +-
 .../cache/persistence/file/FileIO.java          |  20 +-
 .../cache/persistence/file/FileIODecorator.java |   9 +-
 .../persistence/file/RandomAccessFileIO.java    |  13 +-
 .../cache/persistence/file/UnzipFileIO.java     |   7 +-
 .../file/IgnitePdsDiskErrorsRecoveringTest.java | 231 +++++++++++++++----
 .../db/wal/IgniteWalFlushFailoverTest.java      |   4 +-
 ...lFlushMultiNodeFailoverAbstractSelfTest.java |   4 +-
 .../pagemem/PagesWriteThrottleSmokeTest.java    |   4 +-
 .../file/AlignedBuffersDirectFileIO.java        |   7 +-
 12 files changed, 353 insertions(+), 102 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/4a0695ce/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java 
b/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java
index 8073faa..4708dd3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java
@@ -410,7 +410,7 @@ public class IgnitionEx {
                             " milliseconds. Killing node...");
 
                     // We are not able to kill only one grid so whole JVM will 
be stopped.
-                    System.exit(Ignition.KILL_EXIT_CODE);
+                    Runtime.getRuntime().halt(Ignition.KILL_EXIT_CODE);
                 }
             }
         }, timeoutMs, TimeUnit.MILLISECONDS);

http://git-wip-us.apache.org/repos/asf/ignite/blob/4a0695ce/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
index 71f3baa..70fc688 100755
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
@@ -27,6 +27,7 @@ import java.nio.ByteOrder;
 import java.nio.channels.FileChannel;
 import java.nio.channels.FileLock;
 import java.nio.channels.OverlappingFileLockException;
+import java.nio.file.DirectoryStream;
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.nio.file.Paths;
@@ -116,8 +117,10 @@ import 
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalP
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture;
 import org.apache.ignite.internal.processors.cache.persistence.file.FileIO;
+import 
org.apache.ignite.internal.processors.cache.persistence.file.FileIOFactory;
 import 
org.apache.ignite.internal.processors.cache.persistence.file.FilePageStore;
 import 
org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager;
+import 
org.apache.ignite.internal.processors.cache.persistence.file.PersistentStorageIOException;
 import 
org.apache.ignite.internal.processors.cache.persistence.metastorage.MetaStorage;
 import 
org.apache.ignite.internal.processors.cache.persistence.metastorage.MetastorageLifecycleListener;
 import 
org.apache.ignite.internal.processors.cache.persistence.pagemem.CheckpointMetricsTracker;
@@ -211,11 +214,14 @@ public class GridCacheDatabaseSharedManager extends 
IgniteCacheDatabaseSharedMan
     /** Checkpoint file name pattern. */
     private static final Pattern CP_FILE_NAME_PATTERN = 
Pattern.compile("(\\d+)-(.*)-(START|END)\\.bin");
 
+    /** Suffix of temporary files used to write checkpoint markers safely: a marker is first written to a temporary file and then renamed. */
+    public static final String FILE_TMP_SUFFIX = ".tmp";
+
     /** Node started file patter. */
     private static final Pattern NODE_STARTED_FILE_NAME_PATTERN = 
Pattern.compile("(\\d+)-node-started\\.bin");
 
     /** Node started file suffix. */
-    private static final String NODE_STARTED_FILE_NAME_SUFFIX = 
"-node-started.bin";
+    public static final String NODE_STARTED_FILE_NAME_SUFFIX = 
"-node-started.bin";
 
     /** */
     private static final FileFilter CP_FILE_FILTER = new FileFilter() {
@@ -378,6 +384,9 @@ public class GridCacheDatabaseSharedManager extends 
IgniteCacheDatabaseSharedMan
     /** Initially disabled cache groups. */
     private Collection<Integer> initiallyWalDisabledGrps;
 
+    /** File I/O factory for writing checkpoint markers. */
+    private final FileIOFactory ioFactory;
+
     /**
      * @param ctx Kernal context.
      */
@@ -402,6 +411,8 @@ public class GridCacheDatabaseSharedManager extends 
IgniteCacheDatabaseSharedMan
 
         maxCpHistMemSize = Math.min(persistenceCfg.getWalHistorySize(),
             
IgniteSystemProperties.getInteger(IGNITE_PDS_MAX_CHECKPOINT_MEMORY_HISTORY_SIZE,
 100));
+
+        ioFactory = persistenceCfg.getFileIOFactory();
     }
 
     /** */
@@ -494,6 +505,8 @@ public class GridCacheDatabaseSharedManager extends 
IgniteCacheDatabaseSharedMan
             if (!U.mkdirs(cpDir))
                 throw new IgniteCheckedException("Could not create directory 
for checkpoint metadata: " + cpDir);
 
+            cleanupCheckpointDirectory();
+
             final FileLockHolder preLocked = kernalCtx.pdsFolderResolver()
                 .resolveFolders()
                 .getLockedFileLockHolder();
@@ -508,6 +521,26 @@ public class GridCacheDatabaseSharedManager extends 
IgniteCacheDatabaseSharedMan
     }
 
     /**
+     * Removes leftover temporary files (names ending with {@link #FILE_TMP_SUFFIX}) from the checkpoint directory.
+     * @throws IgniteCheckedException If the directory could not be listed or a temporary file could not be deleted.
+     */
+    private void cleanupCheckpointDirectory() throws IgniteCheckedException {
+        try (DirectoryStream<Path> files = Files.newDirectoryStream(cpDir.toPath(), new DirectoryStream.Filter<Path>() {
+            @Override
+            public boolean accept(Path path) throws IOException {
+                // Path.endsWith(String) compares whole name elements, not string suffixes, so match on the file name text.
+                return path.getFileName().toString().endsWith(FILE_TMP_SUFFIX);
+            }
+        })) {
+            for (Path path : files)
+                Files.delete(path);
+        }
+        catch (IOException e) {
+            throw new IgniteCheckedException("Failed to cleanup checkpoint directory: " + cpDir, e);
+        }
+    }
+
+    /**
      *
      */
     private void initDataBase() {
@@ -749,7 +782,7 @@ public class GridCacheDatabaseSharedManager extends 
IgniteCacheDatabaseSharedMan
 
             notifyMetastorageReadyForReadWrite();
         }
-        catch (StorageException e) {
+        catch (StorageException | PersistentStorageIOException e) {
             cctx.kernalContext().failure().process(new 
FailureContext(FailureType.CRITICAL_ERROR, e));
 
             throw new IgniteCheckedException(e);
@@ -760,41 +793,52 @@ public class GridCacheDatabaseSharedManager extends 
IgniteCacheDatabaseSharedMan
     }
 
     /**
+     * Creates a marker file named with the current timestamp and the {@link #NODE_STARTED_FILE_NAME_SUFFIX} suffix
+     * and writes the memory recovery pointer into it.
+     *
      * @param ptr Memory recovery wal pointer.
      */
     private void nodeStart(WALPointer ptr) throws IgniteCheckedException {
         FileWALPointer p = (FileWALPointer)ptr;
 
-        String fileName = U.currentTimeMillis() + "-node-started.bin";
+        String fileName = U.currentTimeMillis() + 
NODE_STARTED_FILE_NAME_SUFFIX;
+        String tmpFileName = fileName + FILE_TMP_SUFFIX;
 
         ByteBuffer buf = ByteBuffer.allocate(20);
         buf.order(ByteOrder.nativeOrder());
 
-        try (FileChannel ch = FileChannel.open(
-            Paths.get(cpDir.getAbsolutePath(), fileName),
-            StandardOpenOption.CREATE_NEW, StandardOpenOption.APPEND)
-        ) {
-            buf.putLong(p.index());
+        try {
+            try (FileIO io = 
ioFactory.create(Paths.get(cpDir.getAbsolutePath(), tmpFileName).toFile(),
+                    StandardOpenOption.CREATE_NEW, StandardOpenOption.WRITE)) {
+                buf.putLong(p.index());
 
-            buf.putInt(p.fileOffset());
+                buf.putInt(p.fileOffset());
 
-            buf.putInt(p.length());
+                buf.putInt(p.length());
 
-            buf.flip();
+                buf.flip();
 
-            ch.write(buf);
+                io.write(buf);
 
-            buf.clear();
+                buf.clear();
+
+                io.force(true);
+            }
 
-            ch.force(true);
+            Files.move(Paths.get(cpDir.getAbsolutePath(), tmpFileName), 
Paths.get(cpDir.getAbsolutePath(), fileName));
         }
         catch (IOException e) {
-            throw new IgniteCheckedException(e);
+            throw new PersistentStorageIOException("Failed to write node start 
marker: " + ptr, e);
         }
     }
 
     /**
+     * Collects memory recovery pointers from node start marker files. See {@link #nodeStart(WALPointer)}.
+     * Each pointer is paired with the timestamp extracted from its file name.
+     * Tuples are sorted by timestamp.
       *
+     * @return Sorted list of tuples (node started timestamp, memory recovery pointer).
+     * @throws IgniteCheckedException If failed.
      */
     public List<T2<Long, WALPointer>> nodeStartedPointers() throws 
IgniteCheckedException {
         List<T2<Long, WALPointer>> res = new ArrayList<>();
@@ -806,15 +850,10 @@ public class GridCacheDatabaseSharedManager extends 
IgniteCacheDatabaseSharedMan
                 String n1 = o1.getName();
                 String n2 = o2.getName();
 
-                Long ts1 = Long.valueOf(n1.substring(0, n1.length() - 
NODE_STARTED_FILE_NAME_SUFFIX.length()));
-                Long ts2 = Long.valueOf(n2.substring(0, n2.length() - 
NODE_STARTED_FILE_NAME_SUFFIX.length()));
+                long ts1 = Long.valueOf(n1.substring(0, n1.length() - 
NODE_STARTED_FILE_NAME_SUFFIX.length()));
+                long ts2 = Long.valueOf(n2.substring(0, n2.length() - 
NODE_STARTED_FILE_NAME_SUFFIX.length()));
 
-                if (ts1 == ts2)
-                    return 0;
-                else if (ts1 < ts2)
-                    return -1;
-                else
-                    return 1;
+                return Long.compare(ts1, ts2);
             }
         });
 
@@ -826,8 +865,8 @@ public class GridCacheDatabaseSharedManager extends 
IgniteCacheDatabaseSharedMan
 
             Long ts = Long.valueOf(name.substring(0, name.length() - 
NODE_STARTED_FILE_NAME_SUFFIX.length()));
 
-            try (FileChannel ch = FileChannel.open(f.toPath(), READ)) {
-                ch.read(buf);
+            try (FileIO io = ioFactory.create(f, READ)) {
+                io.read(buf);
 
                 buf.flip();
 
@@ -1869,8 +1908,8 @@ public class GridCacheDatabaseSharedManager extends 
IgniteCacheDatabaseSharedMan
     private WALPointer readPointer(File cpMarkerFile, ByteBuffer buf) throws 
IgniteCheckedException {
         buf.position(0);
 
-        try (FileChannel ch = FileChannel.open(cpMarkerFile.toPath(), READ)) {
-            ch.read(buf);
+        try (FileIO io = ioFactory.create(cpMarkerFile, READ)) {
+            io.read(buf);
 
             buf.flip();
 
@@ -2584,6 +2623,8 @@ public class GridCacheDatabaseSharedManager extends 
IgniteCacheDatabaseSharedMan
     }
 
     /**
+     * Writes into specified file checkpoint entry containing WAL pointer to 
checkpoint record.
+     *
      * @param cpId Checkpoint ID.
      * @param ptr Wal pointer of current checkpoint.
      */
@@ -2600,31 +2641,40 @@ public class GridCacheDatabaseSharedManager extends 
IgniteCacheDatabaseSharedMan
         FileWALPointer filePtr = (FileWALPointer)ptr;
 
         String fileName = checkpointFileName(cpTs, cpId, type);
+        String tmpFileName = fileName + FILE_TMP_SUFFIX;
 
-        try (FileChannel ch = 
FileChannel.open(Paths.get(cpDir.getAbsolutePath(), fileName),
-            StandardOpenOption.CREATE_NEW, StandardOpenOption.APPEND)) {
+        try {
+            try (FileIO io = 
ioFactory.create(Paths.get(cpDir.getAbsolutePath(), skipSync ? fileName : 
tmpFileName).toFile(),
+                    StandardOpenOption.CREATE_NEW, StandardOpenOption.WRITE)) {
 
-            tmpWriteBuf.rewind();
+                tmpWriteBuf.rewind();
 
-            tmpWriteBuf.putLong(filePtr.index());
+                tmpWriteBuf.putLong(filePtr.index());
 
-            tmpWriteBuf.putInt(filePtr.fileOffset());
+                tmpWriteBuf.putInt(filePtr.fileOffset());
 
-            tmpWriteBuf.putInt(filePtr.length());
+                tmpWriteBuf.putInt(filePtr.length());
 
-            tmpWriteBuf.flip();
+                tmpWriteBuf.flip();
 
-            ch.write(tmpWriteBuf);
+                io.write(tmpWriteBuf);
 
-            tmpWriteBuf.clear();
+                tmpWriteBuf.clear();
+
+                if (!skipSync)
+                    io.force(true);
+            }
 
             if (!skipSync)
-                ch.force(true);
+                Files.move(Paths.get(cpDir.getAbsolutePath(), tmpFileName), 
Paths.get(cpDir.getAbsolutePath(), fileName));
 
             return createCheckPointEntry(cpTs, ptr, cpId, rec, type);
         }
         catch (IOException e) {
-            throw new IgniteCheckedException(e);
+            throw new PersistentStorageIOException("Failed to write checkpoint 
entry [ptr=" + filePtr
+                    + ", cpTs=" + cpTs
+                    + ", cpId=" + cpId
+                    + ", type=" + type + "]", e);
         }
     }
 
@@ -2691,8 +2741,6 @@ public class GridCacheDatabaseSharedManager extends 
IgniteCacheDatabaseSharedMan
         if (type != CheckpointEntryType.START)
             return null;
 
-        CheckpointEntry entry;
-
         Map<Integer, CacheState> cacheGrpStates = null;
 
         // Create lazy checkpoint entry.
@@ -2827,7 +2875,20 @@ public class GridCacheDatabaseSharedManager extends 
IgniteCacheDatabaseSharedMan
             try {
                 CheckpointMetricsTracker tracker = new 
CheckpointMetricsTracker();
 
-                Checkpoint chp = markCheckpointBegin(tracker);
+                Checkpoint chp;
+
+                try {
+                    chp = markCheckpointBegin(tracker);
+                }
+                catch (IgniteCheckedException e) {
+                    if (curCpProgress != null)
+                        curCpProgress.cpFinishFut.onDone(e);
+
+                    // In case of checkpoint initialization error node should 
be invalidated and stopped.
+                    cctx.kernalContext().failure().process(new 
FailureContext(FailureType.CRITICAL_ERROR, e));
+
+                    return;
+                }
 
                 currCheckpointPagesCnt = chp.pagesSize;
 
@@ -2885,7 +2946,7 @@ public class GridCacheDatabaseSharedManager extends 
IgniteCacheDatabaseSharedMan
                         } catch (IgniteCheckedException e) {
                             chp.progress.cpFinishFut.onDone(e);
 
-                            // In case of writing error node should be 
invalidated and stopped.
+                            // In case of checkpoint writing error node should 
be invalidated and stopped.
                             cctx.kernalContext().failure().process(new 
FailureContext(FailureType.CRITICAL_ERROR, e));
 
                             return;

http://git-wip-us.apache.org/repos/asf/ignite/blob/4a0695ce/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/AsyncFileIO.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/AsyncFileIO.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/AsyncFileIO.java
index b1db79d..799a78c 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/AsyncFileIO.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/AsyncFileIO.java
@@ -169,13 +169,18 @@ public class AsyncFileIO implements FileIO {
     }
 
     /** {@inheritDoc} */
-    @Override public MappedByteBuffer map(int maxWalSegmentSize) throws 
IOException {
+    @Override public MappedByteBuffer map(int sizeBytes) throws IOException {
         throw new UnsupportedOperationException("AsynchronousFileChannel 
doesn't support mmap.");
     }
 
     /** {@inheritDoc} */
     @Override public void force() throws IOException {
-        ch.force(false);
+        force(false);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void force(boolean withMetadata) throws IOException {
+        ch.force(withMetadata);
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/4a0695ce/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FileIO.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FileIO.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FileIO.java
index 73e44b0..822bd66 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FileIO.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FileIO.java
@@ -124,7 +124,16 @@ public interface FileIO extends AutoCloseable {
      */
     public void write(byte[] buf, int off, int len) throws IOException;
 
-    public MappedByteBuffer map(int maxWalSegmentSize) throws IOException;
+    /**
+     * Allocates memory mapped buffer for this file with given size.
+     *
+     * @param sizeBytes Size of buffer.
+     *
+     * @return Instance of mapped byte buffer.
+     *
+     * @throws IOException If some I/O error occurs.
+     */
+    public MappedByteBuffer map(int sizeBytes) throws IOException;
 
     /**
      * Forces any updates of this file to be written to the storage
@@ -135,6 +144,15 @@ public interface FileIO extends AutoCloseable {
     public void force() throws IOException;
 
     /**
+     * Forces any updates of this file to be written to the storage
+     * device that contains it.
+     *
+     * @param withMetadata If {@code true} force also file metadata.
+     * @throws IOException If some I/O error occurs.
+     */
+    public void force(boolean withMetadata) throws IOException;
+
+    /**
      * Returns current file size in bytes.
      *
      * @return File size.

http://git-wip-us.apache.org/repos/asf/ignite/blob/4a0695ce/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FileIODecorator.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FileIODecorator.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FileIODecorator.java
index dd563f2..683845b 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FileIODecorator.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FileIODecorator.java
@@ -77,8 +77,8 @@ public class FileIODecorator implements FileIO {
     }
 
     /** {@inheritDoc} */
-    @Override public MappedByteBuffer map(int maxWalSegmentSize) throws 
IOException {
-        return delegate.map(maxWalSegmentSize);
+    @Override public MappedByteBuffer map(int sizeBytes) throws IOException {
+        return delegate.map(sizeBytes);
     }
 
     /** {@inheritDoc} */
@@ -87,6 +87,11 @@ public class FileIODecorator implements FileIO {
     }
 
     /** {@inheritDoc} */
+    @Override public void force(boolean withMetadata) throws IOException {
+        delegate.force(withMetadata);
+    }
+
+    /** {@inheritDoc} */
     @Override public long size() throws IOException {
         return delegate.size();
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/4a0695ce/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/RandomAccessFileIO.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/RandomAccessFileIO.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/RandomAccessFileIO.java
index 23d6ebf..8f7454d 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/RandomAccessFileIO.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/RandomAccessFileIO.java
@@ -84,8 +84,8 @@ public class RandomAccessFileIO implements FileIO {
     }
 
     /** {@inheritDoc} */
-    @Override public void force() throws IOException {
-        ch.force(false);
+    @Override public void force(boolean withMetadata) throws IOException {
+        ch.force(withMetadata);
     }
 
     /** {@inheritDoc} */
@@ -104,7 +104,12 @@ public class RandomAccessFileIO implements FileIO {
     }
 
     /** {@inheritDoc} */
-    @Override public MappedByteBuffer map(int maxWalSegmentSize) throws 
IOException {
-        return ch.map(FileChannel.MapMode.READ_WRITE, 0, maxWalSegmentSize);
+    @Override public MappedByteBuffer map(int sizeBytes) throws IOException {
+        return ch.map(FileChannel.MapMode.READ_WRITE, 0, sizeBytes);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void force() throws IOException {
+        force(false);
     }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/4a0695ce/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/UnzipFileIO.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/UnzipFileIO.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/UnzipFileIO.java
index 83ff91b..469cf3e 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/UnzipFileIO.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/UnzipFileIO.java
@@ -116,6 +116,11 @@ public class UnzipFileIO implements FileIO {
 
     /** {@inheritDoc} */
     @Override public void force() throws IOException {
+        force(false);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void force(boolean withMetadata) throws IOException {
         throw new UnsupportedOperationException();
     }
 
@@ -130,7 +135,7 @@ public class UnzipFileIO implements FileIO {
     }
 
     /** {@inheritDoc} */
-    @Override public MappedByteBuffer map(int maxWalSegmentSize) throws 
IOException {
+    @Override public MappedByteBuffer map(int sizeBytes) throws IOException {
         throw new UnsupportedOperationException();
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/4a0695ce/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/file/IgnitePdsDiskErrorsRecoveringTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/file/IgnitePdsDiskErrorsRecoveringTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/file/IgnitePdsDiskErrorsRecoveringTest.java
index 3e85c77..c902879 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/file/IgnitePdsDiskErrorsRecoveringTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/file/IgnitePdsDiskErrorsRecoveringTest.java
@@ -25,6 +25,7 @@ import java.nio.file.OpenOption;
 import java.util.Arrays;
 import java.util.concurrent.atomic.AtomicLong;
 import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
 import org.apache.ignite.cache.CacheAtomicityMode;
 import org.apache.ignite.cache.CacheRebalanceMode;
 import org.apache.ignite.cache.CacheWriteSynchronizationMode;
@@ -39,15 +40,20 @@ import org.apache.ignite.failure.StopNodeFailureHandler;
 import org.apache.ignite.internal.GridKernalState;
 import org.apache.ignite.internal.IgniteEx;
 import org.apache.ignite.internal.IgniteInterruptedCheckedException;
+import 
org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager;
 import org.apache.ignite.internal.processors.cache.persistence.file.FileIO;
 import 
org.apache.ignite.internal.processors.cache.persistence.file.FileIODecorator;
 import 
org.apache.ignite.internal.processors.cache.persistence.file.FileIOFactory;
+import 
org.apache.ignite.internal.processors.cache.persistence.file.RandomAccessFileIO;
 import 
org.apache.ignite.internal.processors.cache.persistence.file.RandomAccessFileIOFactory;
 import 
org.apache.ignite.internal.processors.cache.persistence.wal.FileWriteAheadLogManager;
 import org.apache.ignite.testframework.GridTestUtils;
 import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
 import org.junit.Assert;
 
+import static java.nio.file.StandardOpenOption.CREATE;
+import static java.nio.file.StandardOpenOption.READ;
+import static java.nio.file.StandardOpenOption.WRITE;
 import static org.apache.ignite.IgniteSystemProperties.IGNITE_WAL_MMAP;
 
 /**
@@ -61,19 +67,13 @@ public class IgnitePdsDiskErrorsRecoveringTest extends 
GridCommonAbstractTest {
     private static final int WAL_SEGMENT_SIZE = 1024 * PAGE_SIZE;
 
     /** */
-    private static final long DFLT_DISK_SPACE_BYTES = Long.MAX_VALUE;
-
-    /** */
     private static final long STOP_TIMEOUT_MS = 30 * 1000;
 
     /** */
     private static final String CACHE_NAME = "cache";
 
-    /** */
-    private boolean failPageStoreDiskOperations = false;
-
-    /** */
-    private long diskSpaceBytes = DFLT_DISK_SPACE_BYTES;
+    /** Specified i/o factory for particular test. */
+    private FileIOFactory ioFactory;
 
     /** {@inheritDoc} */
     @Override protected void beforeTest() throws Exception {
@@ -88,8 +88,7 @@ public class IgnitePdsDiskErrorsRecoveringTest extends 
GridCommonAbstractTest {
 
         cleanPersistenceDir();
 
-        failPageStoreDiskOperations = false;
-        diskSpaceBytes = DFLT_DISK_SPACE_BYTES;
+        ioFactory = null;
         System.clearProperty(IGNITE_WAL_MMAP);
     }
 
@@ -103,10 +102,11 @@ public class IgnitePdsDiskErrorsRecoveringTest extends 
GridCommonAbstractTest {
                 .setWalMode(WALMode.LOG_ONLY)
                 .setWalCompactionEnabled(false)
                 .setWalSegmentSize(WAL_SEGMENT_SIZE)
+                .setCheckpointFrequency(240 * 60 * 1000)
                 
.setConcurrencyLevel(Runtime.getRuntime().availableProcessors() * 4);
 
-        if (failPageStoreDiskOperations)
-            dsCfg.setFileIOFactory(new LimitedSizeFileIOFactory(new 
RandomAccessFileIOFactory(), diskSpaceBytes));
+        if (ioFactory != null)
+            dsCfg.setFileIOFactory(ioFactory);
 
         cfg.setDataStorageConfiguration(dsCfg);
 
@@ -122,19 +122,17 @@ public class IgnitePdsDiskErrorsRecoveringTest extends 
GridCommonAbstractTest {
     }
 
     /**
-     *
+     * Test node stopping & recovering on cache initialization fail.
+     *
+     * @throws Exception If test failed.
      */
-    public void testRecoveringOnCacheInitError() throws Exception {
-        failPageStoreDiskOperations = true;
-
-        // Two pages is enough to initialize MetaStorage.
-        diskSpaceBytes = 2 * PAGE_SIZE;
+    public void testRecoveringOnCacheInitFail() throws Exception {
+        // Limit ".bin" files to 2 pages: enough to initialize MetaStorage, but not the cache page store,
+        // so cache initialization fails on activation.
+        ioFactory = new FilteringFileIOFactory(".bin", new LimitedSizeFileIOFactory(new RandomAccessFileIOFactory(), 2 * PAGE_SIZE));
 
         final IgniteEx grid = startGrid(0);
 
         boolean failed = false;
         try {
-            grid.active(true);
+            grid.cluster().active(true);
         } catch (Exception expected) {
             log.warning("Expected cache error", expected);
 
@@ -147,21 +145,128 @@ public class IgnitePdsDiskErrorsRecoveringTest extends GridCommonAbstractTest {
         awaitStop(grid);
 
         // Grid should be successfully recovered after stopping.
-        failPageStoreDiskOperations = false;
+        ioFactory = null;
 
         IgniteEx recoveredGrid = startGrid(0);
-        recoveredGrid.active(true);
+        // Activate through cluster(), consistently with every other activation call in this patch.
+        recoveredGrid.cluster().active(true);
     }
 
     /**
+     * Test node stopping & recovering on start marker writing fail during activation.
      *
+     * @throws Exception If test failed.
      */
-    public void testRecoveringOnCheckpointWritingError() throws Exception {
-        failPageStoreDiskOperations = true;
-        diskSpaceBytes = 1024 * PAGE_SIZE;
+    public void testRecoveringOnNodeStartMarkerWriteFail() throws Exception {
+        // Fail to write node start marker tmp file at the second checkpoint. Pass only initial checkpoint.
+        ioFactory = new FilteringFileIOFactory("started.bin" + GridCacheDatabaseSharedManager.FILE_TMP_SUFFIX, new LimitedSizeFileIOFactory(new RandomAccessFileIOFactory(), 20));
+
+        IgniteEx grid = startGrid(0);
+        grid.cluster().active(true);
+
+        for (int i = 0; i < 1000; i++) {
+            byte payload = (byte) i;
+            byte[] data = new byte[2048];
+            Arrays.fill(data, payload);
+
+            grid.cache(CACHE_NAME).put(i, data);
+        }
+
+        stopAllGrids();
+
+        // Restart outside of the try block: only activation is expected to fail. If the restart itself threw an
+        // IgniteException inside the try, it would be miscounted as an activation failure and 'grid' would still
+        // reference the node stopped above, so awaitStop() would check the wrong instance.
+        grid = startGrid(0);
+
+        boolean activationFailed = false;
+        try {
+            grid.cluster().active(true);
+        }
+        catch (IgniteException e) {
+            log.warning("Activation test exception", e);
+
+            activationFailed = true;
+        }
+
+        Assert.assertTrue("Activation must be failed", activationFailed);
+
+        // Grid should be automatically stopped after checkpoint fail.
+        awaitStop(grid);
+
+        // Grid should be successfully recovered after stopping.
+        ioFactory = null;
+
+        IgniteEx recoveredGrid = startGrid(0);
+        recoveredGrid.cluster().active(true);
+
+        for (int i = 0; i < 1000; i++) {
+            byte payload = (byte) i;
+            byte[] data = new byte[2048];
+            Arrays.fill(data, payload);
+
+            byte[] actualData = (byte[]) recoveredGrid.cache(CACHE_NAME).get(i);
+            Assert.assertArrayEquals(data, actualData);
+        }
+    }
+
+    /**
+     * Test node stopping & recovering on checkpoint begin fail.
+     *
+     * @throws Exception If test failed.
+     */
+    public void testRecoveringOnCheckpointBeginFail() throws Exception {
+        // Fail to write checkpoint start marker tmp file at the second checkpoint. Pass only initial checkpoint.
+        ioFactory = new FilteringFileIOFactory("START.bin" + GridCacheDatabaseSharedManager.FILE_TMP_SUFFIX, new LimitedSizeFileIOFactory(new RandomAccessFileIOFactory(), 20));
 
         final IgniteEx grid = startGrid(0);
-        grid.active(true);
+        grid.cluster().active(true);
+
+        for (int i = 0; i < 1000; i++) {
+            byte payload = (byte) i;
+            byte[] data = new byte[2048];
+            Arrays.fill(data, payload);
+
+            grid.cache(CACHE_NAME).put(i, data);
+        }
+
+        String errMsg = "Failed to write checkpoint entry";
+
+        boolean checkpointFailed = false;
+        try {
+            forceCheckpoint();
+        }
+        catch (IgniteCheckedException e) {
+            // getMessage() may be null: guard it so an unexpected exception is logged rather than
+            // masked by a NullPointerException or silently dropped.
+            String msg = e.getMessage();
+
+            if (msg != null && msg.contains(errMsg))
+                checkpointFailed = true;
+            else
+                log.warning("Unexpected checkpoint exception", e);
+        }
+
+        Assert.assertTrue("Checkpoint must be failed by IgniteCheckedException: " + errMsg, checkpointFailed);
+
+        // Grid should be automatically stopped after checkpoint fail.
+        awaitStop(grid);
+
+        // Grid should be successfully recovered after stopping.
+        ioFactory = null;
+
+        IgniteEx recoveredGrid = startGrid(0);
+        recoveredGrid.cluster().active(true);
+
+        for (int i = 0; i < 1000; i++) {
+            byte payload = (byte) i;
+            byte[] data = new byte[2048];
+            Arrays.fill(data, payload);
+
+            byte[] actualData = (byte[]) recoveredGrid.cache(CACHE_NAME).get(i);
+            Assert.assertArrayEquals(data, actualData);
+        }
+    }
+
+    /**
+     * Test node stopping & recovering on checkpoint pages write fail.
+     *
+     * @throws Exception If test failed.
+     */
+    public void testRecoveringOnCheckpointWriteFail() throws Exception {
+        // Fail to write partition and index files at the second checkpoint. Pass only initial checkpoint.
+        ioFactory = new FilteringFileIOFactory(".bin", new LimitedSizeFileIOFactory(new RandomAccessFileIOFactory(), 128 * PAGE_SIZE));
+
+        final IgniteEx grid = startGrid(0);
+        grid.cluster().active(true);
 
         for (int i = 0; i < 1000; i++) {
             byte payload = (byte) i;
@@ -187,10 +292,10 @@ public class IgnitePdsDiskErrorsRecoveringTest extends GridCommonAbstractTest {
         awaitStop(grid);
 
         // Grid should be successfully recovered after stopping.
-        failPageStoreDiskOperations = false;
+        ioFactory = null;
 
         IgniteEx recoveredGrid = startGrid(0);
-        recoveredGrid.active(true);
+        recoveredGrid.cluster().active(true);
 
         for (int i = 0; i < 1000; i++) {
             byte payload = (byte) i;
@@ -203,33 +308,35 @@ public class IgnitePdsDiskErrorsRecoveringTest extends GridCommonAbstractTest {
     }
 
     /**
-     *
+     * Test node stopping & recovering on WAL writing fail with MMAP enabled (batch allocation of WAL segments).
+     *
+     * @throws Exception If test failed.
      */
-    public void testRecoveringOnWALErrorWithMmap() throws Exception {
-        diskSpaceBytes = WAL_SEGMENT_SIZE;
+    public void testRecoveringOnWALWritingFail1() throws Exception {
+        // Allow allocating only one WAL segment; writing to the second one fails.
+        ioFactory = new FilteringFileIOFactory(".wal", new LimitedSizeFileIOFactory(new RandomAccessFileIOFactory(), WAL_SEGMENT_SIZE));
         System.setProperty(IGNITE_WAL_MMAP, "true");
-        emulateRecoveringOnWALWritingError();
+        doTestRecoveringOnWALWritingFail();
     }
 
     /**
-     *
+     * Test node stopping & recovering on WAL writing fail with MMAP disabled.
+     *
+     * @throws Exception If test failed.
      */
-    public void testRecoveringOnWALErrorWithoutMmap() throws Exception {
-        diskSpaceBytes = 2 * WAL_SEGMENT_SIZE;
+    public void testRecoveringOnWALWritingFail2() throws Exception {
+        // Limit WAL files to 1.5 segments, so writing fails somewhere within the second segment.
+        ioFactory = new FilteringFileIOFactory(".wal", new LimitedSizeFileIOFactory(new RandomAccessFileIOFactory(), (long) (1.5 * WAL_SEGMENT_SIZE)));
         System.setProperty(IGNITE_WAL_MMAP, "false");
-        emulateRecoveringOnWALWritingError();
+        doTestRecoveringOnWALWritingFail();
     }
 
     /**
-     *
+     * Test node stopping & recovery on WAL writing fail.
      */
-    private void emulateRecoveringOnWALWritingError() throws Exception {
+    private void doTestRecoveringOnWALWritingFail() throws Exception {
         final IgniteEx grid = startGrid(0);
 
         FileWriteAheadLogManager wal = 
(FileWriteAheadLogManager)grid.context().cache().context().wal();
-        wal.setFileIOFactory(new LimitedSizeFileIOFactory(new 
RandomAccessFileIOFactory(), diskSpaceBytes));
+        wal.setFileIOFactory(ioFactory);
 
-        grid.active(true);
+        grid.cluster().active(true);
 
         int failedPosition = -1;
 
@@ -254,9 +361,11 @@ public class IgnitePdsDiskErrorsRecoveringTest extends 
GridCommonAbstractTest {
         // Grid should be automatically stopped after WAL fail.
         awaitStop(grid);
 
+        ioFactory = null;
+
         // Grid should be successfully recovered after stopping.
         IgniteEx recoveredGrid = startGrid(0);
-        recoveredGrid.active(true);
+        recoveredGrid.cluster().active(true);
 
         for (int i = 0; i < failedPosition; i++) {
             byte payload = (byte) i;
@@ -328,11 +437,49 @@ public class IgnitePdsDiskErrorsRecoveringTest extends 
GridCommonAbstractTest {
         }
 
         /** {@inheritDoc} */
-        @Override public MappedByteBuffer map(int maxWalSegmentSize) throws IOException {
-            availableSpaceBytes.addAndGet(-maxWalSegmentSize);
-            if (availableSpaceBytes.get() < 0)
-                throw new IOException("Not enough space!");
-            return super.map(maxWalSegmentSize);
+        @Override public MappedByteBuffer map(int sizeBytes) throws IOException {
+            // Check the value returned by addAndGet(): re-reading the counter with get() races with concurrent
+            // reservations, so a caller whose own reservation fit could still observe another caller's deficit.
+            if (availableSpaceBytes.addAndGet(-sizeBytes) < 0)
+                throw new IOException("Not enough space!");
+
+            return super.map(sizeBytes);
+        }
+    }
+
+    /**
+     * Factory that applies a custom File I/O factory only to files whose name ends with the given suffix.
+     * All other files are opened with a plain {@link RandomAccessFileIO}.
+     */
+    private static class FilteringFileIOFactory implements FileIOFactory {
+        /** Serial version uid. */
+        private static final long serialVersionUID = 0L;
+
+        /** File name suffix (plain string compared with {@link String#endsWith}, not a regex). */
+        private final String pattern;
+
+        /** Factory used for files whose name ends with {@link #pattern}. */
+        private final FileIOFactory delegate;
+
+        /**
+         * Constructor.
+         *
+         * @param pattern File name suffix selecting the files handled by {@code delegate}.
+         * @param delegate I/O factory used for matching files.
+         */
+        FilteringFileIOFactory(String pattern, FileIOFactory delegate) {
+            this.pattern = pattern;
+            this.delegate = delegate;
+        }
+
+        /** {@inheritDoc} */
+        @Override public FileIO create(File file) throws IOException {
+            return create(file, CREATE, WRITE, READ);
+        }
+
+        /** {@inheritDoc} */
+        @Override public FileIO create(File file, OpenOption... modes) throws IOException {
+            boolean matches = file.getName().endsWith(pattern);
+
+            return matches ? delegate.create(file, modes) : new RandomAccessFileIO(file, modes);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/4a0695ce/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalFlushFailoverTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalFlushFailoverTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalFlushFailoverTest.java
index 946b4e8..042a447 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalFlushFailoverTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalFlushFailoverTest.java
@@ -206,8 +206,8 @@ public class IgniteWalFlushFailoverTest extends 
GridCommonAbstractTest {
                 }
 
                 /** {@inheritDoc} */
-                @Override public MappedByteBuffer map(int maxWalSegmentSize) throws IOException {
-                    return delegate.map(maxWalSegmentSize);
+                @Override public MappedByteBuffer map(int sizeBytes) throws IOException {
+                    return delegate.map(sizeBytes);
                 }
             };
         }

http://git-wip-us.apache.org/repos/asf/ignite/blob/4a0695ce/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalFlushMultiNodeFailoverAbstractSelfTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalFlushMultiNodeFailoverAbstractSelfTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalFlushMultiNodeFailoverAbstractSelfTest.java
index 1259c3c..fe16328 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalFlushMultiNodeFailoverAbstractSelfTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalFlushMultiNodeFailoverAbstractSelfTest.java
@@ -250,8 +250,8 @@ public abstract class 
IgniteWalFlushMultiNodeFailoverAbstractSelfTest extends Gr
                 }
 
                 /** {@inheritDoc} */
-                @Override public MappedByteBuffer map(int maxWalSegmentSize) throws IOException {
-                    return delegate.map(maxWalSegmentSize);
+                @Override public MappedByteBuffer map(int sizeBytes) throws IOException {
+                    return delegate.map(sizeBytes);
                 }
             };
         }

http://git-wip-us.apache.org/repos/asf/ignite/blob/4a0695ce/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PagesWriteThrottleSmokeTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PagesWriteThrottleSmokeTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PagesWriteThrottleSmokeTest.java
index 9f1342f..249718b 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PagesWriteThrottleSmokeTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PagesWriteThrottleSmokeTest.java
@@ -319,8 +319,8 @@ public class PagesWriteThrottleSmokeTest extends 
GridCommonAbstractTest {
                 }
 
                 /** {@inheritDoc} */
-                @Override public MappedByteBuffer map(int maxWalSegmentSize) throws IOException {
-                    return delegate.map(maxWalSegmentSize);
+                @Override public MappedByteBuffer map(int sizeBytes) throws IOException {
+                    return delegate.map(sizeBytes);
                 }
             };
         }

http://git-wip-us.apache.org/repos/asf/ignite/blob/4a0695ce/modules/direct-io/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/AlignedBuffersDirectFileIO.java
----------------------------------------------------------------------
diff --git 
a/modules/direct-io/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/AlignedBuffersDirectFileIO.java
 
b/modules/direct-io/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/AlignedBuffersDirectFileIO.java
index 3cb4886..681426c 100644
--- 
a/modules/direct-io/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/AlignedBuffersDirectFileIO.java
+++ 
b/modules/direct-io/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/AlignedBuffersDirectFileIO.java
@@ -460,12 +460,17 @@ public class AlignedBuffersDirectFileIO implements FileIO 
{
     }
 
     /** {@inheritDoc} */
-    @Override public MappedByteBuffer map(int maxWalSegmentSize) throws IOException {
-        throw new UnsupportedOperationException("AsynchronousFileChannel doesn't support mmap.");
+    @Override public MappedByteBuffer map(int sizeBytes) throws IOException {
+        // Memory mapping is not implemented for direct I/O over a native file descriptor (see fdCheckOpened()).
+        throw new UnsupportedOperationException("AlignedBuffersDirectFileIO doesn't support mmap.");
     }
 
     /** {@inheritDoc} */
     @Override public void force() throws IOException {
+        force(false);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void force(boolean withMetadata) throws IOException {
+        // NOTE(review): withMetadata is not consulted - the same IgniteNativeIoLib.fsync() call is made for both
+        // values (presumably a full fsync(2), which also flushes file metadata; confirm).
         if (IgniteNativeIoLib.fsync(fdCheckOpened()) < 0)
             throw new IOException(String.format("Error fsync()'ing %s, got %s", file, getLastError()));
     }

Reply via email to