Mmuzaf commented on a change in pull request #7984:
URL: https://github.com/apache/ignite/pull/7984#discussion_r525054661



##########
File path: 
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/defragmentation/DefragmentationFileUtils.java
##########
@@ -0,0 +1,400 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package 
org.apache.ignite.internal.processors.cache.persistence.defragmentation;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.internal.processors.cache.persistence.file.FileIO;
+import 
org.apache.ignite.internal.processors.cache.persistence.file.FileIOFactory;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.internal.util.typedef.internal.U;
+
+import static java.nio.file.StandardCopyOption.ATOMIC_MOVE;
+import static java.nio.file.StandardCopyOption.REPLACE_EXISTING;
+import static java.nio.file.StandardOpenOption.CREATE_NEW;
+import static java.nio.file.StandardOpenOption.WRITE;
+import static 
org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.FILE_SUFFIX;
+import static 
org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.INDEX_FILE_NAME;
+import static 
org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.INDEX_FILE_PREFIX;
+import static 
org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.PART_FILE_PREFIX;
+import static 
org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.PART_FILE_TEMPLATE;
+import static 
org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.TMP_SUFFIX;
+
+/**
+ * Everything related to file management during defragmentation process.
+ */
+public class DefragmentationFileUtils {
+    /** Prefix for link mapping files. */
+    private static final String DFRG_LINK_MAPPING_FILE_PREFIX = 
PART_FILE_PREFIX + "map-";
+
+    /** Link mapping file template. */
+    private static final String DFRG_LINK_MAPPING_FILE_TEMPLATE = 
DFRG_LINK_MAPPING_FILE_PREFIX + "%d" + FILE_SUFFIX;
+
+    /** Defragmentation complation marker file name. */
+    private static final String DFRG_COMPLETION_MARKER_FILE_NAME = 
"dfrg-completion-marker";
+
+    /** Name of defragmentated index partition file. */
+    private static final String DFRG_INDEX_FILE_NAME = INDEX_FILE_PREFIX + 
"-dfrg" + FILE_SUFFIX;
+
+    /** Name of defragmentated index partition temporary file. */
+    private static final String DFRG_INDEX_TMP_FILE_NAME = 
DFRG_INDEX_FILE_NAME + TMP_SUFFIX;
+
+    /** Prefix for defragmented partition files. */
+    private static final String DFRG_PARTITION_FILE_PREFIX = PART_FILE_PREFIX 
+ "dfrg-";
+
+    /** Defragmented partition file template. */
+    private static final String DFRG_PARTITION_FILE_TEMPLATE = 
DFRG_PARTITION_FILE_PREFIX + "%d" + FILE_SUFFIX;
+
+    /** Defragmented partition temp file template. */
+    private static final String DFRG_PARTITION_TMP_FILE_TEMPLATE = 
DFRG_PARTITION_FILE_TEMPLATE + TMP_SUFFIX;
+
+    /**
+     * Performs cleanup of work dir before initializing file page stores.
+     * Will finish batch renaming if defragmentation was completed or delete 
garbage if it wasn't.
+     *
+     * @param workDir Cache group working directory.
+     * @param log Logger to write messages.
+     * @throws IgniteCheckedException If {@link IOException} occurred.
+     */
+    public static void beforeInitPageStores(File workDir, IgniteLogger log) 
throws IgniteCheckedException {

Review comment:
       This method is used only once; I suggest inlining it.

##########
File path: 
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/defragmentation/DefragmentationFileUtils.java
##########
@@ -0,0 +1,400 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package 
org.apache.ignite.internal.processors.cache.persistence.defragmentation;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.internal.processors.cache.persistence.file.FileIO;
+import 
org.apache.ignite.internal.processors.cache.persistence.file.FileIOFactory;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.internal.util.typedef.internal.U;
+
+import static java.nio.file.StandardCopyOption.ATOMIC_MOVE;
+import static java.nio.file.StandardCopyOption.REPLACE_EXISTING;
+import static java.nio.file.StandardOpenOption.CREATE_NEW;
+import static java.nio.file.StandardOpenOption.WRITE;
+import static 
org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.FILE_SUFFIX;
+import static 
org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.INDEX_FILE_NAME;
+import static 
org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.INDEX_FILE_PREFIX;
+import static 
org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.PART_FILE_PREFIX;
+import static 
org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.PART_FILE_TEMPLATE;
+import static 
org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.TMP_SUFFIX;
+
+/**
+ * Everything related to file management during defragmentation process.
+ */
+public class DefragmentationFileUtils {
+    /** Prefix for link mapping files. */
+    private static final String DFRG_LINK_MAPPING_FILE_PREFIX = 
PART_FILE_PREFIX + "map-";
+
+    /** Link mapping file template. */
+    private static final String DFRG_LINK_MAPPING_FILE_TEMPLATE = 
DFRG_LINK_MAPPING_FILE_PREFIX + "%d" + FILE_SUFFIX;
+
+    /** Defragmentation complation marker file name. */
+    private static final String DFRG_COMPLETION_MARKER_FILE_NAME = 
"dfrg-completion-marker";
+
+    /** Name of defragmentated index partition file. */
+    private static final String DFRG_INDEX_FILE_NAME = INDEX_FILE_PREFIX + 
"-dfrg" + FILE_SUFFIX;
+
+    /** Name of defragmentated index partition temporary file. */
+    private static final String DFRG_INDEX_TMP_FILE_NAME = 
DFRG_INDEX_FILE_NAME + TMP_SUFFIX;
+
+    /** Prefix for defragmented partition files. */
+    private static final String DFRG_PARTITION_FILE_PREFIX = PART_FILE_PREFIX 
+ "dfrg-";
+
+    /** Defragmented partition file template. */
+    private static final String DFRG_PARTITION_FILE_TEMPLATE = 
DFRG_PARTITION_FILE_PREFIX + "%d" + FILE_SUFFIX;
+
+    /** Defragmented partition temp file template. */
+    private static final String DFRG_PARTITION_TMP_FILE_TEMPLATE = 
DFRG_PARTITION_FILE_TEMPLATE + TMP_SUFFIX;
+
+    /**
+     * Performs cleanup of work dir before initializing file page stores.
+     * Will finish batch renaming if defragmentation was completed or delete 
garbage if it wasn't.
+     *
+     * @param workDir Cache group working directory.
+     * @param log Logger to write messages.
+     * @throws IgniteCheckedException If {@link IOException} occurred.
+     */
+    public static void beforeInitPageStores(File workDir, IgniteLogger log) 
throws IgniteCheckedException {
+        batchRenameDefragmentedCacheGroupPartitions(workDir, log);
+
+        U.delete(defragmentationCompletionMarkerFile(workDir));
+
+        for (File file : workDir.listFiles()) {
+            String fileName = file.getName();
+
+            if (
+                fileName.startsWith(DFRG_PARTITION_FILE_PREFIX)
+                || fileName.startsWith(DFRG_INDEX_FILE_NAME)
+                || fileName.startsWith(DFRG_LINK_MAPPING_FILE_PREFIX)
+            )
+                U.delete(file);
+        }
+    }
+
+    /**
+     * Checks whether cache group defragmentation completed or not. Completes 
it if all that's left is renaming.
+     *
+     * @param workDir Cache group working directory.
+     * @param grpId Cache group Id of cache group belonging to the given 
working directory.
+     * @param log Logger to write messages.
+     * @return {@code true} if given cache group is already defragmented.
+     * @throws IgniteCheckedException If {@link IOException} occurred.
+     *
+     * @see DefragmentationFileUtils#defragmentationCompletionMarkerFile(File)
+     */
+    public static boolean skipAlreadyDefragmentedCacheGroup(File workDir, int 
grpId, IgniteLogger log) throws IgniteCheckedException {
+        File completionMarkerFile = 
defragmentationCompletionMarkerFile(workDir);
+
+        if (completionMarkerFile.exists()) {
+            if (log.isInfoEnabled()) {
+                log.info(S.toString(
+                    "Skipping already defragmented page group",
+                    "grpId", grpId, false,
+                    "markerFileName", completionMarkerFile.getName(), false,
+                    "workDir", workDir.getAbsolutePath(), false
+                ));
+            }
+
+            batchRenameDefragmentedCacheGroupPartitions(workDir, log);
+
+            return true;
+        }
+
+        return false;
+    }
+
+    /**
+     * Checks whether partition has already been defragmented or not. Cleans 
corrupted data if previous failed
+     * defragmentation attempt was found.
+     *
+     * @param workDir Cache group working directory.
+     * @param grpId Cache group Id of cache group belonging to the given 
working directory.
+     * @param partId Partionion index to check.
+     * @param log Logger to write messages.
+     * @return {@code true} if given partition is already defragmented.
+     * @throws IgniteCheckedException If {@link IOException} occurred.
+     *
+     * @see DefragmentationFileUtils#defragmentedPartTmpFile(File, int)
+     * @see DefragmentationFileUtils#defragmentedPartFile(File, int)
+     * @see DefragmentationFileUtils#defragmentedPartMappingFile(File, int)
+     */
+    public static boolean skipAlreadyDefragmentedPartition(File workDir, int 
grpId, int partId, IgniteLogger log) throws IgniteCheckedException {
+        File defragmentedPartFile = defragmentedPartFile(workDir, partId);
+        File defragmentedPartMappingFile = 
defragmentedPartMappingFile(workDir, partId);
+
+        if (defragmentedPartFile.exists() && 
defragmentedPartMappingFile.exists()) {
+            if (log.isInfoEnabled()) {
+                log.info(S.toString(
+                    "Skipping already defragmented partition",
+                    "grpId", grpId, false,
+                    "partId", partId, false,
+                    "partFileName", defragmentedPartFile.getName(), false,
+                    "mappingFileName", defragmentedPartMappingFile.getName(), 
false,
+                    "workDir", workDir.getAbsolutePath(), false
+                ));
+            }
+
+            return true;
+        }
+
+        File defragmentedPartTmpFile = defragmentedPartTmpFile(workDir, 
partId);
+
+        try {
+            Files.deleteIfExists(defragmentedPartTmpFile.toPath());
+
+            Files.deleteIfExists(defragmentedPartFile.toPath());
+
+            Files.deleteIfExists(defragmentedPartMappingFile.toPath());
+        }
+        catch (IOException e) {
+            handleIoException(e);
+        }
+
+        return false;
+    }
+
+    /**
+     * Failure-tolerant batch rename of defragmented partition files.
+     *
+     * Deletes all link mapping files old partition and index files, renaming 
defragmentated files in the process. Can
+     * be run on the same folder multiple times if failed for some reason.
+     *
+     * Does something only if completion marker is present in the folder. This 
marker won't be deleted in the end.
+     * Deletion of the marker must be done outside of defragmentation mode to 
prevent cache groups to be defragmentated
+     * several times in case of failures.
+     *
+     * @param workDir Cache group working directory.
+     * @param log Logger to write messages.
+     * @throws IgniteCheckedException If {@link IOException} occurred.
+     *
+     * @see 
DefragmentationFileUtils#writeDefragmentationCompletionMarker(FileIOFactory, 
File, IgniteLogger)
+     */
+    public static void batchRenameDefragmentedCacheGroupPartitions(File 
workDir, IgniteLogger log) throws IgniteCheckedException {
+        File completionMarkerFile = 
defragmentationCompletionMarkerFile(workDir);
+
+        if (!completionMarkerFile.exists())
+            return;
+
+        try {
+            for (File mappingFile : workDir.listFiles((dir, name) -> 
name.startsWith(DFRG_LINK_MAPPING_FILE_PREFIX)))
+                Files.delete(mappingFile.toPath());
+
+            for (File partFile : workDir.listFiles((dir, name) -> 
name.startsWith(DFRG_PARTITION_FILE_PREFIX))) {
+                int partId = extractPartId(partFile.getName());
+
+                File oldPartFile = new File(workDir, 
String.format(PART_FILE_TEMPLATE, partId));
+
+                Files.move(partFile.toPath(), oldPartFile.toPath(), 
ATOMIC_MOVE, REPLACE_EXISTING);
+            }
+
+            File idxFile = new File(workDir, DFRG_INDEX_FILE_NAME);
+
+            if (idxFile.exists()) {
+                File oldIdxFile = new File(workDir, INDEX_FILE_NAME);
+
+                Files.move(idxFile.toPath(), oldIdxFile.toPath(), ATOMIC_MOVE, 
REPLACE_EXISTING);
+            }
+        }
+        catch (IOException e) {
+            handleIoException(e);
+        }
+    }
+
+    /**
+     * Extracts partition number from file names like {@code part-dfrg-%d.bin}.
+     *
+     * @param dfrgPartFileName Defragmented partition file name.
+     * @return Partition index.
+     *
+     * @see DefragmentationFileUtils#defragmentedPartFile(File, int)
+     */
+    private static int extractPartId(String dfrgPartFileName) {

Review comment:
       This method is used only once; I suggest inlining it.

##########
File path: 
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/defragmentation/DefragmentationFileUtils.java
##########
@@ -0,0 +1,400 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package 
org.apache.ignite.internal.processors.cache.persistence.defragmentation;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.internal.processors.cache.persistence.file.FileIO;
+import 
org.apache.ignite.internal.processors.cache.persistence.file.FileIOFactory;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.internal.util.typedef.internal.U;
+
+import static java.nio.file.StandardCopyOption.ATOMIC_MOVE;
+import static java.nio.file.StandardCopyOption.REPLACE_EXISTING;
+import static java.nio.file.StandardOpenOption.CREATE_NEW;
+import static java.nio.file.StandardOpenOption.WRITE;
+import static 
org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.FILE_SUFFIX;
+import static 
org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.INDEX_FILE_NAME;
+import static 
org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.INDEX_FILE_PREFIX;
+import static 
org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.PART_FILE_PREFIX;
+import static 
org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.PART_FILE_TEMPLATE;
+import static 
org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.TMP_SUFFIX;
+
+/**
+ * Everything related to file management during defragmentation process.
+ */
+public class DefragmentationFileUtils {
+    /** Prefix for link mapping files. */
+    private static final String DFRG_LINK_MAPPING_FILE_PREFIX = 
PART_FILE_PREFIX + "map-";
+
+    /** Link mapping file template. */
+    private static final String DFRG_LINK_MAPPING_FILE_TEMPLATE = 
DFRG_LINK_MAPPING_FILE_PREFIX + "%d" + FILE_SUFFIX;
+
+    /** Defragmentation complation marker file name. */
+    private static final String DFRG_COMPLETION_MARKER_FILE_NAME = 
"dfrg-completion-marker";
+
+    /** Name of defragmentated index partition file. */
+    private static final String DFRG_INDEX_FILE_NAME = INDEX_FILE_PREFIX + 
"-dfrg" + FILE_SUFFIX;
+
+    /** Name of defragmentated index partition temporary file. */
+    private static final String DFRG_INDEX_TMP_FILE_NAME = 
DFRG_INDEX_FILE_NAME + TMP_SUFFIX;
+
+    /** Prefix for defragmented partition files. */
+    private static final String DFRG_PARTITION_FILE_PREFIX = PART_FILE_PREFIX 
+ "dfrg-";
+
+    /** Defragmented partition file template. */
+    private static final String DFRG_PARTITION_FILE_TEMPLATE = 
DFRG_PARTITION_FILE_PREFIX + "%d" + FILE_SUFFIX;
+
+    /** Defragmented partition temp file template. */
+    private static final String DFRG_PARTITION_TMP_FILE_TEMPLATE = 
DFRG_PARTITION_FILE_TEMPLATE + TMP_SUFFIX;
+
+    /**
+     * Performs cleanup of work dir before initializing file page stores.
+     * Will finish batch renaming if defragmentation was completed or delete 
garbage if it wasn't.
+     *
+     * @param workDir Cache group working directory.
+     * @param log Logger to write messages.
+     * @throws IgniteCheckedException If {@link IOException} occurred.
+     */
+    public static void beforeInitPageStores(File workDir, IgniteLogger log) 
throws IgniteCheckedException {
+        batchRenameDefragmentedCacheGroupPartitions(workDir, log);
+
+        U.delete(defragmentationCompletionMarkerFile(workDir));
+
+        for (File file : workDir.listFiles()) {
+            String fileName = file.getName();
+
+            if (
+                fileName.startsWith(DFRG_PARTITION_FILE_PREFIX)
+                || fileName.startsWith(DFRG_INDEX_FILE_NAME)
+                || fileName.startsWith(DFRG_LINK_MAPPING_FILE_PREFIX)
+            )
+                U.delete(file);
+        }
+    }
+
+    /**
+     * Checks whether cache group defragmentation completed or not. Completes 
it if all that's left is renaming.
+     *
+     * @param workDir Cache group working directory.
+     * @param grpId Cache group Id of cache group belonging to the given 
working directory.
+     * @param log Logger to write messages.
+     * @return {@code true} if given cache group is already defragmented.
+     * @throws IgniteCheckedException If {@link IOException} occurred.
+     *
+     * @see DefragmentationFileUtils#defragmentationCompletionMarkerFile(File)
+     */
+    public static boolean skipAlreadyDefragmentedCacheGroup(File workDir, int 
grpId, IgniteLogger log) throws IgniteCheckedException {
+        File completionMarkerFile = 
defragmentationCompletionMarkerFile(workDir);
+
+        if (completionMarkerFile.exists()) {
+            if (log.isInfoEnabled()) {
+                log.info(S.toString(
+                    "Skipping already defragmented page group",
+                    "grpId", grpId, false,
+                    "markerFileName", completionMarkerFile.getName(), false,
+                    "workDir", workDir.getAbsolutePath(), false
+                ));
+            }
+
+            batchRenameDefragmentedCacheGroupPartitions(workDir, log);
+
+            return true;
+        }
+
+        return false;
+    }
+
+    /**
+     * Checks whether partition has already been defragmented or not. Cleans 
corrupted data if previous failed
+     * defragmentation attempt was found.
+     *
+     * @param workDir Cache group working directory.
+     * @param grpId Cache group Id of cache group belonging to the given 
working directory.
+     * @param partId Partionion index to check.
+     * @param log Logger to write messages.
+     * @return {@code true} if given partition is already defragmented.
+     * @throws IgniteCheckedException If {@link IOException} occurred.
+     *
+     * @see DefragmentationFileUtils#defragmentedPartTmpFile(File, int)
+     * @see DefragmentationFileUtils#defragmentedPartFile(File, int)
+     * @see DefragmentationFileUtils#defragmentedPartMappingFile(File, int)
+     */
+    public static boolean skipAlreadyDefragmentedPartition(File workDir, int 
grpId, int partId, IgniteLogger log) throws IgniteCheckedException {
+        File defragmentedPartFile = defragmentedPartFile(workDir, partId);
+        File defragmentedPartMappingFile = 
defragmentedPartMappingFile(workDir, partId);
+
+        if (defragmentedPartFile.exists() && 
defragmentedPartMappingFile.exists()) {
+            if (log.isInfoEnabled()) {
+                log.info(S.toString(
+                    "Skipping already defragmented partition",
+                    "grpId", grpId, false,
+                    "partId", partId, false,
+                    "partFileName", defragmentedPartFile.getName(), false,
+                    "mappingFileName", defragmentedPartMappingFile.getName(), 
false,
+                    "workDir", workDir.getAbsolutePath(), false
+                ));
+            }
+
+            return true;
+        }
+
+        File defragmentedPartTmpFile = defragmentedPartTmpFile(workDir, 
partId);
+
+        try {
+            Files.deleteIfExists(defragmentedPartTmpFile.toPath());
+
+            Files.deleteIfExists(defragmentedPartFile.toPath());
+
+            Files.deleteIfExists(defragmentedPartMappingFile.toPath());
+        }
+        catch (IOException e) {
+            handleIoException(e);
+        }
+
+        return false;
+    }
+
+    /**
+     * Failure-tolerant batch rename of defragmented partition files.
+     *
+     * Deletes all link mapping files old partition and index files, renaming 
defragmentated files in the process. Can
+     * be run on the same folder multiple times if failed for some reason.
+     *
+     * Does something only if completion marker is present in the folder. This 
marker won't be deleted in the end.
+     * Deletion of the marker must be done outside of defragmentation mode to 
prevent cache groups to be defragmentated
+     * several times in case of failures.
+     *
+     * @param workDir Cache group working directory.
+     * @param log Logger to write messages.
+     * @throws IgniteCheckedException If {@link IOException} occurred.
+     *
+     * @see 
DefragmentationFileUtils#writeDefragmentationCompletionMarker(FileIOFactory, 
File, IgniteLogger)
+     */
+    public static void batchRenameDefragmentedCacheGroupPartitions(File 
workDir, IgniteLogger log) throws IgniteCheckedException {
+        File completionMarkerFile = 
defragmentationCompletionMarkerFile(workDir);
+
+        if (!completionMarkerFile.exists())
+            return;
+
+        try {
+            for (File mappingFile : workDir.listFiles((dir, name) -> 
name.startsWith(DFRG_LINK_MAPPING_FILE_PREFIX)))
+                Files.delete(mappingFile.toPath());
+
+            for (File partFile : workDir.listFiles((dir, name) -> 
name.startsWith(DFRG_PARTITION_FILE_PREFIX))) {
+                int partId = extractPartId(partFile.getName());
+
+                File oldPartFile = new File(workDir, 
String.format(PART_FILE_TEMPLATE, partId));
+
+                Files.move(partFile.toPath(), oldPartFile.toPath(), 
ATOMIC_MOVE, REPLACE_EXISTING);
+            }
+
+            File idxFile = new File(workDir, DFRG_INDEX_FILE_NAME);
+
+            if (idxFile.exists()) {
+                File oldIdxFile = new File(workDir, INDEX_FILE_NAME);
+
+                Files.move(idxFile.toPath(), oldIdxFile.toPath(), ATOMIC_MOVE, 
REPLACE_EXISTING);
+            }
+        }
+        catch (IOException e) {
+            handleIoException(e);
+        }
+    }
+
+    /**
+     * Extracts partition number from file names like {@code part-dfrg-%d.bin}.
+     *
+     * @param dfrgPartFileName Defragmented partition file name.
+     * @return Partition index.
+     *
+     * @see DefragmentationFileUtils#defragmentedPartFile(File, int)
+     */
+    private static int extractPartId(String dfrgPartFileName) {
+        assert dfrgPartFileName.startsWith(DFRG_PARTITION_FILE_PREFIX) : 
dfrgPartFileName;
+        assert dfrgPartFileName.endsWith(FILE_SUFFIX) : dfrgPartFileName;
+
+        String partIdStr = dfrgPartFileName.substring(
+            DFRG_PARTITION_FILE_PREFIX.length(),
+            dfrgPartFileName.length() - FILE_SUFFIX.length()
+        );
+
+        return Integer.parseInt(partIdStr);
+    }
+
+    /**
+     * Return file named {@code index-dfrg.bin.tmp} in given folder. It will 
be used for storing defragmented index
+     * partition during the process.
+     *
+     * @param workDir Cache group working directory.
+     * @return File.
+     *
+     * @see DefragmentationFileUtils#defragmentedIndexFile(File)
+     */
+    public static File defragmentedIndexTmpFile(File workDir) {
+        return new File(workDir, DFRG_INDEX_TMP_FILE_NAME);
+    }
+
+    /**
+     * Return file named {@code index-dfrg.bin} in given folder. It will be 
used for storing defragmented index
+     * partition when the process is over.
+     *
+     * @param workDir Cache group working directory.
+     * @return File.
+     *
+     * @see DefragmentationFileUtils#defragmentedIndexTmpFile(File)
+     */
+    public static File defragmentedIndexFile(File workDir) {
+        return new File(workDir, DFRG_INDEX_FILE_NAME);
+    }
+
+    /**
+     * Rename temporary index defragmenation file to a finalized one.
+     *
+     * @param workDir Cache group working directory.
+     * @throws IgniteCheckedException If {@link IOException} occurred.
+     *
+     * @see DefragmentationFileUtils#defragmentedIndexTmpFile(File)
+     * @see DefragmentationFileUtils#defragmentedIndexFile(File)
+     */
+    public static void renameTempIndexFile(File workDir) throws 
IgniteCheckedException {
+        File defragmentedIdxTmpFile = defragmentedIndexTmpFile(workDir);
+        File defragmentedIdxFile = defragmentedIndexFile(workDir);
+
+        try {
+            Files.move(defragmentedIdxTmpFile.toPath(), 
defragmentedIdxFile.toPath(), ATOMIC_MOVE);
+        }
+        catch (IOException e) {
+            handleIoException(e);
+        }
+    }
+
+    /**
+     * Return file named {@code part-dfrg-%d.bin.tmp} in given folder. It will 
be used for storing defragmented data
+     * partition during the process.
+     *
+     * @param workDir Cache group working directory.
+     * @param partId Parition index, will be substituted into file name.
+     * @return File.
+     *
+     * @see DefragmentationFileUtils#defragmentedPartFile(File, int)
+     */
+    public static File defragmentedPartTmpFile(File workDir, int partId) {
+        return new File(workDir, 
String.format(DFRG_PARTITION_TMP_FILE_TEMPLATE, partId));
+    }
+
+    /**
+     * Return file named {@code part-dfrg-%d.bin} in given folder. It will be 
used for storing defragmented data
+     * partition when the process is over.
+     *
+     * @param workDir Cache group working directory.
+     * @param partId Parition index, will be substituted into file name.
+     * @return File.
+     *
+     * @see DefragmentationFileUtils#defragmentedPartTmpFile(File, int)
+     */
+    public static File defragmentedPartFile(File workDir, int partId) {
+        return new File(workDir, String.format(DFRG_PARTITION_FILE_TEMPLATE, 
partId));
+    }
+
+    /**
+     * Rename temporary partition defragmenation file to a finalized one.

Review comment:
       Typo: "defragmenation" -> "defragmentation".

##########
File path: 
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/defragmentation/DefragmentationFileUtils.java
##########
@@ -0,0 +1,400 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package 
org.apache.ignite.internal.processors.cache.persistence.defragmentation;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.internal.processors.cache.persistence.file.FileIO;
+import 
org.apache.ignite.internal.processors.cache.persistence.file.FileIOFactory;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.internal.util.typedef.internal.U;
+
+import static java.nio.file.StandardCopyOption.ATOMIC_MOVE;
+import static java.nio.file.StandardCopyOption.REPLACE_EXISTING;
+import static java.nio.file.StandardOpenOption.CREATE_NEW;
+import static java.nio.file.StandardOpenOption.WRITE;
+import static 
org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.FILE_SUFFIX;
+import static 
org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.INDEX_FILE_NAME;
+import static 
org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.INDEX_FILE_PREFIX;
+import static 
org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.PART_FILE_PREFIX;
+import static 
org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.PART_FILE_TEMPLATE;
+import static 
org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.TMP_SUFFIX;
+
+/**
+ * Everything related to file management during defragmentation process.
+ */
+public class DefragmentationFileUtils {
+    /** Prefix for link mapping files. */
+    private static final String DFRG_LINK_MAPPING_FILE_PREFIX = 
PART_FILE_PREFIX + "map-";
+
+    /** Link mapping file template. */
+    private static final String DFRG_LINK_MAPPING_FILE_TEMPLATE = 
DFRG_LINK_MAPPING_FILE_PREFIX + "%d" + FILE_SUFFIX;
+
+    /** Defragmentation complation marker file name. */
+    private static final String DFRG_COMPLETION_MARKER_FILE_NAME = 
"dfrg-completion-marker";
+
+    /** Name of defragmentated index partition file. */
+    private static final String DFRG_INDEX_FILE_NAME = INDEX_FILE_PREFIX + 
"-dfrg" + FILE_SUFFIX;
+
+    /** Name of defragmentated index partition temporary file. */
+    private static final String DFRG_INDEX_TMP_FILE_NAME = 
DFRG_INDEX_FILE_NAME + TMP_SUFFIX;
+
+    /** Prefix for defragmented partition files. */
+    private static final String DFRG_PARTITION_FILE_PREFIX = PART_FILE_PREFIX 
+ "dfrg-";
+
+    /** Defragmented partition file template. */
+    private static final String DFRG_PARTITION_FILE_TEMPLATE = 
DFRG_PARTITION_FILE_PREFIX + "%d" + FILE_SUFFIX;
+
+    /** Defragmented partition temp file template. */
+    private static final String DFRG_PARTITION_TMP_FILE_TEMPLATE = 
DFRG_PARTITION_FILE_TEMPLATE + TMP_SUFFIX;
+
+    /**
+     * Performs cleanup of work dir before initializing file page stores.
+     * Will finish batch renaming if defragmentation was completed or delete 
garbage if it wasn't.
+     *
+     * @param workDir Cache group working directory.
+     * @param log Logger to write messages.
+     * @throws IgniteCheckedException If {@link IOException} occurred.
+     */
+    public static void beforeInitPageStores(File workDir, IgniteLogger log) 
throws IgniteCheckedException {
+        batchRenameDefragmentedCacheGroupPartitions(workDir, log);
+
+        U.delete(defragmentationCompletionMarkerFile(workDir));
+
+        for (File file : workDir.listFiles()) {
+            String fileName = file.getName();
+
+            if (
+                fileName.startsWith(DFRG_PARTITION_FILE_PREFIX)
+                || fileName.startsWith(DFRG_INDEX_FILE_NAME)
+                || fileName.startsWith(DFRG_LINK_MAPPING_FILE_PREFIX)
+            )
+                U.delete(file);
+        }
+    }
+
+    /**
+     * Checks whether cache group defragmentation completed or not. Completes 
it if all that's left is renaming.
+     *
+     * @param workDir Cache group working directory.
+     * @param grpId Cache group Id of cache group belonging to the given 
working directory.
+     * @param log Logger to write messages.
+     * @return {@code true} if given cache group is already defragmented.
+     * @throws IgniteCheckedException If {@link IOException} occurred.
+     *
+     * @see DefragmentationFileUtils#defragmentationCompletionMarkerFile(File)
+     */
+    public static boolean skipAlreadyDefragmentedCacheGroup(File workDir, int 
grpId, IgniteLogger log) throws IgniteCheckedException {
+        File completionMarkerFile = 
defragmentationCompletionMarkerFile(workDir);
+
+        if (completionMarkerFile.exists()) {
+            if (log.isInfoEnabled()) {
+                log.info(S.toString(
+                    "Skipping already defragmented page group",
+                    "grpId", grpId, false,
+                    "markerFileName", completionMarkerFile.getName(), false,
+                    "workDir", workDir.getAbsolutePath(), false
+                ));
+            }
+
+            batchRenameDefragmentedCacheGroupPartitions(workDir, log);
+
+            return true;
+        }
+
+        return false;
+    }
+
+    /**
+     * Checks whether partition has already been defragmented or not. Cleans 
corrupted data if previous failed
+     * defragmentation attempt was found.
+     *
+     * @param workDir Cache group working directory.
+     * @param grpId Cache group Id of cache group belonging to the given 
working directory.
+     * @param partId Partionion index to check.
+     * @param log Logger to write messages.
+     * @return {@code true} if given partition is already defragmented.
+     * @throws IgniteCheckedException If {@link IOException} occurred.
+     *
+     * @see DefragmentationFileUtils#defragmentedPartTmpFile(File, int)
+     * @see DefragmentationFileUtils#defragmentedPartFile(File, int)
+     * @see DefragmentationFileUtils#defragmentedPartMappingFile(File, int)
+     */
+    public static boolean skipAlreadyDefragmentedPartition(File workDir, int 
grpId, int partId, IgniteLogger log) throws IgniteCheckedException {
+        File defragmentedPartFile = defragmentedPartFile(workDir, partId);
+        File defragmentedPartMappingFile = 
defragmentedPartMappingFile(workDir, partId);
+
+        if (defragmentedPartFile.exists() && 
defragmentedPartMappingFile.exists()) {
+            if (log.isInfoEnabled()) {
+                log.info(S.toString(
+                    "Skipping already defragmented partition",
+                    "grpId", grpId, false,
+                    "partId", partId, false,
+                    "partFileName", defragmentedPartFile.getName(), false,
+                    "mappingFileName", defragmentedPartMappingFile.getName(), 
false,
+                    "workDir", workDir.getAbsolutePath(), false
+                ));
+            }
+
+            return true;
+        }
+
+        File defragmentedPartTmpFile = defragmentedPartTmpFile(workDir, 
partId);
+
+        try {
+            Files.deleteIfExists(defragmentedPartTmpFile.toPath());
+
+            Files.deleteIfExists(defragmentedPartFile.toPath());
+
+            Files.deleteIfExists(defragmentedPartMappingFile.toPath());
+        }
+        catch (IOException e) {
+            handleIoException(e);
+        }
+
+        return false;
+    }
+
+    /**
+     * Failure-tolerant batch rename of defragmented partition files.
+     *
+     * Deletes all link mapping files old partition and index files, renaming 
defragmentated files in the process. Can
+     * be run on the same folder multiple times if failed for some reason.
+     *
+     * Does something only if completion marker is present in the folder. This 
marker won't be deleted in the end.
+     * Deletion of the marker must be done outside of defragmentation mode to 
prevent cache groups to be defragmentated
+     * several times in case of failures.
+     *
+     * @param workDir Cache group working directory.
+     * @param log Logger to write messages.
+     * @throws IgniteCheckedException If {@link IOException} occurred.
+     *
+     * @see 
DefragmentationFileUtils#writeDefragmentationCompletionMarker(FileIOFactory, 
File, IgniteLogger)
+     */
+    public static void batchRenameDefragmentedCacheGroupPartitions(File 
workDir, IgniteLogger log) throws IgniteCheckedException {
+        File completionMarkerFile = 
defragmentationCompletionMarkerFile(workDir);
+
+        if (!completionMarkerFile.exists())
+            return;
+
+        try {
+            for (File mappingFile : workDir.listFiles((dir, name) -> 
name.startsWith(DFRG_LINK_MAPPING_FILE_PREFIX)))
+                Files.delete(mappingFile.toPath());
+
+            for (File partFile : workDir.listFiles((dir, name) -> 
name.startsWith(DFRG_PARTITION_FILE_PREFIX))) {
+                int partId = extractPartId(partFile.getName());
+
+                File oldPartFile = new File(workDir, 
String.format(PART_FILE_TEMPLATE, partId));
+
+                Files.move(partFile.toPath(), oldPartFile.toPath(), 
ATOMIC_MOVE, REPLACE_EXISTING);
+            }
+
+            File idxFile = new File(workDir, DFRG_INDEX_FILE_NAME);
+
+            if (idxFile.exists()) {
+                File oldIdxFile = new File(workDir, INDEX_FILE_NAME);
+
+                Files.move(idxFile.toPath(), oldIdxFile.toPath(), ATOMIC_MOVE, 
REPLACE_EXISTING);
+            }
+        }
+        catch (IOException e) {
+            handleIoException(e);
+        }
+    }
+
+    /**
+     * Extracts partition number from file names like {@code part-dfrg-%d.bin}.
+     *
+     * @param dfrgPartFileName Defragmented partition file name.
+     * @return Partition index.
+     *
+     * @see DefragmentationFileUtils#defragmentedPartFile(File, int)
+     */
+    private static int extractPartId(String dfrgPartFileName) {
+        assert dfrgPartFileName.startsWith(DFRG_PARTITION_FILE_PREFIX) : 
dfrgPartFileName;
+        assert dfrgPartFileName.endsWith(FILE_SUFFIX) : dfrgPartFileName;
+
+        String partIdStr = dfrgPartFileName.substring(
+            DFRG_PARTITION_FILE_PREFIX.length(),
+            dfrgPartFileName.length() - FILE_SUFFIX.length()
+        );
+
+        return Integer.parseInt(partIdStr);
+    }
+
+    /**
+     * Return file named {@code index-dfrg.bin.tmp} in given folder. It will 
be used for storing defragmented index
+     * partition during the process.
+     *
+     * @param workDir Cache group working directory.
+     * @return File.
+     *
+     * @see DefragmentationFileUtils#defragmentedIndexFile(File)
+     */
+    public static File defragmentedIndexTmpFile(File workDir) {
+        return new File(workDir, DFRG_INDEX_TMP_FILE_NAME);
+    }
+
+    /**
+     * Return file named {@code index-dfrg.bin} in given folder. It will be 
used for storing defragmented index
+     * partition when the process is over.
+     *
+     * @param workDir Cache group working directory.
+     * @return File.
+     *
+     * @see DefragmentationFileUtils#defragmentedIndexTmpFile(File)
+     */
+    public static File defragmentedIndexFile(File workDir) {
+        return new File(workDir, DFRG_INDEX_FILE_NAME);
+    }
+
+    /**
+     * Rename temporary index defragmenation file to a finalized one.
+     *
+     * @param workDir Cache group working directory.
+     * @throws IgniteCheckedException If {@link IOException} occurred.
+     *
+     * @see DefragmentationFileUtils#defragmentedIndexTmpFile(File)
+     * @see DefragmentationFileUtils#defragmentedIndexFile(File)
+     */
+    public static void renameTempIndexFile(File workDir) throws 
IgniteCheckedException {
+        File defragmentedIdxTmpFile = defragmentedIndexTmpFile(workDir);
+        File defragmentedIdxFile = defragmentedIndexFile(workDir);
+
+        try {
+            Files.move(defragmentedIdxTmpFile.toPath(), 
defragmentedIdxFile.toPath(), ATOMIC_MOVE);
+        }
+        catch (IOException e) {
+            handleIoException(e);
+        }
+    }
+
+    /**
+     * Return file named {@code part-dfrg-%d.bin.tmp} in given folder. It will 
be used for storing defragmented data
+     * partition during the process.
+     *
+     * @param workDir Cache group working directory.
+     * @param partId Parition index, will be substituted into file name.
+     * @return File.
+     *
+     * @see DefragmentationFileUtils#defragmentedPartFile(File, int)
+     */
+    public static File defragmentedPartTmpFile(File workDir, int partId) {
+        return new File(workDir, 
String.format(DFRG_PARTITION_TMP_FILE_TEMPLATE, partId));
+    }
+
+    /**
+     * Return file named {@code part-dfrg-%d.bin} in given folder. It will be 
used for storing defragmented data
+     * partition when the process is over.
+     *
+     * @param workDir Cache group working directory.
+     * @param partId Parition index, will be substituted into file name.
+     * @return File.
+     *
+     * @see DefragmentationFileUtils#defragmentedPartTmpFile(File, int)
+     */
+    public static File defragmentedPartFile(File workDir, int partId) {
+        return new File(workDir, String.format(DFRG_PARTITION_FILE_TEMPLATE, 
partId));
+    }
+
+    /**
+     * Rename temporary partition defragmenation file to a finalized one.
+     *
+     * @param workDir Cache group working directory.
+     * @param partId Parition index.
+     * @throws IgniteCheckedException If {@link IOException} occurred.
+     *
+     * @see DefragmentationFileUtils#defragmentedPartTmpFile(File, int)
+     * @see DefragmentationFileUtils#defragmentedPartFile(File, int)
+     */
+    public static void renameTempPartitionFile(File workDir, int partId) 
throws IgniteCheckedException {
+        File defragmentedPartTmpFile = defragmentedPartTmpFile(workDir, 
partId);
+        File defragmentedPartFile = defragmentedPartFile(workDir, partId);
+
+        assert !defragmentedPartFile.exists() : defragmentedPartFile;
+
+        try {
+            Files.move(defragmentedPartTmpFile.toPath(), 
defragmentedPartFile.toPath(), ATOMIC_MOVE);
+        }
+        catch (IOException e) {
+            handleIoException(e);
+        }
+    }
+
+    /**
+     * Return file named {@code part-map-%d.bin} in given folder. It will be 
used for storing defragmention links
+     * mapping for given partition during and after defragmentation process. 
No temporary counterpart is required here.
+     *
+     * @param workDir Cache group working directory.
+     * @param partId Parition index, will be substituted into file name.
+     * @return File.
+     *
+     * @see LinkMap
+     */
+    public static File defragmentedPartMappingFile(File workDir, int partId) {
+        return new File(workDir, 
String.format(DFRG_LINK_MAPPING_FILE_TEMPLATE, partId));
+    }
+
+    /**
+     * Return defragmentation completion marker file. This file can only be 
created when all partitions and index are
+     * defragmented and renamed from their original {@code *.tmp} versions. 
Presence of this file signals that no data
+     * will be lost if original partitions are deleted and batch rename 
process can be safely initiated.
+     *
+     * @param workDir Cache group working directory.
+     * @return File.
+     *
+     * @see 
DefragmentationFileUtils#writeDefragmentationCompletionMarker(FileIOFactory, 
File, IgniteLogger)
+     * @see 
DefragmentationFileUtils#batchRenameDefragmentedCacheGroupPartitions(File, 
IgniteLogger)
+     */
+    public static File defragmentationCompletionMarkerFile(File workDir) {
+        return new File(workDir, DFRG_COMPLETION_MARKER_FILE_NAME);
+    }
+
+    /**
+     * Creates empty completion marker file in given directory.
+     *
+     * @param ioFactory File IO factory.
+     * @param workDir Cache group working directory.
+     * @param log Logger to write messages.
+     * @throws IgniteCheckedException If {@link IOException} occurred.
+     *
+     * @see DefragmentationFileUtils#defragmentationCompletionMarkerFile(File)
+     */
+    public static void writeDefragmentationCompletionMarker(
+        FileIOFactory ioFactory,
+        File workDir,
+        IgniteLogger log
+    ) throws IgniteCheckedException {
+        File completionMarker = defragmentationCompletionMarkerFile(workDir);
+
+        try (FileIO io = ioFactory.create(completionMarker, CREATE_NEW, 
WRITE)) {
+            io.force(true);
+        }
+        catch (IOException e) {
+            handleIoException(e);
+        }
+    }
+
+    /** */
+    private static void handleIoException(IOException e) throws IgniteCheckedException {

Review comment:
       Why do we need this method (`handleIoException`) at all?

##########
File path: 
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/defragmentation/DefragmentationFileUtils.java
##########
@@ -0,0 +1,400 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package 
org.apache.ignite.internal.processors.cache.persistence.defragmentation;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.internal.processors.cache.persistence.file.FileIO;
+import 
org.apache.ignite.internal.processors.cache.persistence.file.FileIOFactory;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.internal.util.typedef.internal.U;
+
+import static java.nio.file.StandardCopyOption.ATOMIC_MOVE;
+import static java.nio.file.StandardCopyOption.REPLACE_EXISTING;
+import static java.nio.file.StandardOpenOption.CREATE_NEW;
+import static java.nio.file.StandardOpenOption.WRITE;
+import static 
org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.FILE_SUFFIX;
+import static 
org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.INDEX_FILE_NAME;
+import static 
org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.INDEX_FILE_PREFIX;
+import static 
org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.PART_FILE_PREFIX;
+import static 
org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.PART_FILE_TEMPLATE;
+import static 
org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.TMP_SUFFIX;
+
+/**
+ * Everything related to file management during defragmentation process.
+ */
+public class DefragmentationFileUtils {
+    /** Prefix for link mapping files. */
+    private static final String DFRG_LINK_MAPPING_FILE_PREFIX = 
PART_FILE_PREFIX + "map-";
+
+    /** Link mapping file template. */
+    private static final String DFRG_LINK_MAPPING_FILE_TEMPLATE = 
DFRG_LINK_MAPPING_FILE_PREFIX + "%d" + FILE_SUFFIX;
+
+    /** Defragmentation complation marker file name. */
+    private static final String DFRG_COMPLETION_MARKER_FILE_NAME = 
"dfrg-completion-marker";
+
+    /** Name of defragmentated index partition file. */
+    private static final String DFRG_INDEX_FILE_NAME = INDEX_FILE_PREFIX + 
"-dfrg" + FILE_SUFFIX;
+
+    /** Name of defragmentated index partition temporary file. */
+    private static final String DFRG_INDEX_TMP_FILE_NAME = 
DFRG_INDEX_FILE_NAME + TMP_SUFFIX;
+
+    /** Prefix for defragmented partition files. */
+    private static final String DFRG_PARTITION_FILE_PREFIX = PART_FILE_PREFIX 
+ "dfrg-";
+
+    /** Defragmented partition file template. */
+    private static final String DFRG_PARTITION_FILE_TEMPLATE = 
DFRG_PARTITION_FILE_PREFIX + "%d" + FILE_SUFFIX;
+
+    /** Defragmented partition temp file template. */
+    private static final String DFRG_PARTITION_TMP_FILE_TEMPLATE = 
DFRG_PARTITION_FILE_TEMPLATE + TMP_SUFFIX;
+
+    /**
+     * Performs cleanup of work dir before initializing file page stores.
+     * Will finish batch renaming if defragmentation was completed or delete 
garbage if it wasn't.
+     *
+     * @param workDir Cache group working directory.
+     * @param log Logger to write messages.
+     * @throws IgniteCheckedException If {@link IOException} occurred.
+     */
+    public static void beforeInitPageStores(File workDir, IgniteLogger log) 
throws IgniteCheckedException {
+        batchRenameDefragmentedCacheGroupPartitions(workDir, log);
+
+        U.delete(defragmentationCompletionMarkerFile(workDir));
+
+        for (File file : workDir.listFiles()) {
+            String fileName = file.getName();
+
+            if (
+                fileName.startsWith(DFRG_PARTITION_FILE_PREFIX)
+                || fileName.startsWith(DFRG_INDEX_FILE_NAME)
+                || fileName.startsWith(DFRG_LINK_MAPPING_FILE_PREFIX)
+            )
+                U.delete(file);
+        }
+    }
+
+    /**
+     * Checks whether cache group defragmentation completed or not. Completes 
it if all that's left is renaming.
+     *
+     * @param workDir Cache group working directory.
+     * @param grpId Cache group Id of cache group belonging to the given 
working directory.
+     * @param log Logger to write messages.
+     * @return {@code true} if given cache group is already defragmented.
+     * @throws IgniteCheckedException If {@link IOException} occurred.
+     *
+     * @see DefragmentationFileUtils#defragmentationCompletionMarkerFile(File)
+     */
+    public static boolean skipAlreadyDefragmentedCacheGroup(File workDir, int 
grpId, IgniteLogger log) throws IgniteCheckedException {
+        File completionMarkerFile = 
defragmentationCompletionMarkerFile(workDir);
+
+        if (completionMarkerFile.exists()) {
+            if (log.isInfoEnabled()) {
+                log.info(S.toString(
+                    "Skipping already defragmented page group",
+                    "grpId", grpId, false,
+                    "markerFileName", completionMarkerFile.getName(), false,
+                    "workDir", workDir.getAbsolutePath(), false
+                ));
+            }
+
+            batchRenameDefragmentedCacheGroupPartitions(workDir, log);
+
+            return true;
+        }
+
+        return false;
+    }
+
+    /**
+     * Checks whether partition has already been defragmented or not. Cleans 
corrupted data if previous failed
+     * defragmentation attempt was found.
+     *
+     * @param workDir Cache group working directory.
+     * @param grpId Cache group Id of cache group belonging to the given 
working directory.
+     * @param partId Partionion index to check.
+     * @param log Logger to write messages.
+     * @return {@code true} if given partition is already defragmented.
+     * @throws IgniteCheckedException If {@link IOException} occurred.
+     *
+     * @see DefragmentationFileUtils#defragmentedPartTmpFile(File, int)
+     * @see DefragmentationFileUtils#defragmentedPartFile(File, int)
+     * @see DefragmentationFileUtils#defragmentedPartMappingFile(File, int)
+     */
+    public static boolean skipAlreadyDefragmentedPartition(File workDir, int 
grpId, int partId, IgniteLogger log) throws IgniteCheckedException {
+        File defragmentedPartFile = defragmentedPartFile(workDir, partId);
+        File defragmentedPartMappingFile = 
defragmentedPartMappingFile(workDir, partId);
+
+        if (defragmentedPartFile.exists() && 
defragmentedPartMappingFile.exists()) {
+            if (log.isInfoEnabled()) {
+                log.info(S.toString(
+                    "Skipping already defragmented partition",
+                    "grpId", grpId, false,
+                    "partId", partId, false,
+                    "partFileName", defragmentedPartFile.getName(), false,
+                    "mappingFileName", defragmentedPartMappingFile.getName(), 
false,
+                    "workDir", workDir.getAbsolutePath(), false
+                ));
+            }
+
+            return true;
+        }
+
+        File defragmentedPartTmpFile = defragmentedPartTmpFile(workDir, 
partId);
+
+        try {
+            Files.deleteIfExists(defragmentedPartTmpFile.toPath());
+
+            Files.deleteIfExists(defragmentedPartFile.toPath());
+
+            Files.deleteIfExists(defragmentedPartMappingFile.toPath());
+        }
+        catch (IOException e) {
+            handleIoException(e);
+        }
+
+        return false;
+    }
+
+    /**
+     * Failure-tolerant batch rename of defragmented partition files.
+     *
+     * Deletes all link mapping files old partition and index files, renaming 
defragmentated files in the process. Can
+     * be run on the same folder multiple times if failed for some reason.
+     *
+     * Does something only if completion marker is present in the folder. This 
marker won't be deleted in the end.
+     * Deletion of the marker must be done outside of defragmentation mode to 
prevent cache groups to be defragmentated
+     * several times in case of failures.
+     *
+     * @param workDir Cache group working directory.
+     * @param log Logger to write messages.
+     * @throws IgniteCheckedException If {@link IOException} occurred.
+     *
+     * @see 
DefragmentationFileUtils#writeDefragmentationCompletionMarker(FileIOFactory, 
File, IgniteLogger)
+     */
+    public static void batchRenameDefragmentedCacheGroupPartitions(File 
workDir, IgniteLogger log) throws IgniteCheckedException {
+        File completionMarkerFile = 
defragmentationCompletionMarkerFile(workDir);
+
+        if (!completionMarkerFile.exists())
+            return;
+
+        try {
+            for (File mappingFile : workDir.listFiles((dir, name) -> 
name.startsWith(DFRG_LINK_MAPPING_FILE_PREFIX)))
+                Files.delete(mappingFile.toPath());
+
+            for (File partFile : workDir.listFiles((dir, name) -> 
name.startsWith(DFRG_PARTITION_FILE_PREFIX))) {
+                int partId = extractPartId(partFile.getName());
+
+                File oldPartFile = new File(workDir, 
String.format(PART_FILE_TEMPLATE, partId));
+
+                Files.move(partFile.toPath(), oldPartFile.toPath(), 
ATOMIC_MOVE, REPLACE_EXISTING);
+            }
+
+            File idxFile = new File(workDir, DFRG_INDEX_FILE_NAME);
+
+            if (idxFile.exists()) {
+                File oldIdxFile = new File(workDir, INDEX_FILE_NAME);
+
+                Files.move(idxFile.toPath(), oldIdxFile.toPath(), ATOMIC_MOVE, 
REPLACE_EXISTING);
+            }
+        }
+        catch (IOException e) {
+            handleIoException(e);
+        }
+    }
+
+    /**
+     * Extracts partition number from file names like {@code part-dfrg-%d.bin}.
+     *
+     * @param dfrgPartFileName Defragmented partition file name.
+     * @return Partition index.
+     *
+     * @see DefragmentationFileUtils#defragmentedPartFile(File, int)
+     */
+    private static int extractPartId(String dfrgPartFileName) {
+        assert dfrgPartFileName.startsWith(DFRG_PARTITION_FILE_PREFIX) : 
dfrgPartFileName;
+        assert dfrgPartFileName.endsWith(FILE_SUFFIX) : dfrgPartFileName;
+
+        String partIdStr = dfrgPartFileName.substring(
+            DFRG_PARTITION_FILE_PREFIX.length(),
+            dfrgPartFileName.length() - FILE_SUFFIX.length()
+        );
+
+        return Integer.parseInt(partIdStr);
+    }
+
+    /**
+     * Return file named {@code index-dfrg.bin.tmp} in given folder. It will 
be used for storing defragmented index
+     * partition during the process.
+     *
+     * @param workDir Cache group working directory.
+     * @return File.
+     *
+     * @see DefragmentationFileUtils#defragmentedIndexFile(File)
+     */
+    public static File defragmentedIndexTmpFile(File workDir) {
+        return new File(workDir, DFRG_INDEX_TMP_FILE_NAME);
+    }
+
+    /**
+     * Return file named {@code index-dfrg.bin} in given folder. It will be 
used for storing defragmented index
+     * partition when the process is over.
+     *
+     * @param workDir Cache group working directory.
+     * @return File.
+     *
+     * @see DefragmentationFileUtils#defragmentedIndexTmpFile(File)
+     */
+    public static File defragmentedIndexFile(File workDir) {
+        return new File(workDir, DFRG_INDEX_FILE_NAME);
+    }
+
+    /**
+     * Rename temporary index defragmenation file to a finalized one.
+     *
+     * @param workDir Cache group working directory.
+     * @throws IgniteCheckedException If {@link IOException} occurred.
+     *
+     * @see DefragmentationFileUtils#defragmentedIndexTmpFile(File)
+     * @see DefragmentationFileUtils#defragmentedIndexFile(File)
+     */
+    public static void renameTempIndexFile(File workDir) throws 
IgniteCheckedException {
+        File defragmentedIdxTmpFile = defragmentedIndexTmpFile(workDir);
+        File defragmentedIdxFile = defragmentedIndexFile(workDir);
+
+        try {
+            Files.move(defragmentedIdxTmpFile.toPath(), 
defragmentedIdxFile.toPath(), ATOMIC_MOVE);
+        }
+        catch (IOException e) {
+            handleIoException(e);
+        }
+    }
+
+    /**
+     * Return file named {@code part-dfrg-%d.bin.tmp} in given folder. It will 
be used for storing defragmented data
+     * partition during the process.
+     *
+     * @param workDir Cache group working directory.
+     * @param partId Parition index, will be substituted into file name.

Review comment:
       Typo: `Parition` -> `Partition`

##########
File path: 
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/defragmentation/DefragmentationFileUtils.java
##########
@@ -0,0 +1,400 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package 
org.apache.ignite.internal.processors.cache.persistence.defragmentation;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.internal.processors.cache.persistence.file.FileIO;
+import 
org.apache.ignite.internal.processors.cache.persistence.file.FileIOFactory;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.internal.util.typedef.internal.U;
+
+import static java.nio.file.StandardCopyOption.ATOMIC_MOVE;
+import static java.nio.file.StandardCopyOption.REPLACE_EXISTING;
+import static java.nio.file.StandardOpenOption.CREATE_NEW;
+import static java.nio.file.StandardOpenOption.WRITE;
+import static 
org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.FILE_SUFFIX;
+import static 
org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.INDEX_FILE_NAME;
+import static 
org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.INDEX_FILE_PREFIX;
+import static 
org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.PART_FILE_PREFIX;
+import static 
org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.PART_FILE_TEMPLATE;
+import static 
org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.TMP_SUFFIX;
+
+/**
+ * Everything related to file management during defragmentation process.
+ */
+public class DefragmentationFileUtils {
+    /** Prefix for link mapping files. */
+    private static final String DFRG_LINK_MAPPING_FILE_PREFIX = 
PART_FILE_PREFIX + "map-";
+
+    /** Link mapping file template. */
+    private static final String DFRG_LINK_MAPPING_FILE_TEMPLATE = 
DFRG_LINK_MAPPING_FILE_PREFIX + "%d" + FILE_SUFFIX;
+
+    /** Defragmentation complation marker file name. */
+    private static final String DFRG_COMPLETION_MARKER_FILE_NAME = 
"dfrg-completion-marker";
+
+    /** Name of defragmentated index partition file. */
+    private static final String DFRG_INDEX_FILE_NAME = INDEX_FILE_PREFIX + 
"-dfrg" + FILE_SUFFIX;
+
+    /** Name of defragmentated index partition temporary file. */
+    private static final String DFRG_INDEX_TMP_FILE_NAME = 
DFRG_INDEX_FILE_NAME + TMP_SUFFIX;
+
+    /** Prefix for defragmented partition files. */
+    private static final String DFRG_PARTITION_FILE_PREFIX = PART_FILE_PREFIX 
+ "dfrg-";
+
+    /** Defragmented partition file template. */
+    private static final String DFRG_PARTITION_FILE_TEMPLATE = 
DFRG_PARTITION_FILE_PREFIX + "%d" + FILE_SUFFIX;
+
+    /** Defragmented partition temp file template. */
+    private static final String DFRG_PARTITION_TMP_FILE_TEMPLATE = 
DFRG_PARTITION_FILE_TEMPLATE + TMP_SUFFIX;
+
+    /**
+     * Performs cleanup of work dir before initializing file page stores.
+     * Will finish batch renaming if defragmentation was completed or delete 
garbage if it wasn't.
+     *
+     * @param workDir Cache group working directory.
+     * @param log Logger to write messages.
+     * @throws IgniteCheckedException If {@link IOException} occurred.
+     */
+    public static void beforeInitPageStores(File workDir, IgniteLogger log) 
throws IgniteCheckedException {
+        batchRenameDefragmentedCacheGroupPartitions(workDir, log);
+
+        U.delete(defragmentationCompletionMarkerFile(workDir));
+
+        for (File file : workDir.listFiles()) {
+            String fileName = file.getName();
+
+            if (
+                fileName.startsWith(DFRG_PARTITION_FILE_PREFIX)
+                || fileName.startsWith(DFRG_INDEX_FILE_NAME)
+                || fileName.startsWith(DFRG_LINK_MAPPING_FILE_PREFIX)
+            )
+                U.delete(file);
+        }
+    }
+
+    /**
+     * Checks whether cache group defragmentation completed or not. Completes 
it if all that's left is renaming.
+     *
+     * @param workDir Cache group working directory.
+     * @param grpId Cache group Id of cache group belonging to the given 
working directory.
+     * @param log Logger to write messages.
+     * @return {@code true} if given cache group is already defragmented.
+     * @throws IgniteCheckedException If {@link IOException} occurred.
+     *
+     * @see DefragmentationFileUtils#defragmentationCompletionMarkerFile(File)
+     */
+    public static boolean skipAlreadyDefragmentedCacheGroup(File workDir, int 
grpId, IgniteLogger log) throws IgniteCheckedException {
+        File completionMarkerFile = 
defragmentationCompletionMarkerFile(workDir);
+
+        if (completionMarkerFile.exists()) {
+            if (log.isInfoEnabled()) {
+                log.info(S.toString(
+                    "Skipping already defragmented page group",
+                    "grpId", grpId, false,
+                    "markerFileName", completionMarkerFile.getName(), false,
+                    "workDir", workDir.getAbsolutePath(), false
+                ));
+            }
+
+            batchRenameDefragmentedCacheGroupPartitions(workDir, log);
+
+            return true;
+        }
+
+        return false;
+    }
+
+    /**
+     * Checks whether partition has already been defragmented or not. Cleans 
corrupted data if previous failed
+     * defragmentation attempt was found.
+     *
+     * @param workDir Cache group working directory.
+     * @param grpId Cache group Id of cache group belonging to the given 
working directory.
+     * @param partId Partionion index to check.
+     * @param log Logger to write messages.
+     * @return {@code true} if given partition is already defragmented.
+     * @throws IgniteCheckedException If {@link IOException} occurred.
+     *
+     * @see DefragmentationFileUtils#defragmentedPartTmpFile(File, int)
+     * @see DefragmentationFileUtils#defragmentedPartFile(File, int)
+     * @see DefragmentationFileUtils#defragmentedPartMappingFile(File, int)
+     */
+    public static boolean skipAlreadyDefragmentedPartition(File workDir, int 
grpId, int partId, IgniteLogger log) throws IgniteCheckedException {
+        File defragmentedPartFile = defragmentedPartFile(workDir, partId);
+        File defragmentedPartMappingFile = 
defragmentedPartMappingFile(workDir, partId);
+
+        if (defragmentedPartFile.exists() && 
defragmentedPartMappingFile.exists()) {
+            if (log.isInfoEnabled()) {
+                log.info(S.toString(
+                    "Skipping already defragmented partition",
+                    "grpId", grpId, false,
+                    "partId", partId, false,
+                    "partFileName", defragmentedPartFile.getName(), false,
+                    "mappingFileName", defragmentedPartMappingFile.getName(), 
false,
+                    "workDir", workDir.getAbsolutePath(), false
+                ));
+            }
+
+            return true;
+        }
+
+        File defragmentedPartTmpFile = defragmentedPartTmpFile(workDir, 
partId);
+
+        try {
+            Files.deleteIfExists(defragmentedPartTmpFile.toPath());
+
+            Files.deleteIfExists(defragmentedPartFile.toPath());
+
+            Files.deleteIfExists(defragmentedPartMappingFile.toPath());
+        }
+        catch (IOException e) {
+            handleIoException(e);
+        }
+
+        return false;
+    }
+
+    /**
+     * Failure-tolerant batch rename of defragmented partition files.
+     *
+     * Deletes all link mapping files old partition and index files, renaming 
defragmentated files in the process. Can
+     * be run on the same folder multiple times if failed for some reason.
+     *
+     * Does something only if completion marker is present in the folder. This 
marker won't be deleted in the end.
+     * Deletion of the marker must be done outside of defragmentation mode to 
prevent cache groups to be defragmentated
+     * several times in case of failures.
+     *
+     * @param workDir Cache group working directory.
+     * @param log Logger to write messages.
+     * @throws IgniteCheckedException If {@link IOException} occurred.
+     *
+     * @see 
DefragmentationFileUtils#writeDefragmentationCompletionMarker(FileIOFactory, 
File, IgniteLogger)
+     */
+    public static void batchRenameDefragmentedCacheGroupPartitions(File 
workDir, IgniteLogger log) throws IgniteCheckedException {
+        File completionMarkerFile = 
defragmentationCompletionMarkerFile(workDir);
+
+        if (!completionMarkerFile.exists())
+            return;
+
+        try {
+            for (File mappingFile : workDir.listFiles((dir, name) -> 
name.startsWith(DFRG_LINK_MAPPING_FILE_PREFIX)))
+                Files.delete(mappingFile.toPath());
+
+            for (File partFile : workDir.listFiles((dir, name) -> 
name.startsWith(DFRG_PARTITION_FILE_PREFIX))) {
+                int partId = extractPartId(partFile.getName());
+
+                File oldPartFile = new File(workDir, 
String.format(PART_FILE_TEMPLATE, partId));
+
+                Files.move(partFile.toPath(), oldPartFile.toPath(), 
ATOMIC_MOVE, REPLACE_EXISTING);
+            }
+
+            File idxFile = new File(workDir, DFRG_INDEX_FILE_NAME);
+
+            if (idxFile.exists()) {
+                File oldIdxFile = new File(workDir, INDEX_FILE_NAME);
+
+                Files.move(idxFile.toPath(), oldIdxFile.toPath(), ATOMIC_MOVE, 
REPLACE_EXISTING);
+            }
+        }
+        catch (IOException e) {
+            handleIoException(e);
+        }
+    }
+
+    /**
+     * Extracts partition number from file names like {@code part-dfrg-%d.bin}.
+     *
+     * @param dfrgPartFileName Defragmented partition file name.
+     * @return Partition index.
+     *
+     * @see DefragmentationFileUtils#defragmentedPartFile(File, int)
+     */
+    private static int extractPartId(String dfrgPartFileName) {
+        assert dfrgPartFileName.startsWith(DFRG_PARTITION_FILE_PREFIX) : 
dfrgPartFileName;
+        assert dfrgPartFileName.endsWith(FILE_SUFFIX) : dfrgPartFileName;
+
+        String partIdStr = dfrgPartFileName.substring(
+            DFRG_PARTITION_FILE_PREFIX.length(),
+            dfrgPartFileName.length() - FILE_SUFFIX.length()
+        );
+
+        return Integer.parseInt(partIdStr);
+    }
+
+    /**
+     * Return file named {@code index-dfrg.bin.tmp} in given folder. It will 
be used for storing defragmented index
+     * partition during the process.
+     *
+     * @param workDir Cache group working directory.
+     * @return File.
+     *
+     * @see DefragmentationFileUtils#defragmentedIndexFile(File)
+     */
+    public static File defragmentedIndexTmpFile(File workDir) {
+        return new File(workDir, DFRG_INDEX_TMP_FILE_NAME);
+    }
+
+    /**
+     * Return file named {@code index-dfrg.bin} in given folder. It will be 
used for storing defragmented index
+     * partition when the process is over.
+     *
+     * @param workDir Cache group working directory.
+     * @return File.
+     *
+     * @see DefragmentationFileUtils#defragmentedIndexTmpFile(File)
+     */
+    public static File defragmentedIndexFile(File workDir) {
+        return new File(workDir, DFRG_INDEX_FILE_NAME);
+    }
+
+    /**
+     * Rename temporary index defragmenation file to a finalized one.

Review comment:
       Typo in the method Javadoc summary: "defragmenation" -> "defragmentation".

##########
File path: 
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/defragmentation/DefragmentationFileUtils.java
##########
@@ -0,0 +1,400 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package 
org.apache.ignite.internal.processors.cache.persistence.defragmentation;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.internal.processors.cache.persistence.file.FileIO;
+import 
org.apache.ignite.internal.processors.cache.persistence.file.FileIOFactory;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.internal.util.typedef.internal.U;
+
+import static java.nio.file.StandardCopyOption.ATOMIC_MOVE;
+import static java.nio.file.StandardCopyOption.REPLACE_EXISTING;
+import static java.nio.file.StandardOpenOption.CREATE_NEW;
+import static java.nio.file.StandardOpenOption.WRITE;
+import static 
org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.FILE_SUFFIX;
+import static 
org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.INDEX_FILE_NAME;
+import static 
org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.INDEX_FILE_PREFIX;
+import static 
org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.PART_FILE_PREFIX;
+import static 
org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.PART_FILE_TEMPLATE;
+import static 
org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.TMP_SUFFIX;
+
+/**
+ * Everything related to file management during defragmentation process.
+ */
+public class DefragmentationFileUtils {
+    /** Prefix for link mapping files. */
+    private static final String DFRG_LINK_MAPPING_FILE_PREFIX = 
PART_FILE_PREFIX + "map-";
+
+    /** Link mapping file template. */
+    private static final String DFRG_LINK_MAPPING_FILE_TEMPLATE = 
DFRG_LINK_MAPPING_FILE_PREFIX + "%d" + FILE_SUFFIX;
+
+    /** Defragmentation complation marker file name. */
+    private static final String DFRG_COMPLETION_MARKER_FILE_NAME = 
"dfrg-completion-marker";
+
+    /** Name of defragmentated index partition file. */
+    private static final String DFRG_INDEX_FILE_NAME = INDEX_FILE_PREFIX + 
"-dfrg" + FILE_SUFFIX;
+
+    /** Name of defragmentated index partition temporary file. */
+    private static final String DFRG_INDEX_TMP_FILE_NAME = 
DFRG_INDEX_FILE_NAME + TMP_SUFFIX;
+
+    /** Prefix for defragmented partition files. */
+    private static final String DFRG_PARTITION_FILE_PREFIX = PART_FILE_PREFIX 
+ "dfrg-";
+
+    /** Defragmented partition file template. */
+    private static final String DFRG_PARTITION_FILE_TEMPLATE = 
DFRG_PARTITION_FILE_PREFIX + "%d" + FILE_SUFFIX;
+
+    /** Defragmented partition temp file template. */
+    private static final String DFRG_PARTITION_TMP_FILE_TEMPLATE = 
DFRG_PARTITION_FILE_TEMPLATE + TMP_SUFFIX;
+
+    /**
+     * Performs cleanup of work dir before initializing file page stores.
+     * Will finish batch renaming if defragmentation was completed or delete 
garbage if it wasn't.
+     *
+     * @param workDir Cache group working directory.
+     * @param log Logger to write messages.
+     * @throws IgniteCheckedException If {@link IOException} occurred.
+     */
+    public static void beforeInitPageStores(File workDir, IgniteLogger log) 
throws IgniteCheckedException {
+        batchRenameDefragmentedCacheGroupPartitions(workDir, log);
+
+        U.delete(defragmentationCompletionMarkerFile(workDir));
+
+        for (File file : workDir.listFiles()) {
+            String fileName = file.getName();
+
+            if (
+                fileName.startsWith(DFRG_PARTITION_FILE_PREFIX)
+                || fileName.startsWith(DFRG_INDEX_FILE_NAME)
+                || fileName.startsWith(DFRG_LINK_MAPPING_FILE_PREFIX)
+            )
+                U.delete(file);
+        }
+    }
+
+    /**
+     * Checks whether cache group defragmentation completed or not. Completes 
it if all that's left is renaming.
+     *
+     * @param workDir Cache group working directory.
+     * @param grpId Cache group Id of cache group belonging to the given 
working directory.
+     * @param log Logger to write messages.
+     * @return {@code true} if given cache group is already defragmented.
+     * @throws IgniteCheckedException If {@link IOException} occurred.
+     *
+     * @see DefragmentationFileUtils#defragmentationCompletionMarkerFile(File)
+     */
+    public static boolean skipAlreadyDefragmentedCacheGroup(File workDir, int 
grpId, IgniteLogger log) throws IgniteCheckedException {
+        File completionMarkerFile = 
defragmentationCompletionMarkerFile(workDir);
+
+        if (completionMarkerFile.exists()) {
+            if (log.isInfoEnabled()) {
+                log.info(S.toString(
+                    "Skipping already defragmented page group",
+                    "grpId", grpId, false,
+                    "markerFileName", completionMarkerFile.getName(), false,
+                    "workDir", workDir.getAbsolutePath(), false
+                ));
+            }
+
+            batchRenameDefragmentedCacheGroupPartitions(workDir, log);
+
+            return true;
+        }
+
+        return false;
+    }
+
+    /**
+     * Checks whether partition has already been defragmented or not. Cleans 
corrupted data if previous failed
+     * defragmentation attempt was found.
+     *
+     * @param workDir Cache group working directory.
+     * @param grpId Cache group Id of cache group belonging to the given 
working directory.
+     * @param partId Partionion index to check.
+     * @param log Logger to write messages.
+     * @return {@code true} if given partition is already defragmented.
+     * @throws IgniteCheckedException If {@link IOException} occurred.
+     *
+     * @see DefragmentationFileUtils#defragmentedPartTmpFile(File, int)
+     * @see DefragmentationFileUtils#defragmentedPartFile(File, int)
+     * @see DefragmentationFileUtils#defragmentedPartMappingFile(File, int)
+     */
+    public static boolean skipAlreadyDefragmentedPartition(File workDir, int 
grpId, int partId, IgniteLogger log) throws IgniteCheckedException {
+        File defragmentedPartFile = defragmentedPartFile(workDir, partId);
+        File defragmentedPartMappingFile = 
defragmentedPartMappingFile(workDir, partId);
+
+        if (defragmentedPartFile.exists() && 
defragmentedPartMappingFile.exists()) {
+            if (log.isInfoEnabled()) {
+                log.info(S.toString(
+                    "Skipping already defragmented partition",
+                    "grpId", grpId, false,
+                    "partId", partId, false,
+                    "partFileName", defragmentedPartFile.getName(), false,
+                    "mappingFileName", defragmentedPartMappingFile.getName(), 
false,
+                    "workDir", workDir.getAbsolutePath(), false
+                ));
+            }
+
+            return true;
+        }
+
+        File defragmentedPartTmpFile = defragmentedPartTmpFile(workDir, 
partId);
+
+        try {
+            Files.deleteIfExists(defragmentedPartTmpFile.toPath());
+
+            Files.deleteIfExists(defragmentedPartFile.toPath());
+
+            Files.deleteIfExists(defragmentedPartMappingFile.toPath());
+        }
+        catch (IOException e) {
+            handleIoException(e);
+        }
+
+        return false;
+    }
+
+    /**
+     * Failure-tolerant batch rename of defragmented partition files.
+     *
+     * Deletes all link mapping files old partition and index files, renaming 
defragmentated files in the process. Can
+     * be run on the same folder multiple times if failed for some reason.
+     *
+     * Does something only if completion marker is present in the folder. This 
marker won't be deleted in the end.
+     * Deletion of the marker must be done outside of defragmentation mode to 
prevent cache groups to be defragmentated
+     * several times in case of failures.
+     *
+     * @param workDir Cache group working directory.
+     * @param log Logger to write messages.
+     * @throws IgniteCheckedException If {@link IOException} occurred.
+     *
+     * @see 
DefragmentationFileUtils#writeDefragmentationCompletionMarker(FileIOFactory, 
File, IgniteLogger)
+     */
+    public static void batchRenameDefragmentedCacheGroupPartitions(File 
workDir, IgniteLogger log) throws IgniteCheckedException {
+        File completionMarkerFile = 
defragmentationCompletionMarkerFile(workDir);
+
+        if (!completionMarkerFile.exists())
+            return;
+
+        try {
+            for (File mappingFile : workDir.listFiles((dir, name) -> 
name.startsWith(DFRG_LINK_MAPPING_FILE_PREFIX)))
+                Files.delete(mappingFile.toPath());
+
+            for (File partFile : workDir.listFiles((dir, name) -> 
name.startsWith(DFRG_PARTITION_FILE_PREFIX))) {
+                int partId = extractPartId(partFile.getName());
+
+                File oldPartFile = new File(workDir, 
String.format(PART_FILE_TEMPLATE, partId));
+
+                Files.move(partFile.toPath(), oldPartFile.toPath(), 
ATOMIC_MOVE, REPLACE_EXISTING);
+            }
+
+            File idxFile = new File(workDir, DFRG_INDEX_FILE_NAME);
+
+            if (idxFile.exists()) {
+                File oldIdxFile = new File(workDir, INDEX_FILE_NAME);
+
+                Files.move(idxFile.toPath(), oldIdxFile.toPath(), ATOMIC_MOVE, 
REPLACE_EXISTING);
+            }
+        }
+        catch (IOException e) {
+            handleIoException(e);
+        }
+    }
+
+    /**
+     * Extracts partition number from file names like {@code part-dfrg-%d.bin}.
+     *
+     * @param dfrgPartFileName Defragmented partition file name.
+     * @return Partition index.
+     *
+     * @see DefragmentationFileUtils#defragmentedPartFile(File, int)
+     */
+    private static int extractPartId(String dfrgPartFileName) {
+        assert dfrgPartFileName.startsWith(DFRG_PARTITION_FILE_PREFIX) : 
dfrgPartFileName;
+        assert dfrgPartFileName.endsWith(FILE_SUFFIX) : dfrgPartFileName;
+
+        String partIdStr = dfrgPartFileName.substring(
+            DFRG_PARTITION_FILE_PREFIX.length(),
+            dfrgPartFileName.length() - FILE_SUFFIX.length()
+        );
+
+        return Integer.parseInt(partIdStr);
+    }
+
+    /**
+     * Return file named {@code index-dfrg.bin.tmp} in given folder. It will 
be used for storing defragmented index
+     * partition during the process.
+     *
+     * @param workDir Cache group working directory.
+     * @return File.
+     *
+     * @see DefragmentationFileUtils#defragmentedIndexFile(File)
+     */
+    public static File defragmentedIndexTmpFile(File workDir) {
+        return new File(workDir, DFRG_INDEX_TMP_FILE_NAME);
+    }
+
+    /**
+     * Return file named {@code index-dfrg.bin} in given folder. It will be 
used for storing defragmented index
+     * partition when the process is over.
+     *
+     * @param workDir Cache group working directory.
+     * @return File.
+     *
+     * @see DefragmentationFileUtils#defragmentedIndexTmpFile(File)
+     */
+    public static File defragmentedIndexFile(File workDir) {
+        return new File(workDir, DFRG_INDEX_FILE_NAME);
+    }
+
+    /**
+     * Rename temporary index defragmenation file to a finalized one.
+     *
+     * @param workDir Cache group working directory.
+     * @throws IgniteCheckedException If {@link IOException} occurred.
+     *
+     * @see DefragmentationFileUtils#defragmentedIndexTmpFile(File)
+     * @see DefragmentationFileUtils#defragmentedIndexFile(File)
+     */
+    public static void renameTempIndexFile(File workDir) throws 
IgniteCheckedException {
+        File defragmentedIdxTmpFile = defragmentedIndexTmpFile(workDir);
+        File defragmentedIdxFile = defragmentedIndexFile(workDir);
+
+        try {
+            Files.move(defragmentedIdxTmpFile.toPath(), 
defragmentedIdxFile.toPath(), ATOMIC_MOVE);
+        }
+        catch (IOException e) {
+            handleIoException(e);
+        }
+    }
+
+    /**
+     * Return file named {@code part-dfrg-%d.bin.tmp} in given folder. It will 
be used for storing defragmented data
+     * partition during the process.
+     *
+     * @param workDir Cache group working directory.
+     * @param partId Parition index, will be substituted into file name.
+     * @return File.
+     *
+     * @see DefragmentationFileUtils#defragmentedPartFile(File, int)
+     */
+    public static File defragmentedPartTmpFile(File workDir, int partId) {
+        return new File(workDir, 
String.format(DFRG_PARTITION_TMP_FILE_TEMPLATE, partId));
+    }
+
+    /**
+     * Return file named {@code part-dfrg-%d.bin} in given folder. It will be 
used for storing defragmented data
+     * partition when the process is over.
+     *
+     * @param workDir Cache group working directory.
+     * @param partId Parition index, will be substituted into file name.

Review comment:
       Typo in the Javadoc for the `partId` parameter: "Parition" -> "Partition".

##########
File path: 
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/defragmentation/DefragmentationFileUtils.java
##########
@@ -0,0 +1,400 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package 
org.apache.ignite.internal.processors.cache.persistence.defragmentation;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.internal.processors.cache.persistence.file.FileIO;
+import 
org.apache.ignite.internal.processors.cache.persistence.file.FileIOFactory;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.internal.util.typedef.internal.U;
+
+import static java.nio.file.StandardCopyOption.ATOMIC_MOVE;
+import static java.nio.file.StandardCopyOption.REPLACE_EXISTING;
+import static java.nio.file.StandardOpenOption.CREATE_NEW;
+import static java.nio.file.StandardOpenOption.WRITE;
+import static 
org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.FILE_SUFFIX;
+import static 
org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.INDEX_FILE_NAME;
+import static 
org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.INDEX_FILE_PREFIX;
+import static 
org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.PART_FILE_PREFIX;
+import static 
org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.PART_FILE_TEMPLATE;
+import static 
org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.TMP_SUFFIX;
+
+/**
+ * Everything related to file management during defragmentation process.
+ */
+public class DefragmentationFileUtils {
+    /** Prefix for link mapping files. */
+    private static final String DFRG_LINK_MAPPING_FILE_PREFIX = 
PART_FILE_PREFIX + "map-";
+
+    /** Link mapping file template. */
+    private static final String DFRG_LINK_MAPPING_FILE_TEMPLATE = 
DFRG_LINK_MAPPING_FILE_PREFIX + "%d" + FILE_SUFFIX;
+
+    /** Defragmentation completion marker file name. */
+    private static final String DFRG_COMPLETION_MARKER_FILE_NAME = 
"dfrg-completion-marker";
+
+    /** Name of defragmented index partition file. */
+    private static final String DFRG_INDEX_FILE_NAME = INDEX_FILE_PREFIX + 
"-dfrg" + FILE_SUFFIX;
+
+    /** Name of defragmented index partition temporary file. */
+    private static final String DFRG_INDEX_TMP_FILE_NAME = 
DFRG_INDEX_FILE_NAME + TMP_SUFFIX;
+
+    /** Prefix for defragmented partition files. */
+    private static final String DFRG_PARTITION_FILE_PREFIX = PART_FILE_PREFIX 
+ "dfrg-";
+
+    /** Defragmented partition file template. */
+    private static final String DFRG_PARTITION_FILE_TEMPLATE = 
DFRG_PARTITION_FILE_PREFIX + "%d" + FILE_SUFFIX;
+
+    /** Defragmented partition temp file template. */
+    private static final String DFRG_PARTITION_TMP_FILE_TEMPLATE = 
DFRG_PARTITION_FILE_TEMPLATE + TMP_SUFFIX;
+
+    /**
+     * Performs cleanup of work dir before initializing file page stores.
+     * Will finish batch renaming if defragmentation was completed or delete 
garbage if it wasn't.
+     *
+     * @param workDir Cache group working directory.
+     * @param log Logger to write messages.
+     * @throws IgniteCheckedException If {@link IOException} occurred.
+     */
+    public static void beforeInitPageStores(File workDir, IgniteLogger log) 
throws IgniteCheckedException {
+        batchRenameDefragmentedCacheGroupPartitions(workDir, log);
+
+        U.delete(defragmentationCompletionMarkerFile(workDir));
+
+        for (File file : workDir.listFiles()) {
+            String fileName = file.getName();
+
+            if (
+                fileName.startsWith(DFRG_PARTITION_FILE_PREFIX)
+                || fileName.startsWith(DFRG_INDEX_FILE_NAME)
+                || fileName.startsWith(DFRG_LINK_MAPPING_FILE_PREFIX)
+            )
+                U.delete(file);
+        }
+    }
+
+    /**
+     * Checks whether cache group defragmentation completed or not. Completes 
it if all that's left is renaming.
+     *
+     * @param workDir Cache group working directory.
+     * @param grpId Cache group Id of cache group belonging to the given 
working directory.
+     * @param log Logger to write messages.
+     * @return {@code true} if given cache group is already defragmented.
+     * @throws IgniteCheckedException If {@link IOException} occurred.
+     *
+     * @see DefragmentationFileUtils#defragmentationCompletionMarkerFile(File)
+     */
+    public static boolean skipAlreadyDefragmentedCacheGroup(File workDir, int 
grpId, IgniteLogger log) throws IgniteCheckedException {
+        File completionMarkerFile = 
defragmentationCompletionMarkerFile(workDir);
+
+        if (completionMarkerFile.exists()) {
+            if (log.isInfoEnabled()) {
+                log.info(S.toString(
+                    "Skipping already defragmented page group",
+                    "grpId", grpId, false,
+                    "markerFileName", completionMarkerFile.getName(), false,
+                    "workDir", workDir.getAbsolutePath(), false
+                ));
+            }
+
+            batchRenameDefragmentedCacheGroupPartitions(workDir, log);
+
+            return true;
+        }
+
+        return false;
+    }
+
+    /**
+     * Checks whether partition has already been defragmented or not. Cleans 
corrupted data if previous failed
+     * defragmentation attempt was found.
+     *
+     * @param workDir Cache group working directory.
+     * @param grpId Cache group Id of cache group belonging to the given 
working directory.
+     * @param partId Partionion index to check.
+     * @param log Logger to write messages.
+     * @return {@code true} if given partition is already defragmented.
+     * @throws IgniteCheckedException If {@link IOException} occurred.
+     *
+     * @see DefragmentationFileUtils#defragmentedPartTmpFile(File, int)
+     * @see DefragmentationFileUtils#defragmentedPartFile(File, int)
+     * @see DefragmentationFileUtils#defragmentedPartMappingFile(File, int)
+     */
+    public static boolean skipAlreadyDefragmentedPartition(File workDir, int 
grpId, int partId, IgniteLogger log) throws IgniteCheckedException {
+        File defragmentedPartFile = defragmentedPartFile(workDir, partId);
+        File defragmentedPartMappingFile = 
defragmentedPartMappingFile(workDir, partId);
+
+        if (defragmentedPartFile.exists() && 
defragmentedPartMappingFile.exists()) {
+            if (log.isInfoEnabled()) {
+                log.info(S.toString(
+                    "Skipping already defragmented partition",
+                    "grpId", grpId, false,
+                    "partId", partId, false,
+                    "partFileName", defragmentedPartFile.getName(), false,
+                    "mappingFileName", defragmentedPartMappingFile.getName(), 
false,
+                    "workDir", workDir.getAbsolutePath(), false
+                ));
+            }
+
+            return true;
+        }
+
+        File defragmentedPartTmpFile = defragmentedPartTmpFile(workDir, 
partId);
+
+        try {
+            Files.deleteIfExists(defragmentedPartTmpFile.toPath());
+
+            Files.deleteIfExists(defragmentedPartFile.toPath());
+
+            Files.deleteIfExists(defragmentedPartMappingFile.toPath());
+        }
+        catch (IOException e) {
+            handleIoException(e);
+        }
+
+        return false;
+    }
+
+    /**
+     * Failure-tolerant batch rename of defragmented partition files.
+     *
+     * Deletes all link mapping files, old partition and index files, renaming 
defragmented files in the process. Can
+     * be run on the same folder multiple times if failed for some reason.
+     *
+     * Does something only if completion marker is present in the folder. This 
marker won't be deleted in the end.
+     * Deletion of the marker must be done outside of defragmentation mode to 
prevent cache groups from being defragmented
+     * several times in case of failures.
+     *
+     * @param workDir Cache group working directory.
+     * @param log Logger to write messages.
+     * @throws IgniteCheckedException If {@link IOException} occurred.
+     *
+     * @see 
DefragmentationFileUtils#writeDefragmentationCompletionMarker(FileIOFactory, 
File, IgniteLogger)
+     */
+    public static void batchRenameDefragmentedCacheGroupPartitions(File 
workDir, IgniteLogger log) throws IgniteCheckedException {
+        File completionMarkerFile = 
defragmentationCompletionMarkerFile(workDir);
+
+        if (!completionMarkerFile.exists())
+            return;
+
+        try {
+            for (File mappingFile : workDir.listFiles((dir, name) -> 
name.startsWith(DFRG_LINK_MAPPING_FILE_PREFIX)))
+                Files.delete(mappingFile.toPath());
+
+            for (File partFile : workDir.listFiles((dir, name) -> 
name.startsWith(DFRG_PARTITION_FILE_PREFIX))) {
+                int partId = extractPartId(partFile.getName());
+
+                File oldPartFile = new File(workDir, 
String.format(PART_FILE_TEMPLATE, partId));
+
+                Files.move(partFile.toPath(), oldPartFile.toPath(), 
ATOMIC_MOVE, REPLACE_EXISTING);
+            }
+
+            File idxFile = new File(workDir, DFRG_INDEX_FILE_NAME);
+
+            if (idxFile.exists()) {
+                File oldIdxFile = new File(workDir, INDEX_FILE_NAME);
+
+                Files.move(idxFile.toPath(), oldIdxFile.toPath(), ATOMIC_MOVE, 
REPLACE_EXISTING);
+            }
+        }
+        catch (IOException e) {
+            handleIoException(e);
+        }
+    }
+
+    /**
+     * Extracts partition number from file names like {@code part-dfrg-%d.bin}.
+     *
+     * @param dfrgPartFileName Defragmented partition file name.
+     * @return Partition index.
+     *
+     * @see DefragmentationFileUtils#defragmentedPartFile(File, int)
+     */
+    private static int extractPartId(String dfrgPartFileName) {
+        assert dfrgPartFileName.startsWith(DFRG_PARTITION_FILE_PREFIX) : 
dfrgPartFileName;
+        assert dfrgPartFileName.endsWith(FILE_SUFFIX) : dfrgPartFileName;
+
+        String partIdStr = dfrgPartFileName.substring(
+            DFRG_PARTITION_FILE_PREFIX.length(),
+            dfrgPartFileName.length() - FILE_SUFFIX.length()
+        );
+
+        return Integer.parseInt(partIdStr);
+    }
+
+    /**
+     * Return file named {@code index-dfrg.bin.tmp} in given folder. It will 
be used for storing defragmented index
+     * partition during the process.
+     *
+     * @param workDir Cache group working directory.
+     * @return File.
+     *
+     * @see DefragmentationFileUtils#defragmentedIndexFile(File)
+     */
+    public static File defragmentedIndexTmpFile(File workDir) {
+        return new File(workDir, DFRG_INDEX_TMP_FILE_NAME);
+    }
+
+    /**
+     * Return file named {@code index-dfrg.bin} in given folder. It will be 
used for storing defragmented index
+     * partition when the process is over.
+     *
+     * @param workDir Cache group working directory.
+     * @return File.
+     *
+     * @see DefragmentationFileUtils#defragmentedIndexTmpFile(File)
+     */
+    public static File defragmentedIndexFile(File workDir) {
+        return new File(workDir, DFRG_INDEX_FILE_NAME);
+    }
+
+    /**
+     * Rename temporary index defragmentation file to a finalized one.
+     *
+     * @param workDir Cache group working directory.
+     * @throws IgniteCheckedException If {@link IOException} occurred.
+     *
+     * @see DefragmentationFileUtils#defragmentedIndexTmpFile(File)
+     * @see DefragmentationFileUtils#defragmentedIndexFile(File)
+     */
+    public static void renameTempIndexFile(File workDir) throws 
IgniteCheckedException {
+        File defragmentedIdxTmpFile = defragmentedIndexTmpFile(workDir);
+        File defragmentedIdxFile = defragmentedIndexFile(workDir);
+
+        try {
+            Files.move(defragmentedIdxTmpFile.toPath(), 
defragmentedIdxFile.toPath(), ATOMIC_MOVE);
+        }
+        catch (IOException e) {
+            handleIoException(e);
+        }
+    }
+
+    /**
+     * Return file named {@code part-dfrg-%d.bin.tmp} in given folder. It will 
be used for storing defragmented data
+     * partition during the process.
+     *
+     * @param workDir Cache group working directory.
+     * @param partId Parition index, will be substituted into file name.
+     * @return File.
+     *
+     * @see DefragmentationFileUtils#defragmentedPartFile(File, int)
+     */
+    public static File defragmentedPartTmpFile(File workDir, int partId) {
+        return new File(workDir, 
String.format(DFRG_PARTITION_TMP_FILE_TEMPLATE, partId));
+    }
+
+    /**
+     * Return file named {@code part-dfrg-%d.bin} in given folder. It will be 
used for storing defragmented data
+     * partition when the process is over.
+     *
+     * @param workDir Cache group working directory.
+     * @param partId Parition index, will be substituted into file name.
+     * @return File.
+     *
+     * @see DefragmentationFileUtils#defragmentedPartTmpFile(File, int)
+     */
+    public static File defragmentedPartFile(File workDir, int partId) {
+        return new File(workDir, String.format(DFRG_PARTITION_FILE_TEMPLATE, 
partId));
+    }
+
+    /**
+     * Rename temporary partition defragmentation file to a finalized one.
+     *
+     * @param workDir Cache group working directory.
+     * @param partId Parition index.

Review comment:
       Parition -> Partition

##########
File path: 
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/defragmentation/DefragmentationFileUtils.java
##########
@@ -0,0 +1,400 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package 
org.apache.ignite.internal.processors.cache.persistence.defragmentation;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.internal.processors.cache.persistence.file.FileIO;
+import 
org.apache.ignite.internal.processors.cache.persistence.file.FileIOFactory;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.internal.util.typedef.internal.U;
+
+import static java.nio.file.StandardCopyOption.ATOMIC_MOVE;
+import static java.nio.file.StandardCopyOption.REPLACE_EXISTING;
+import static java.nio.file.StandardOpenOption.CREATE_NEW;
+import static java.nio.file.StandardOpenOption.WRITE;
+import static 
org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.FILE_SUFFIX;
+import static 
org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.INDEX_FILE_NAME;
+import static 
org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.INDEX_FILE_PREFIX;
+import static 
org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.PART_FILE_PREFIX;
+import static 
org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.PART_FILE_TEMPLATE;
+import static 
org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.TMP_SUFFIX;
+
+/**
+ * Everything related to file management during defragmentation process.
+ */
+public class DefragmentationFileUtils {
+    /** Prefix for link mapping files. */
+    private static final String DFRG_LINK_MAPPING_FILE_PREFIX = 
PART_FILE_PREFIX + "map-";
+
+    /** Link mapping file template. */
+    private static final String DFRG_LINK_MAPPING_FILE_TEMPLATE = 
DFRG_LINK_MAPPING_FILE_PREFIX + "%d" + FILE_SUFFIX;
+
+    /** Defragmentation completion marker file name. */
+    private static final String DFRG_COMPLETION_MARKER_FILE_NAME = 
"dfrg-completion-marker";
+
+    /** Name of defragmented index partition file. */
+    private static final String DFRG_INDEX_FILE_NAME = INDEX_FILE_PREFIX + 
"-dfrg" + FILE_SUFFIX;
+
+    /** Name of defragmented index partition temporary file. */
+    private static final String DFRG_INDEX_TMP_FILE_NAME = 
DFRG_INDEX_FILE_NAME + TMP_SUFFIX;
+
+    /** Prefix for defragmented partition files. */
+    private static final String DFRG_PARTITION_FILE_PREFIX = PART_FILE_PREFIX 
+ "dfrg-";
+
+    /** Defragmented partition file template. */
+    private static final String DFRG_PARTITION_FILE_TEMPLATE = 
DFRG_PARTITION_FILE_PREFIX + "%d" + FILE_SUFFIX;
+
+    /** Defragmented partition temp file template. */
+    private static final String DFRG_PARTITION_TMP_FILE_TEMPLATE = 
DFRG_PARTITION_FILE_TEMPLATE + TMP_SUFFIX;
+
+    /**
+     * Performs cleanup of work dir before initializing file page stores.
+     * Will finish batch renaming if defragmentation was completed or delete 
garbage if it wasn't.
+     *
+     * @param workDir Cache group working directory.
+     * @param log Logger to write messages.
+     * @throws IgniteCheckedException If {@link IOException} occurred.
+     */
+    public static void beforeInitPageStores(File workDir, IgniteLogger log) 
throws IgniteCheckedException {
+        batchRenameDefragmentedCacheGroupPartitions(workDir, log);
+
+        U.delete(defragmentationCompletionMarkerFile(workDir));
+
+        for (File file : workDir.listFiles()) {
+            String fileName = file.getName();
+
+            if (
+                fileName.startsWith(DFRG_PARTITION_FILE_PREFIX)
+                || fileName.startsWith(DFRG_INDEX_FILE_NAME)
+                || fileName.startsWith(DFRG_LINK_MAPPING_FILE_PREFIX)
+            )
+                U.delete(file);
+        }
+    }
+
+    /**
+     * Checks whether cache group defragmentation completed or not. Completes 
it if all that's left is renaming.
+     *
+     * @param workDir Cache group working directory.
+     * @param grpId Cache group Id of cache group belonging to the given 
working directory.
+     * @param log Logger to write messages.
+     * @return {@code true} if given cache group is already defragmented.
+     * @throws IgniteCheckedException If {@link IOException} occurred.
+     *
+     * @see DefragmentationFileUtils#defragmentationCompletionMarkerFile(File)
+     */
+    public static boolean skipAlreadyDefragmentedCacheGroup(File workDir, int 
grpId, IgniteLogger log) throws IgniteCheckedException {
+        File completionMarkerFile = 
defragmentationCompletionMarkerFile(workDir);
+
+        if (completionMarkerFile.exists()) {
+            if (log.isInfoEnabled()) {
+                log.info(S.toString(
+                    "Skipping already defragmented page group",
+                    "grpId", grpId, false,
+                    "markerFileName", completionMarkerFile.getName(), false,
+                    "workDir", workDir.getAbsolutePath(), false
+                ));
+            }
+
+            batchRenameDefragmentedCacheGroupPartitions(workDir, log);
+
+            return true;
+        }
+
+        return false;
+    }
+
+    /**
+     * Checks whether partition has already been defragmented or not. Cleans 
corrupted data if previous failed
+     * defragmentation attempt was found.
+     *
+     * @param workDir Cache group working directory.
+     * @param grpId Cache group Id of cache group belonging to the given 
working directory.
+     * @param partId Partionion index to check.

Review comment:
       Partionion -> Partition

##########
File path: 
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/defragmentation/CachePartitionDefragmentationManager.java
##########
@@ -0,0 +1,896 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package 
org.apache.ignite.internal.processors.cache.persistence.defragmentation;
+
+import java.io.File;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.LongConsumer;
+import java.util.stream.Collectors;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.configuration.DataPageEvictionMode;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.metric.IoStatisticsHolderNoOp;
+import org.apache.ignite.internal.pagemem.PageIdAllocator;
+import org.apache.ignite.internal.pagemem.PageIdUtils;
+import org.apache.ignite.internal.pagemem.store.PageStore;
+import org.apache.ignite.internal.processors.cache.CacheGroupContext;
+import org.apache.ignite.internal.processors.cache.CacheType;
+import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
+import 
org.apache.ignite.internal.processors.cache.IgniteCacheOffheapManager.CacheDataStore;
+import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
+import org.apache.ignite.internal.processors.cache.persistence.CheckpointState;
+import org.apache.ignite.internal.processors.cache.persistence.DataRegion;
+import 
org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager;
+import 
org.apache.ignite.internal.processors.cache.persistence.GridCacheOffheapManager;
+import 
org.apache.ignite.internal.processors.cache.persistence.GridCacheOffheapManager.GridCacheDataStore;
+import 
org.apache.ignite.internal.processors.cache.persistence.checkpoint.CheckpointManager;
+import 
org.apache.ignite.internal.processors.cache.persistence.checkpoint.CheckpointTimeoutLock;
+import 
org.apache.ignite.internal.processors.cache.persistence.checkpoint.LightweightCheckpointManager;
+import 
org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreFactory;
+import 
org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager;
+import 
org.apache.ignite.internal.processors.cache.persistence.freelist.AbstractFreeList;
+import 
org.apache.ignite.internal.processors.cache.persistence.freelist.SimpleDataRow;
+import 
org.apache.ignite.internal.processors.cache.persistence.pagemem.PageMemoryEx;
+import org.apache.ignite.internal.processors.cache.persistence.tree.io.PageIO;
+import 
org.apache.ignite.internal.processors.cache.persistence.tree.io.PagePartitionMetaIO;
+import 
org.apache.ignite.internal.processors.cache.persistence.tree.io.PagePartitionMetaIOV3;
+import org.apache.ignite.internal.processors.cache.tree.AbstractDataLeafIO;
+import org.apache.ignite.internal.processors.cache.tree.CacheDataTree;
+import org.apache.ignite.internal.processors.cache.tree.DataRow;
+import org.apache.ignite.internal.processors.cache.tree.PendingEntriesTree;
+import org.apache.ignite.internal.processors.cache.tree.PendingRow;
+import org.apache.ignite.internal.processors.query.GridQueryIndexing;
+import org.apache.ignite.internal.processors.query.GridQueryProcessor;
+import org.apache.ignite.internal.util.GridSpinBusyLock;
+import org.apache.ignite.internal.util.collection.IntHashMap;
+import org.apache.ignite.internal.util.collection.IntMap;
+import org.apache.ignite.internal.util.future.GridCompoundFuture;
+import org.apache.ignite.internal.util.future.GridFinishedFuture;
+import org.apache.ignite.internal.util.future.GridFutureAdapter;
+import org.apache.ignite.internal.util.typedef.internal.CU;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteInClosure;
+import org.apache.ignite.maintenance.MaintenanceRegistry;
+
+import static java.util.stream.StreamSupport.stream;
+import static org.apache.ignite.internal.pagemem.PageIdAllocator.FLAG_DATA;
+import static org.apache.ignite.internal.pagemem.PageIdAllocator.FLAG_IDX;
+import static 
org.apache.ignite.internal.processors.cache.persistence.CheckpointState.FINISHED;
+import static 
org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager.DEFRAGMENTATION_MAPPING_REGION_NAME;
+import static 
org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager.DEFRAGMENTATION_PART_REGION_NAME;
+import static 
org.apache.ignite.internal.processors.cache.persistence.defragmentation.CachePartitionDefragmentationManager.PartStages.CP_LOCK;
+import static 
org.apache.ignite.internal.processors.cache.persistence.defragmentation.CachePartitionDefragmentationManager.PartStages.INSERT_ROW;
+import static 
org.apache.ignite.internal.processors.cache.persistence.defragmentation.CachePartitionDefragmentationManager.PartStages.ITERATE;
+import static 
org.apache.ignite.internal.processors.cache.persistence.defragmentation.CachePartitionDefragmentationManager.PartStages.METADATA;
+import static 
org.apache.ignite.internal.processors.cache.persistence.defragmentation.CachePartitionDefragmentationManager.PartStages.READ_ROW;
+import static 
org.apache.ignite.internal.processors.cache.persistence.defragmentation.CachePartitionDefragmentationManager.PartStages.STORE_MAP;
+import static 
org.apache.ignite.internal.processors.cache.persistence.defragmentation.CachePartitionDefragmentationManager.PartStages.STORE_PENDING;
+import static 
org.apache.ignite.internal.processors.cache.persistence.defragmentation.CachePartitionDefragmentationManager.PartStages.STORE_PK;
+import static 
org.apache.ignite.internal.processors.cache.persistence.defragmentation.DefragmentationFileUtils.batchRenameDefragmentedCacheGroupPartitions;
+import static 
org.apache.ignite.internal.processors.cache.persistence.defragmentation.DefragmentationFileUtils.defragmentedIndexTmpFile;
+import static 
org.apache.ignite.internal.processors.cache.persistence.defragmentation.DefragmentationFileUtils.defragmentedPartFile;
+import static 
org.apache.ignite.internal.processors.cache.persistence.defragmentation.DefragmentationFileUtils.defragmentedPartMappingFile;
+import static 
org.apache.ignite.internal.processors.cache.persistence.defragmentation.DefragmentationFileUtils.defragmentedPartTmpFile;
+import static 
org.apache.ignite.internal.processors.cache.persistence.defragmentation.DefragmentationFileUtils.renameTempIndexFile;
+import static 
org.apache.ignite.internal.processors.cache.persistence.defragmentation.DefragmentationFileUtils.renameTempPartitionFile;
+import static 
org.apache.ignite.internal.processors.cache.persistence.defragmentation.DefragmentationFileUtils.skipAlreadyDefragmentedCacheGroup;
+import static 
org.apache.ignite.internal.processors.cache.persistence.defragmentation.DefragmentationFileUtils.skipAlreadyDefragmentedPartition;
+import static 
org.apache.ignite.internal.processors.cache.persistence.defragmentation.DefragmentationFileUtils.writeDefragmentationCompletionMarker;
+import static 
org.apache.ignite.internal.processors.cache.persistence.defragmentation.TreeIterator.PageAccessType.ACCESS_READ;
+import static 
org.apache.ignite.internal.processors.cache.persistence.defragmentation.TreeIterator.PageAccessType.ACCESS_WRITE;
+import static 
org.apache.ignite.internal.processors.cache.persistence.defragmentation.TreeIterator.access;
+
+/**
+ * Defragmentation manager is the core class that contains main 
defragmentation procedure.
+ */
+public class CachePartitionDefragmentationManager {
+    /** */
+    public static final String DEFRAGMENTATION_MNTC_TASK_NAME = 
"defragmentationMaintenanceTask";
+
+    /** */
+    private final Set<Integer> cacheGroupsForDefragmentation;
+
+    /** Cache shared context. */
+    private final GridCacheSharedContext<?, ?> sharedCtx;
+
+    /** Maintenance registry. */
+    private final MaintenanceRegistry mntcReg;
+
+    /** Logger. */
+    private final IgniteLogger log;
+
+    /** Database schared manager. */

Review comment:
       schared -> shared

##########
File path: 
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/defragmentation/CachePartitionDefragmentationManager.java
##########
@@ -0,0 +1,896 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package 
org.apache.ignite.internal.processors.cache.persistence.defragmentation;
+
+import java.io.File;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.LongConsumer;
+import java.util.stream.Collectors;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.configuration.DataPageEvictionMode;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.metric.IoStatisticsHolderNoOp;
+import org.apache.ignite.internal.pagemem.PageIdAllocator;
+import org.apache.ignite.internal.pagemem.PageIdUtils;
+import org.apache.ignite.internal.pagemem.store.PageStore;
+import org.apache.ignite.internal.processors.cache.CacheGroupContext;
+import org.apache.ignite.internal.processors.cache.CacheType;
+import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
+import 
org.apache.ignite.internal.processors.cache.IgniteCacheOffheapManager.CacheDataStore;
+import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
+import org.apache.ignite.internal.processors.cache.persistence.CheckpointState;
+import org.apache.ignite.internal.processors.cache.persistence.DataRegion;
+import 
org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager;
+import 
org.apache.ignite.internal.processors.cache.persistence.GridCacheOffheapManager;
+import 
org.apache.ignite.internal.processors.cache.persistence.GridCacheOffheapManager.GridCacheDataStore;
+import 
org.apache.ignite.internal.processors.cache.persistence.checkpoint.CheckpointManager;
+import 
org.apache.ignite.internal.processors.cache.persistence.checkpoint.CheckpointTimeoutLock;
+import 
org.apache.ignite.internal.processors.cache.persistence.checkpoint.LightweightCheckpointManager;
+import 
org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreFactory;
+import 
org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager;
+import 
org.apache.ignite.internal.processors.cache.persistence.freelist.AbstractFreeList;
+import 
org.apache.ignite.internal.processors.cache.persistence.freelist.SimpleDataRow;
+import 
org.apache.ignite.internal.processors.cache.persistence.pagemem.PageMemoryEx;
+import org.apache.ignite.internal.processors.cache.persistence.tree.io.PageIO;
+import 
org.apache.ignite.internal.processors.cache.persistence.tree.io.PagePartitionMetaIO;
+import 
org.apache.ignite.internal.processors.cache.persistence.tree.io.PagePartitionMetaIOV3;
+import org.apache.ignite.internal.processors.cache.tree.AbstractDataLeafIO;
+import org.apache.ignite.internal.processors.cache.tree.CacheDataTree;
+import org.apache.ignite.internal.processors.cache.tree.DataRow;
+import org.apache.ignite.internal.processors.cache.tree.PendingEntriesTree;
+import org.apache.ignite.internal.processors.cache.tree.PendingRow;
+import org.apache.ignite.internal.processors.query.GridQueryIndexing;
+import org.apache.ignite.internal.processors.query.GridQueryProcessor;
+import org.apache.ignite.internal.util.GridSpinBusyLock;
+import org.apache.ignite.internal.util.collection.IntHashMap;
+import org.apache.ignite.internal.util.collection.IntMap;
+import org.apache.ignite.internal.util.future.GridCompoundFuture;
+import org.apache.ignite.internal.util.future.GridFinishedFuture;
+import org.apache.ignite.internal.util.future.GridFutureAdapter;
+import org.apache.ignite.internal.util.typedef.internal.CU;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteInClosure;
+import org.apache.ignite.maintenance.MaintenanceRegistry;
+
+import static java.util.stream.StreamSupport.stream;
+import static org.apache.ignite.internal.pagemem.PageIdAllocator.FLAG_DATA;
+import static org.apache.ignite.internal.pagemem.PageIdAllocator.FLAG_IDX;
+import static 
org.apache.ignite.internal.processors.cache.persistence.CheckpointState.FINISHED;
+import static 
org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager.DEFRAGMENTATION_MAPPING_REGION_NAME;
+import static 
org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager.DEFRAGMENTATION_PART_REGION_NAME;
+import static 
org.apache.ignite.internal.processors.cache.persistence.defragmentation.CachePartitionDefragmentationManager.PartStages.CP_LOCK;
+import static 
org.apache.ignite.internal.processors.cache.persistence.defragmentation.CachePartitionDefragmentationManager.PartStages.INSERT_ROW;
+import static 
org.apache.ignite.internal.processors.cache.persistence.defragmentation.CachePartitionDefragmentationManager.PartStages.ITERATE;
+import static 
org.apache.ignite.internal.processors.cache.persistence.defragmentation.CachePartitionDefragmentationManager.PartStages.METADATA;
+import static 
org.apache.ignite.internal.processors.cache.persistence.defragmentation.CachePartitionDefragmentationManager.PartStages.READ_ROW;
+import static 
org.apache.ignite.internal.processors.cache.persistence.defragmentation.CachePartitionDefragmentationManager.PartStages.STORE_MAP;
+import static 
org.apache.ignite.internal.processors.cache.persistence.defragmentation.CachePartitionDefragmentationManager.PartStages.STORE_PENDING;
+import static 
org.apache.ignite.internal.processors.cache.persistence.defragmentation.CachePartitionDefragmentationManager.PartStages.STORE_PK;
+import static 
org.apache.ignite.internal.processors.cache.persistence.defragmentation.DefragmentationFileUtils.batchRenameDefragmentedCacheGroupPartitions;
+import static 
org.apache.ignite.internal.processors.cache.persistence.defragmentation.DefragmentationFileUtils.defragmentedIndexTmpFile;
+import static 
org.apache.ignite.internal.processors.cache.persistence.defragmentation.DefragmentationFileUtils.defragmentedPartFile;
+import static 
org.apache.ignite.internal.processors.cache.persistence.defragmentation.DefragmentationFileUtils.defragmentedPartMappingFile;
+import static 
org.apache.ignite.internal.processors.cache.persistence.defragmentation.DefragmentationFileUtils.defragmentedPartTmpFile;
+import static 
org.apache.ignite.internal.processors.cache.persistence.defragmentation.DefragmentationFileUtils.renameTempIndexFile;
+import static 
org.apache.ignite.internal.processors.cache.persistence.defragmentation.DefragmentationFileUtils.renameTempPartitionFile;
+import static 
org.apache.ignite.internal.processors.cache.persistence.defragmentation.DefragmentationFileUtils.skipAlreadyDefragmentedCacheGroup;
+import static 
org.apache.ignite.internal.processors.cache.persistence.defragmentation.DefragmentationFileUtils.skipAlreadyDefragmentedPartition;
+import static 
org.apache.ignite.internal.processors.cache.persistence.defragmentation.DefragmentationFileUtils.writeDefragmentationCompletionMarker;
+import static 
org.apache.ignite.internal.processors.cache.persistence.defragmentation.TreeIterator.PageAccessType.ACCESS_READ;
+import static 
org.apache.ignite.internal.processors.cache.persistence.defragmentation.TreeIterator.PageAccessType.ACCESS_WRITE;
+import static 
org.apache.ignite.internal.processors.cache.persistence.defragmentation.TreeIterator.access;
+
+/**
+ * Defragmentation manager is the core class that contains main 
defragmentation procedure.
+ */
+public class CachePartitionDefragmentationManager {
+    /** */
+    public static final String DEFRAGMENTATION_MNTC_TASK_NAME = 
"defragmentationMaintenanceTask";
+
+    /** */
+    private final Set<Integer> cacheGroupsForDefragmentation;
+
+    /** Cache shared context. */
+    private final GridCacheSharedContext<?, ?> sharedCtx;
+
+    /** Maintenance registry. */
+    private final MaintenanceRegistry mntcReg;
+
+    /** Logger. */
+    private final IgniteLogger log;
+
+    /** Database schared manager. */
+    private final GridCacheDatabaseSharedManager dbMgr;
+
+    /** File page store manager. */
+    private final FilePageStoreManager filePageStoreMgr;
+
+    /**
+     * Checkpoint for specific defragmentation regions which would store the 
data to new partitions
+     * during the defragmentation.
+     */
+    private final LightweightCheckpointManager defragmentationCheckpoint;
+
+    /** Default checkpoint for current node. */
+    private final CheckpointManager nodeCheckpoint;
+
+    /** Page size. */
+    private final int pageSize;
+
+    /** */
+    private final DataRegion partDataRegion;
+
+    /** */
+    private final DataRegion mappingDataRegion;
+
+    /**
+     * @param cacheGrpIds
+     * @param sharedCtx Cache shared context.
+     * @param dbMgr Database manager.
+     * @param filePageStoreMgr File page store manager.
+     * @param nodeCheckpoint Default checkpoint for this node.
+     * @param defragmentationCheckpoint Specific checkpoint for 
defragmentation.
+     * @param pageSize Page size.
+     */
+    public CachePartitionDefragmentationManager(
+        List<Integer> cacheGrpIds,
+        GridCacheSharedContext<?, ?> sharedCtx,
+        GridCacheDatabaseSharedManager dbMgr,
+        FilePageStoreManager filePageStoreMgr,
+        CheckpointManager nodeCheckpoint,
+        LightweightCheckpointManager defragmentationCheckpoint,
+        int pageSize
+    ) throws IgniteCheckedException {
+        cacheGroupsForDefragmentation = new HashSet<>(cacheGrpIds);
+
+        this.dbMgr = dbMgr;
+        this.filePageStoreMgr = filePageStoreMgr;
+        this.pageSize = pageSize;
+        this.sharedCtx = sharedCtx;
+
+        this.mntcReg = sharedCtx.kernalContext().maintenanceRegistry();
+        this.log = sharedCtx.logger(getClass());
+        this.defragmentationCheckpoint = defragmentationCheckpoint;
+        this.nodeCheckpoint = nodeCheckpoint;
+
+        partDataRegion = dbMgr.dataRegion(DEFRAGMENTATION_PART_REGION_NAME);
+        mappingDataRegion = 
dbMgr.dataRegion(DEFRAGMENTATION_MAPPING_REGION_NAME);
+    }
+
+    /** */
+    //TODO How will we handle constant fail and restart scenario?
+    public void executeDefragmentation() throws IgniteCheckedException {
+        log.info("Defragmentation started.");
+
+        try {
+            // Checkpointer must be enabled so all pages on disk are in their 
latest valid state.
+            dbMgr.resumeWalLogging();
+
+            dbMgr.onStateRestored(null);
+
+            nodeCheckpoint.forceCheckpoint("beforeDefragmentation", 
null).futureFor(FINISHED).get();
+
+            sharedCtx.wal().onDeActivate(sharedCtx.kernalContext());
+
+            // Now the actual process starts.
+            TreeIterator treeIter = new TreeIterator(pageSize);
+
+            IgniteInternalFuture<?> idxDfrgFut = null;
+            DataPageEvictionMode prevPageEvictionMode = null;
+
+            for (CacheGroupContext oldGrpCtx : 
sharedCtx.cache().cacheGroups()) {
+                if (!oldGrpCtx.userCache())
+                    continue;
+
+                int grpId = oldGrpCtx.groupId();
+
+                if (!cacheGroupsForDefragmentation.isEmpty() && 
!cacheGroupsForDefragmentation.contains(grpId))
+                    continue;
+
+                File workDir = 
filePageStoreMgr.cacheWorkDir(oldGrpCtx.sharedGroup(), 
oldGrpCtx.cacheOrGroupName());
+
+                if (skipAlreadyDefragmentedCacheGroup(workDir, grpId, log))
+                    continue;
+
+                GridCacheOffheapManager offheap = 
(GridCacheOffheapManager)oldGrpCtx.offheap();
+
+                GridSpinBusyLock busyLock = offheap.busyLock();
+
+                List<CacheDataStore> oldCacheDataStores = 
stream(offheap.cacheDataStores().spliterator(), false)
+                    .filter(store -> {
+                        try {
+                            return filePageStoreMgr.exists(grpId, 
store.partId());
+                        }
+                        catch (IgniteCheckedException e) {
+                            throw new IgniteException(e);
+                        }
+                    })
+                    .collect(Collectors.toList());
+
+                if (workDir != null && !oldCacheDataStores.isEmpty()) {
+                    // We can't start defragmentation of new group on the 
region that has wrong eviction mode.
+                    // So waiting of the previous cache group defragmentation 
is inevitable.
+                    DataPageEvictionMode curPageEvictionMode = 
oldGrpCtx.dataRegion().config().getPageEvictionMode();
+
+                    if (prevPageEvictionMode == null || prevPageEvictionMode 
!= curPageEvictionMode) {
+                        prevPageEvictionMode = curPageEvictionMode;
+
+                        
partDataRegion.config().setPageEvictionMode(curPageEvictionMode);
+
+                        if (idxDfrgFut != null)
+                            idxDfrgFut.get();
+                    }
+
+                    IntMap<CacheDataStore> cacheDataStores = new 
IntHashMap<>();
+
+                    for (CacheDataStore store : offheap.cacheDataStores()) {
+                        // Tree can be null for not yet initialized partitions.
+                        // This would mean that these partitions are empty.
+                        assert store.tree() == null || store.tree().groupId() 
== grpId;
+
+                        if (store.tree() != null)
+                            cacheDataStores.put(store.partId(), store);
+                    }
+
+                    //TODO ensure that there are no races.
+                    
dbMgr.checkpointedDataRegions().remove(oldGrpCtx.dataRegion());
+
+                    // Another cheat. Ttl cleanup manager knows too much shit.
+                    oldGrpCtx.caches().stream()
+                        .filter(cacheCtx -> cacheCtx.groupId() == grpId)
+                        .forEach(cacheCtx -> cacheCtx.ttl().unregister());
+
+                    // Technically wal is already disabled, but 
"PageHandler.isWalDeltaRecordNeeded" doesn't care and
+                    // WAL records will be allocated anyway just to be ignored 
later if we don't disable WAL for
+                    // cache group explicitly.
+                    oldGrpCtx.localWalEnabled(false, false);
+
+                    boolean encrypted = 
oldGrpCtx.config().isEncryptionEnabled();
+
+                    FilePageStoreFactory pageStoreFactory = 
filePageStoreMgr.getPageStoreFactory(grpId, encrypted);
+
+                    createIndexPageStore(grpId, workDir, pageStoreFactory, 
partDataRegion, val -> {
+                    }); //TODO Allocated tracker.
+
+                    GridCompoundFuture<Object, Object> cmpFut = new 
GridCompoundFuture<>();
+
+                    PageMemoryEx oldPageMem = 
(PageMemoryEx)oldGrpCtx.dataRegion().pageMemory();
+
+                    CacheGroupContext newGrpCtx = new CacheGroupContext(
+                        sharedCtx,
+                        grpId,
+                        oldGrpCtx.receivedFrom(),
+                        CacheType.USER,
+                        oldGrpCtx.config(),
+                        oldGrpCtx.affinityNode(),
+                        partDataRegion,
+                        oldGrpCtx.cacheObjectContext(),
+                        null,
+                        null,
+                        oldGrpCtx.localStartVersion(),
+                        true,
+                        false,
+                        true
+                    );
+
+                    
defragmentationCheckpoint.checkpointTimeoutLock().checkpointReadLock();
+
+                    try {
+                        // This will initialize partition meta in index 
partition - meta tree and reuse list.
+                        newGrpCtx.start();
+                    }
+                    finally {
+                        
defragmentationCheckpoint.checkpointTimeoutLock().checkpointReadUnlock();
+                    }
+
+                    IntMap<LinkMap> linkMapByPart = new IntHashMap<>();
+
+                    for (CacheDataStore oldCacheDataStore : 
oldCacheDataStores) {
+                        int partId = oldCacheDataStore.partId();
+
+                        PartitionContext partCtx = new PartitionContext(
+                            workDir,
+                            grpId,
+                            partId,
+                            partDataRegion,
+                            mappingDataRegion,
+                            oldGrpCtx,
+                            newGrpCtx,
+                            cacheDataStores.get(partId),
+                            pageStoreFactory
+                        );
+
+                        if (skipAlreadyDefragmentedPartition(workDir, grpId, 
partId, log)) {
+                            partCtx.createMappingPageStore();
+
+                            linkMapByPart.put(partId, 
partCtx.createLinkMapTree(false));
+
+                            continue;
+                        }
+
+                        partCtx.createMappingPageStore();
+
+                        linkMapByPart.put(partId, 
partCtx.createLinkMapTree(true));
+
+                        partCtx.createPartPageStore();
+
+                        copyPartitionData(partCtx, treeIter, busyLock);
+
+                        //TODO Move inside of defragmentSinglePartition.
+                        IgniteInClosure<IgniteInternalFuture<?>> cpLsnr = fut 
-> {
+                            if (fut.error() == null) {
+                                PageStore oldPageStore = null;
+
+                                try {
+                                    oldPageStore = 
filePageStoreMgr.getStore(grpId, partId);
+                                }
+                                catch (IgniteCheckedException ignore) {
+                                }
+
+                                assert oldPageStore != null;
+
+                                if (log.isDebugEnabled()) {
+                                    log.debug(S.toString(
+                                        "Partition defragmented",
+                                        "grpId", grpId, false,
+                                        "partId", partId, false,
+                                        "oldPages", oldPageStore.pages(), 
false,
+                                        "newPages", 
partCtx.partPagesAllocated.get() + 1, false,
+                                        "mappingPages", 
partCtx.mappingPagesAllocated.get() + 1, false,
+                                        "pageSize", pageSize, false,
+                                        "partFile", 
defragmentedPartFile(workDir, partId).getName(), false,
+                                        "workDir", workDir, false
+                                    ));
+                                }
+
+                                oldPageMem.invalidate(grpId, partId);
+
+                                partCtx.partPageMemory.invalidate(grpId, 
partId);
+
+                                DefragmentationPageReadWriteManager pageMgr = 
(DefragmentationPageReadWriteManager)partCtx.partPageMemory.pageManager();
+
+                                pageMgr.pageStoreMap().removePageStore(grpId, 
partId); // Yes, it'll be invalid in a second.
+
+                                try {
+                                    renameTempPartitionFile(workDir, partId);
+                                }
+                                catch (IgniteCheckedException e) {
+                                    throw new IgniteException(e);
+                                }
+                            }
+                        };
+
+                        GridFutureAdapter<?> cpFut = defragmentationCheckpoint
+                            .forceCheckpoint("partition defragmented", null)
+                            .futureFor(CheckpointState.FINISHED);
+
+                        cpFut.listen(cpLsnr);
+
+                        cmpFut.add((IgniteInternalFuture<Object>)cpFut);
+                    }
+
+                    // A bit too general for now, but I like it more then 
saving only the last checkpoint future.
+                    cmpFut.markInitialized().get();
+
+                    idxDfrgFut = new GridFinishedFuture<>();
+
+                    if (filePageStoreMgr.hasIndexStore(grpId)) {
+                        defragmentIndexPartition(oldGrpCtx, newGrpCtx, 
linkMapByPart);
+
+                        idxDfrgFut = defragmentationCheckpoint
+                            .forceCheckpoint("index defragmented", null)
+                            .futureFor(CheckpointState.FINISHED);
+                    }
+
+                    idxDfrgFut.listen(fut -> {
+                        oldPageMem.invalidate(grpId, 
PageIdAllocator.INDEX_PARTITION);
+
+                        PageMemoryEx partPageMem = 
(PageMemoryEx)partDataRegion.pageMemory();
+
+                        partPageMem.invalidate(grpId, 
PageIdAllocator.INDEX_PARTITION);
+
+                        DefragmentationPageReadWriteManager pageMgr = 
(DefragmentationPageReadWriteManager)partPageMem.pageManager();
+
+                        pageMgr.pageStoreMap().removePageStore(grpId, 
PageIdAllocator.INDEX_PARTITION);
+
+                        PageMemoryEx mappingPageMem = 
(PageMemoryEx)mappingDataRegion.pageMemory();
+
+                        pageMgr = 
(DefragmentationPageReadWriteManager)mappingPageMem.pageManager();
+
+                        pageMgr.pageStoreMap().clear(grpId);
+
+                        try {
+                            renameTempIndexFile(workDir);
+
+                            
writeDefragmentationCompletionMarker(filePageStoreMgr.getPageStoreFileIoFactory(),
 workDir, log);
+
+                            
batchRenameDefragmentedCacheGroupPartitions(workDir, log);
+                        }
+                        catch (IgniteCheckedException e) {
+                            throw new IgniteException(e);
+                        }
+                    });
+                }
+
+                // I guess we should wait for it?
+                if (idxDfrgFut != null)
+                    idxDfrgFut.get();
+            }
+
+            mntcReg.unregisterMaintenanceTask(DEFRAGMENTATION_MNTC_TASK_NAME);
+
+            log.info("Defragmentation completed. All partitions are 
defragmented.");
+        }
+        finally {
+            defragmentationCheckpoint.stop(true);
+        }
+    }
+
+    /** */
+    public void createIndexPageStore(
+        int grpId,
+        File workDir,
+        FilePageStoreFactory pageStoreFactory,
+        DataRegion partRegion,
+        LongConsumer allocatedTracker
+    ) throws IgniteCheckedException {
+        // Index partition file has to be deleted before we begin, otherwise 
there's a chance of reading corrupted file.
+        // There is a time period when index is already defragmented but 
marker file is not created yet. If node is
+        // failed in that time window then index will be deframented once 
again. That's fine, situation is rare but code
+        // to fix that would add unnecessary complications.
+        U.delete(defragmentedIndexTmpFile(workDir));
+
+        PageStore idxPageStore;
+
+        defragmentationCheckpoint.checkpointTimeoutLock().checkpointReadLock();
+        try {
+            idxPageStore = pageStoreFactory.createPageStore(
+                FLAG_IDX,
+                () -> defragmentedIndexTmpFile(workDir).toPath(),
+                allocatedTracker
+            );
+        }
+        finally {
+            
defragmentationCheckpoint.checkpointTimeoutLock().checkpointReadUnlock();
+        }
+
+        idxPageStore.sync();
+
+        PageMemoryEx partPageMem = (PageMemoryEx)partRegion.pageMemory();
+
+        DefragmentationPageReadWriteManager partMgr = 
(DefragmentationPageReadWriteManager)partPageMem.pageManager();
+
+        partMgr.pageStoreMap().addPageStore(grpId, 
PageIdAllocator.INDEX_PARTITION, idxPageStore);
+    }
+
+    /**
+     * Cancel the process of defragmentation.
+     */
+    public void cancel(){

Review comment:
       Will we implement this method (`cancel()`)?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Reply via email to