Mmuzaf commented on a change in pull request #7984:
URL: https://github.com/apache/ignite/pull/7984#discussion_r531513801



##########
File path: 
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
##########
@@ -1357,8 +1357,19 @@ private void saveIndexReencryptionStatus(int grpId) 
throws IgniteCheckedExceptio
     }
 
     /** */
-    public GridSpinBusyLock busyLock() {
-        return busyLock;
+    public GridCacheDataStore createGridCacheDataStore(

Review comment:
       Please, reuse it also for `createCacheDataStore0`

##########
File path: 
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/defragmentation/TreeIterator.java
##########
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package 
org.apache.ignite.internal.processors.cache.persistence.defragmentation;
+
+import java.nio.ByteBuffer;
+import org.apache.ignite.IgniteCheckedException;
+import 
org.apache.ignite.internal.processors.cache.persistence.pagemem.PageMemoryEx;
+import org.apache.ignite.internal.processors.cache.persistence.tree.BPlusTree;
+import org.apache.ignite.internal.processors.cache.persistence.tree.io.BPlusIO;
+import 
org.apache.ignite.internal.processors.cache.persistence.tree.io.BPlusLeafIO;
+import 
org.apache.ignite.internal.processors.cache.persistence.tree.io.BPlusMetaIO;
+import org.apache.ignite.internal.processors.cache.persistence.tree.io.PageIO;
+import org.apache.ignite.internal.util.GridUnsafe;
+
+/** */
+public class TreeIterator {
+    /** Direct memory buffer with a size of one page. */
+    private final ByteBuffer pageBuf;
+
+    /** Offheap page size. */
+    private final int pageSize;
+
+    /** */
+    public TreeIterator(int size) {
+        pageSize = size;
+
+        pageBuf = ByteBuffer.allocateDirect(pageSize);
+    }
+
+    // Performance impact of constant closures allocation is not clear. So 
this method should be avoided in massive
+    // operations like tree leaves access.
+    /** */
+    public static <T> T access(
+        PageAccessType access,
+        PageMemoryEx pageMemory,
+        int grpId,
+        long pageId,
+        PageAccessor<T> accessor
+    ) throws IgniteCheckedException {
+        assert access == PageAccessType.ACCESS_READ || access == 
PageAccessType.ACCESS_WRITE;
+        long page = pageMemory.acquirePage(grpId, pageId);
+
+        try {
+            long pageAddr = access == PageAccessType.ACCESS_READ
+                    ? pageMemory.readLock(grpId, pageId, page)
+                    : pageMemory.writeLock(grpId, pageId, page);
+
+            try {
+                return accessor.access(pageAddr);
+            }
+            finally {
+                if (access == PageAccessType.ACCESS_READ)
+                    pageMemory.readUnlock(grpId, pageId, page);
+                else
+                    pageMemory.writeUnlock(grpId, pageId, page, null, true);
+            }
+        }
+        finally {
+            pageMemory.releasePage(grpId, pageId, page);
+        }
+    }
+
+    /** */
+    @SuppressWarnings("PublicInnerClass")
+    public enum PageAccessType {
+        /** Read access. */
+        ACCESS_READ,
+
+        /** Write access. */
+        ACCESS_WRITE;
+    }
+
+    /** */
+    @SuppressWarnings("PublicInnerClass")
+    @FunctionalInterface
+    public interface PageAccessor<T> {
+        /** */
+        public T access(long pageAddr) throws IgniteCheckedException;
+    }
+
+    /** */
+    // TODO Prefetch future pages?

Review comment:
       Let's fix this.

##########
File path: 
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/defragmentation/CachePartitionDefragmentationManager.java
##########
@@ -0,0 +1,861 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package 
org.apache.ignite.internal.processors.cache.persistence.defragmentation;
+
+import java.io.File;
+import java.nio.file.Path;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.LongConsumer;
+import java.util.stream.Collectors;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.configuration.DataPageEvictionMode;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.metric.IoStatisticsHolderNoOp;
+import org.apache.ignite.internal.pagemem.PageIdAllocator;
+import org.apache.ignite.internal.pagemem.PageIdUtils;
+import org.apache.ignite.internal.pagemem.store.PageStore;
+import org.apache.ignite.internal.processors.cache.CacheGroupContext;
+import org.apache.ignite.internal.processors.cache.CacheType;
+import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
+import 
org.apache.ignite.internal.processors.cache.IgniteCacheOffheapManager.CacheDataStore;
+import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
+import org.apache.ignite.internal.processors.cache.persistence.CheckpointState;
+import org.apache.ignite.internal.processors.cache.persistence.DataRegion;
+import 
org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager;
+import 
org.apache.ignite.internal.processors.cache.persistence.GridCacheOffheapManager;
+import 
org.apache.ignite.internal.processors.cache.persistence.GridCacheOffheapManager.GridCacheDataStore;
+import 
org.apache.ignite.internal.processors.cache.persistence.checkpoint.CheckpointManager;
+import 
org.apache.ignite.internal.processors.cache.persistence.checkpoint.CheckpointTimeoutLock;
+import 
org.apache.ignite.internal.processors.cache.persistence.checkpoint.LightweightCheckpointManager;
+import 
org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreFactory;
+import 
org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager;
+import 
org.apache.ignite.internal.processors.cache.persistence.freelist.AbstractFreeList;
+import 
org.apache.ignite.internal.processors.cache.persistence.freelist.SimpleDataRow;
+import 
org.apache.ignite.internal.processors.cache.persistence.pagemem.PageMemoryEx;
+import org.apache.ignite.internal.processors.cache.persistence.tree.io.PageIO;
+import 
org.apache.ignite.internal.processors.cache.persistence.tree.io.PagePartitionMetaIO;
+import 
org.apache.ignite.internal.processors.cache.persistence.tree.io.PagePartitionMetaIOV3;
+import org.apache.ignite.internal.processors.cache.tree.AbstractDataLeafIO;
+import org.apache.ignite.internal.processors.cache.tree.CacheDataTree;
+import org.apache.ignite.internal.processors.cache.tree.DataRow;
+import org.apache.ignite.internal.processors.cache.tree.PendingEntriesTree;
+import org.apache.ignite.internal.processors.cache.tree.PendingRow;
+import org.apache.ignite.internal.processors.query.GridQueryIndexing;
+import org.apache.ignite.internal.processors.query.GridQueryProcessor;
+import org.apache.ignite.internal.util.collection.IntHashMap;
+import org.apache.ignite.internal.util.collection.IntMap;
+import org.apache.ignite.internal.util.future.GridCompoundFuture;
+import org.apache.ignite.internal.util.future.GridFinishedFuture;
+import org.apache.ignite.internal.util.future.GridFutureAdapter;
+import org.apache.ignite.internal.util.typedef.internal.CU;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteInClosure;
+import org.apache.ignite.lang.IgniteOutClosure;
+import org.apache.ignite.maintenance.MaintenanceRegistry;
+
+import static java.util.stream.StreamSupport.stream;
+import static org.apache.ignite.internal.pagemem.PageIdAllocator.FLAG_DATA;
+import static org.apache.ignite.internal.pagemem.PageIdAllocator.FLAG_IDX;
+import static 
org.apache.ignite.internal.processors.cache.persistence.CheckpointState.FINISHED;
+import static 
org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager.DEFRAGMENTATION_MAPPING_REGION_NAME;
+import static 
org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager.DEFRAGMENTATION_PART_REGION_NAME;
+import static 
org.apache.ignite.internal.processors.cache.persistence.defragmentation.CachePartitionDefragmentationManager.PartStages.CP_LOCK;
+import static 
org.apache.ignite.internal.processors.cache.persistence.defragmentation.CachePartitionDefragmentationManager.PartStages.INSERT_ROW;
+import static 
org.apache.ignite.internal.processors.cache.persistence.defragmentation.CachePartitionDefragmentationManager.PartStages.ITERATE;
+import static 
org.apache.ignite.internal.processors.cache.persistence.defragmentation.CachePartitionDefragmentationManager.PartStages.METADATA;
+import static 
org.apache.ignite.internal.processors.cache.persistence.defragmentation.CachePartitionDefragmentationManager.PartStages.READ_ROW;
+import static 
org.apache.ignite.internal.processors.cache.persistence.defragmentation.CachePartitionDefragmentationManager.PartStages.STORE_MAP;
+import static 
org.apache.ignite.internal.processors.cache.persistence.defragmentation.CachePartitionDefragmentationManager.PartStages.STORE_PENDING;
+import static 
org.apache.ignite.internal.processors.cache.persistence.defragmentation.CachePartitionDefragmentationManager.PartStages.STORE_PK;
+import static 
org.apache.ignite.internal.processors.cache.persistence.defragmentation.DefragmentationFileUtils.batchRenameDefragmentedCacheGroupPartitions;
+import static 
org.apache.ignite.internal.processors.cache.persistence.defragmentation.DefragmentationFileUtils.defragmentedIndexTmpFile;
+import static 
org.apache.ignite.internal.processors.cache.persistence.defragmentation.DefragmentationFileUtils.defragmentedPartFile;
+import static 
org.apache.ignite.internal.processors.cache.persistence.defragmentation.DefragmentationFileUtils.defragmentedPartMappingFile;
+import static 
org.apache.ignite.internal.processors.cache.persistence.defragmentation.DefragmentationFileUtils.defragmentedPartTmpFile;
+import static 
org.apache.ignite.internal.processors.cache.persistence.defragmentation.DefragmentationFileUtils.renameTempIndexFile;
+import static 
org.apache.ignite.internal.processors.cache.persistence.defragmentation.DefragmentationFileUtils.renameTempPartitionFile;
+import static 
org.apache.ignite.internal.processors.cache.persistence.defragmentation.DefragmentationFileUtils.skipAlreadyDefragmentedCacheGroup;
+import static 
org.apache.ignite.internal.processors.cache.persistence.defragmentation.DefragmentationFileUtils.skipAlreadyDefragmentedPartition;
+import static 
org.apache.ignite.internal.processors.cache.persistence.defragmentation.DefragmentationFileUtils.writeDefragmentationCompletionMarker;
+import static 
org.apache.ignite.internal.processors.cache.persistence.defragmentation.TreeIterator.PageAccessType.ACCESS_READ;
+import static 
org.apache.ignite.internal.processors.cache.persistence.defragmentation.TreeIterator.PageAccessType.ACCESS_WRITE;
+import static 
org.apache.ignite.internal.processors.cache.persistence.defragmentation.TreeIterator.access;
+
+/**
+ * Defragmentation manager is the core class that contains main 
defragmentation procedure.
+ */
+public class CachePartitionDefragmentationManager {
+    /** */
+    public static final String DEFRAGMENTATION_MNTC_TASK_NAME = 
"defragmentationMaintenanceTask";
+
+    /** */
+    private final Set<Integer> cacheGroupsForDefragmentation;
+
+    /** Cache shared context. */
+    private final GridCacheSharedContext<?, ?> sharedCtx;
+
+    /** Maintenance registry. */
+    private final MaintenanceRegistry mntcReg;
+
+    /** Logger. */
+    private final IgniteLogger log;
+
+    /** Database shared manager. */
+    private final GridCacheDatabaseSharedManager dbMgr;
+
+    /** File page store manager. */
+    private final FilePageStoreManager filePageStoreMgr;
+
+    /**
+     * Checkpoint for specific defragmentation regions which would store the 
data to new partitions
+     * during the defragmentation.
+     */
+    private final LightweightCheckpointManager defragmentationCheckpoint;
+
+    /** Default checkpoint for current node. */
+    private final CheckpointManager nodeCheckpoint;
+
+    /** Page size. */
+    private final int pageSize;
+
+    /** */
+    private final DataRegion partDataRegion;
+
+    /** */
+    private final DataRegion mappingDataRegion;
+
+    /**
+     * @param cacheGrpIds
+     * @param sharedCtx Cache shared context.
+     * @param dbMgr Database manager.
+     * @param filePageStoreMgr File page store manager.
+     * @param nodeCheckpoint Default checkpoint for this node.
+     * @param defragmentationCheckpoint Specific checkpoint for 
defragmentation.
+     * @param pageSize Page size.
+     */
+    public CachePartitionDefragmentationManager(
+        List<Integer> cacheGrpIds,
+        GridCacheSharedContext<?, ?> sharedCtx,
+        GridCacheDatabaseSharedManager dbMgr,
+        FilePageStoreManager filePageStoreMgr,
+        CheckpointManager nodeCheckpoint,
+        LightweightCheckpointManager defragmentationCheckpoint,
+        int pageSize
+    ) throws IgniteCheckedException {
+        cacheGroupsForDefragmentation = new HashSet<>(cacheGrpIds);
+
+        this.dbMgr = dbMgr;
+        this.filePageStoreMgr = filePageStoreMgr;
+        this.pageSize = pageSize;
+        this.sharedCtx = sharedCtx;
+
+        this.mntcReg = sharedCtx.kernalContext().maintenanceRegistry();
+        this.log = sharedCtx.logger(getClass());
+        this.defragmentationCheckpoint = defragmentationCheckpoint;
+        this.nodeCheckpoint = nodeCheckpoint;
+
+        partDataRegion = dbMgr.dataRegion(DEFRAGMENTATION_PART_REGION_NAME);
+        mappingDataRegion = 
dbMgr.dataRegion(DEFRAGMENTATION_MAPPING_REGION_NAME);
+    }
+
+    /** */
+    public void executeDefragmentation() throws IgniteCheckedException {
+        log.info("Defragmentation started.");
+
+        try {
+            // Checkpointer must be enabled so all pages on disk are in their 
latest valid state.
+            dbMgr.resumeWalLogging();
+
+            dbMgr.onStateRestored(null);
+
+            nodeCheckpoint.forceCheckpoint("beforeDefragmentation", 
null).futureFor(FINISHED).get();
+
+            sharedCtx.wal().onDeActivate(sharedCtx.kernalContext());
+
+            // Now the actual process starts.
+            TreeIterator treeIter = new TreeIterator(pageSize);
+
+            IgniteInternalFuture<?> idxDfrgFut = null;
+            DataPageEvictionMode prevPageEvictionMode = null;
+
+            for (CacheGroupContext oldGrpCtx : 
sharedCtx.cache().cacheGroups()) {
+                if (!oldGrpCtx.userCache())
+                    continue;
+
+                int grpId = oldGrpCtx.groupId();
+
+                if (!cacheGroupsForDefragmentation.isEmpty() && 
!cacheGroupsForDefragmentation.contains(grpId))
+                    continue;
+
+                File workDir = 
filePageStoreMgr.cacheWorkDir(oldGrpCtx.sharedGroup(), 
oldGrpCtx.cacheOrGroupName());
+
+                if (skipAlreadyDefragmentedCacheGroup(workDir, grpId, log))
+                    continue;
+
+                GridCacheOffheapManager offheap = 
(GridCacheOffheapManager)oldGrpCtx.offheap();
+
+                List<CacheDataStore> oldCacheDataStores = 
stream(offheap.cacheDataStores().spliterator(), false)
+                    .filter(store -> {
+                        try {
+                            return filePageStoreMgr.exists(grpId, 
store.partId());
+                        }
+                        catch (IgniteCheckedException e) {
+                            throw new IgniteException(e);
+                        }
+                    })
+                    .collect(Collectors.toList());
+
+                if (workDir != null && !oldCacheDataStores.isEmpty()) {
+                    // We can't start defragmentation of new group on the 
region that has wrong eviction mode.
+                    // So waiting of the previous cache group defragmentation 
is inevitable.
+                    DataPageEvictionMode curPageEvictionMode = 
oldGrpCtx.dataRegion().config().getPageEvictionMode();
+
+                    if (prevPageEvictionMode == null || prevPageEvictionMode 
!= curPageEvictionMode) {
+                        prevPageEvictionMode = curPageEvictionMode;
+
+                        
partDataRegion.config().setPageEvictionMode(curPageEvictionMode);
+
+                        if (idxDfrgFut != null)
+                            idxDfrgFut.get();
+                    }
+
+                    IntMap<CacheDataStore> cacheDataStores = new 
IntHashMap<>();
+
+                    for (CacheDataStore store : offheap.cacheDataStores()) {
+                        // Tree can be null for not yet initialized partitions.
+                        // This would mean that these partitions are empty.
+                        assert store.tree() == null || store.tree().groupId() 
== grpId;
+
+                        if (store.tree() != null)
+                            cacheDataStores.put(store.partId(), store);
+                    }
+
+                    //TODO ensure that there are no races.

Review comment:
       Let's fix this.

##########
File path: 
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/defragmentation/TimeTracker.java
##########
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package 
org.apache.ignite.internal.processors.cache.persistence.defragmentation;
+
+import java.util.Arrays;
+import org.apache.ignite.internal.util.typedef.internal.U;
+
+/** */
+public class TimeTracker<Stage extends Enum<Stage>> {

Review comment:
       Please, remove it.

##########
File path: 
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/defragmentation/TreeIterator.java
##########
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package 
org.apache.ignite.internal.processors.cache.persistence.defragmentation;
+
+import java.nio.ByteBuffer;
+import org.apache.ignite.IgniteCheckedException;
+import 
org.apache.ignite.internal.processors.cache.persistence.pagemem.PageMemoryEx;
+import org.apache.ignite.internal.processors.cache.persistence.tree.BPlusTree;
+import org.apache.ignite.internal.processors.cache.persistence.tree.io.BPlusIO;
+import 
org.apache.ignite.internal.processors.cache.persistence.tree.io.BPlusLeafIO;
+import 
org.apache.ignite.internal.processors.cache.persistence.tree.io.BPlusMetaIO;
+import org.apache.ignite.internal.processors.cache.persistence.tree.io.PageIO;
+import org.apache.ignite.internal.util.GridUnsafe;
+
+/** */
+public class TreeIterator {
+    /** Direct memory buffer with a size of one page. */
+    private final ByteBuffer pageBuf;
+
+    /** Offheap page size. */
+    private final int pageSize;
+
+    /** */
+    public TreeIterator(int size) {
+        pageSize = size;
+
+        pageBuf = ByteBuffer.allocateDirect(pageSize);
+    }
+
+    // Performance impact of constant closures allocation is not clear. So 
this method should be avoided in massive
+    // operations like tree leaves access.
+    /** */
+    public static <T> T access(
+        PageAccessType access,
+        PageMemoryEx pageMemory,
+        int grpId,
+        long pageId,
+        PageAccessor<T> accessor
+    ) throws IgniteCheckedException {
+        assert access != null;
+        long page = pageMemory.acquirePage(grpId, pageId);
+
+        try {
+            long pageAddr = access == PageAccessType.ACCESS_READ
+                    ? pageMemory.readLock(grpId, pageId, page)
+                    : pageMemory.writeLock(grpId, pageId, page);
+
+            try {
+                return accessor.access(pageAddr);
+            }
+            finally {
+                if (access == PageAccessType.ACCESS_READ)
+                    pageMemory.readUnlock(grpId, pageId, page);
+                else
+                    pageMemory.writeUnlock(grpId, pageId, page, null, true);
+            }
+        }
+        finally {
+            pageMemory.releasePage(grpId, pageId, page);
+        }
+    }
+
+    /** */
+    @SuppressWarnings("PublicInnerClass")
+    public enum PageAccessType {

Review comment:
       The `ACCESS_WRITE` used only once, so you can paste try-finally block 
directly into the code and eliminate any lambda creation penalties for the page 
writes. As for the `ACCESS_READ` you can rename it to `readPageAddr` and use it 
where it really needs.
   
   PageAccessType is completely useless and it never be extended. 
   PageAccessor -> PageReader
   
   But I see no real reasons for generalization such operations as page 
addresses write/reads. The other parts of code never go this way. It will be 
simpler and better if you'll reuse try-finally approach for all these cases.

##########
File path: 
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/defragmentation/CachePartitionDefragmentationManager.java
##########
@@ -0,0 +1,861 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package 
org.apache.ignite.internal.processors.cache.persistence.defragmentation;
+
+import java.io.File;
+import java.nio.file.Path;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.LongConsumer;
+import java.util.stream.Collectors;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.configuration.DataPageEvictionMode;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.metric.IoStatisticsHolderNoOp;
+import org.apache.ignite.internal.pagemem.PageIdAllocator;
+import org.apache.ignite.internal.pagemem.PageIdUtils;
+import org.apache.ignite.internal.pagemem.store.PageStore;
+import org.apache.ignite.internal.processors.cache.CacheGroupContext;
+import org.apache.ignite.internal.processors.cache.CacheType;
+import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
+import 
org.apache.ignite.internal.processors.cache.IgniteCacheOffheapManager.CacheDataStore;
+import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
+import org.apache.ignite.internal.processors.cache.persistence.CheckpointState;
+import org.apache.ignite.internal.processors.cache.persistence.DataRegion;
+import 
org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager;
+import 
org.apache.ignite.internal.processors.cache.persistence.GridCacheOffheapManager;
+import 
org.apache.ignite.internal.processors.cache.persistence.GridCacheOffheapManager.GridCacheDataStore;
+import 
org.apache.ignite.internal.processors.cache.persistence.checkpoint.CheckpointManager;
+import 
org.apache.ignite.internal.processors.cache.persistence.checkpoint.CheckpointTimeoutLock;
+import 
org.apache.ignite.internal.processors.cache.persistence.checkpoint.LightweightCheckpointManager;
+import 
org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreFactory;
+import 
org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager;
+import 
org.apache.ignite.internal.processors.cache.persistence.freelist.AbstractFreeList;
+import 
org.apache.ignite.internal.processors.cache.persistence.freelist.SimpleDataRow;
+import 
org.apache.ignite.internal.processors.cache.persistence.pagemem.PageMemoryEx;
+import org.apache.ignite.internal.processors.cache.persistence.tree.io.PageIO;
+import 
org.apache.ignite.internal.processors.cache.persistence.tree.io.PagePartitionMetaIO;
+import 
org.apache.ignite.internal.processors.cache.persistence.tree.io.PagePartitionMetaIOV3;
+import org.apache.ignite.internal.processors.cache.tree.AbstractDataLeafIO;
+import org.apache.ignite.internal.processors.cache.tree.CacheDataTree;
+import org.apache.ignite.internal.processors.cache.tree.DataRow;
+import org.apache.ignite.internal.processors.cache.tree.PendingEntriesTree;
+import org.apache.ignite.internal.processors.cache.tree.PendingRow;
+import org.apache.ignite.internal.processors.query.GridQueryIndexing;
+import org.apache.ignite.internal.processors.query.GridQueryProcessor;
+import org.apache.ignite.internal.util.collection.IntHashMap;
+import org.apache.ignite.internal.util.collection.IntMap;
+import org.apache.ignite.internal.util.future.GridCompoundFuture;
+import org.apache.ignite.internal.util.future.GridFinishedFuture;
+import org.apache.ignite.internal.util.future.GridFutureAdapter;
+import org.apache.ignite.internal.util.typedef.internal.CU;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteInClosure;
+import org.apache.ignite.lang.IgniteOutClosure;
+import org.apache.ignite.maintenance.MaintenanceRegistry;
+
+import static java.util.stream.StreamSupport.stream;
+import static org.apache.ignite.internal.pagemem.PageIdAllocator.FLAG_DATA;
+import static org.apache.ignite.internal.pagemem.PageIdAllocator.FLAG_IDX;
+import static 
org.apache.ignite.internal.processors.cache.persistence.CheckpointState.FINISHED;
+import static 
org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager.DEFRAGMENTATION_MAPPING_REGION_NAME;
+import static 
org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager.DEFRAGMENTATION_PART_REGION_NAME;
+import static 
org.apache.ignite.internal.processors.cache.persistence.defragmentation.CachePartitionDefragmentationManager.PartStages.CP_LOCK;
+import static 
org.apache.ignite.internal.processors.cache.persistence.defragmentation.CachePartitionDefragmentationManager.PartStages.INSERT_ROW;
+import static 
org.apache.ignite.internal.processors.cache.persistence.defragmentation.CachePartitionDefragmentationManager.PartStages.ITERATE;
+import static 
org.apache.ignite.internal.processors.cache.persistence.defragmentation.CachePartitionDefragmentationManager.PartStages.METADATA;
+import static 
org.apache.ignite.internal.processors.cache.persistence.defragmentation.CachePartitionDefragmentationManager.PartStages.READ_ROW;
+import static 
org.apache.ignite.internal.processors.cache.persistence.defragmentation.CachePartitionDefragmentationManager.PartStages.STORE_MAP;
+import static 
org.apache.ignite.internal.processors.cache.persistence.defragmentation.CachePartitionDefragmentationManager.PartStages.STORE_PENDING;
+import static 
org.apache.ignite.internal.processors.cache.persistence.defragmentation.CachePartitionDefragmentationManager.PartStages.STORE_PK;
+import static 
org.apache.ignite.internal.processors.cache.persistence.defragmentation.DefragmentationFileUtils.batchRenameDefragmentedCacheGroupPartitions;
+import static 
org.apache.ignite.internal.processors.cache.persistence.defragmentation.DefragmentationFileUtils.defragmentedIndexTmpFile;
+import static 
org.apache.ignite.internal.processors.cache.persistence.defragmentation.DefragmentationFileUtils.defragmentedPartFile;
+import static 
org.apache.ignite.internal.processors.cache.persistence.defragmentation.DefragmentationFileUtils.defragmentedPartMappingFile;
+import static 
org.apache.ignite.internal.processors.cache.persistence.defragmentation.DefragmentationFileUtils.defragmentedPartTmpFile;
+import static 
org.apache.ignite.internal.processors.cache.persistence.defragmentation.DefragmentationFileUtils.renameTempIndexFile;
+import static 
org.apache.ignite.internal.processors.cache.persistence.defragmentation.DefragmentationFileUtils.renameTempPartitionFile;
+import static 
org.apache.ignite.internal.processors.cache.persistence.defragmentation.DefragmentationFileUtils.skipAlreadyDefragmentedCacheGroup;
+import static 
org.apache.ignite.internal.processors.cache.persistence.defragmentation.DefragmentationFileUtils.skipAlreadyDefragmentedPartition;
+import static 
org.apache.ignite.internal.processors.cache.persistence.defragmentation.DefragmentationFileUtils.writeDefragmentationCompletionMarker;
+import static 
org.apache.ignite.internal.processors.cache.persistence.defragmentation.TreeIterator.PageAccessType.ACCESS_READ;
+import static 
org.apache.ignite.internal.processors.cache.persistence.defragmentation.TreeIterator.PageAccessType.ACCESS_WRITE;
+import static 
org.apache.ignite.internal.processors.cache.persistence.defragmentation.TreeIterator.access;
+
+/**
+ * Defragmentation manager is the core class that contains main 
defragmentation procedure.
+ */
+public class CachePartitionDefragmentationManager {
+    /** */
+    public static final String DEFRAGMENTATION_MNTC_TASK_NAME = 
"defragmentationMaintenanceTask";
+
+    /** */
+    private final Set<Integer> cacheGroupsForDefragmentation;
+
+    /** Cache shared context. */
+    private final GridCacheSharedContext<?, ?> sharedCtx;
+
+    /** Maintenance registry. */
+    private final MaintenanceRegistry mntcReg;
+
+    /** Logger. */
+    private final IgniteLogger log;
+
+    /** Database shared manager. */
+    private final GridCacheDatabaseSharedManager dbMgr;
+
+    /** File page store manager. */
+    private final FilePageStoreManager filePageStoreMgr;
+
+    /**
+     * Checkpoint for specific defragmentation regions which would store the 
data to new partitions
+     * during the defragmentation.
+     */
+    private final LightweightCheckpointManager defragmentationCheckpoint;
+
+    /** Default checkpoint for current node. */
+    private final CheckpointManager nodeCheckpoint;
+
+    /** Page size. */
+    private final int pageSize;
+
+    /** */
+    private final DataRegion partDataRegion;
+
+    /** */
+    private final DataRegion mappingDataRegion;
+
+    /**
+     * @param cacheGrpIds
+     * @param sharedCtx Cache shared context.
+     * @param dbMgr Database manager.
+     * @param filePageStoreMgr File page store manager.
+     * @param nodeCheckpoint Default checkpoint for this node.
+     * @param defragmentationCheckpoint Specific checkpoint for 
defragmentation.
+     * @param pageSize Page size.
+     */
+    public CachePartitionDefragmentationManager(
+        List<Integer> cacheGrpIds,
+        GridCacheSharedContext<?, ?> sharedCtx,
+        GridCacheDatabaseSharedManager dbMgr,
+        FilePageStoreManager filePageStoreMgr,
+        CheckpointManager nodeCheckpoint,
+        LightweightCheckpointManager defragmentationCheckpoint,
+        int pageSize
+    ) throws IgniteCheckedException {
+        cacheGroupsForDefragmentation = new HashSet<>(cacheGrpIds);
+
+        this.dbMgr = dbMgr;
+        this.filePageStoreMgr = filePageStoreMgr;
+        this.pageSize = pageSize;
+        this.sharedCtx = sharedCtx;
+
+        this.mntcReg = sharedCtx.kernalContext().maintenanceRegistry();
+        this.log = sharedCtx.logger(getClass());
+        this.defragmentationCheckpoint = defragmentationCheckpoint;
+        this.nodeCheckpoint = nodeCheckpoint;
+
+        partDataRegion = dbMgr.dataRegion(DEFRAGMENTATION_PART_REGION_NAME);
+        mappingDataRegion = 
dbMgr.dataRegion(DEFRAGMENTATION_MAPPING_REGION_NAME);
+    }
+
+    /** */
+    public void executeDefragmentation() throws IgniteCheckedException {
+        log.info("Defragmentation started.");
+
+        try {
+            // Checkpointer must be enabled so all pages on disk are in their 
latest valid state.
+            dbMgr.resumeWalLogging();
+
+            dbMgr.onStateRestored(null);
+
+            nodeCheckpoint.forceCheckpoint("beforeDefragmentation", 
null).futureFor(FINISHED).get();
+
+            sharedCtx.wal().onDeActivate(sharedCtx.kernalContext());
+
+            // Now the actual process starts.
+            TreeIterator treeIter = new TreeIterator(pageSize);
+
+            IgniteInternalFuture<?> idxDfrgFut = null;
+            DataPageEvictionMode prevPageEvictionMode = null;
+
+            for (CacheGroupContext oldGrpCtx : 
sharedCtx.cache().cacheGroups()) {
+                if (!oldGrpCtx.userCache())
+                    continue;
+
+                int grpId = oldGrpCtx.groupId();
+
+                if (!cacheGroupsForDefragmentation.isEmpty() && 
!cacheGroupsForDefragmentation.contains(grpId))
+                    continue;
+
+                File workDir = 
filePageStoreMgr.cacheWorkDir(oldGrpCtx.sharedGroup(), 
oldGrpCtx.cacheOrGroupName());
+
+                if (skipAlreadyDefragmentedCacheGroup(workDir, grpId, log))
+                    continue;
+
+                GridCacheOffheapManager offheap = 
(GridCacheOffheapManager)oldGrpCtx.offheap();
+
+                List<CacheDataStore> oldCacheDataStores = 
stream(offheap.cacheDataStores().spliterator(), false)
+                    .filter(store -> {
+                        try {
+                            return filePageStoreMgr.exists(grpId, 
store.partId());
+                        }
+                        catch (IgniteCheckedException e) {
+                            throw new IgniteException(e);
+                        }
+                    })
+                    .collect(Collectors.toList());
+
+                if (workDir != null && !oldCacheDataStores.isEmpty()) {
+                    // We can't start defragmentation of new group on the 
region that has wrong eviction mode.
+                    // So waiting of the previous cache group defragmentation 
is inevitable.
+                    DataPageEvictionMode curPageEvictionMode = 
oldGrpCtx.dataRegion().config().getPageEvictionMode();
+
+                    if (prevPageEvictionMode == null || prevPageEvictionMode 
!= curPageEvictionMode) {
+                        prevPageEvictionMode = curPageEvictionMode;
+
+                        
partDataRegion.config().setPageEvictionMode(curPageEvictionMode);
+
+                        if (idxDfrgFut != null)
+                            idxDfrgFut.get();
+                    }
+
+                    IntMap<CacheDataStore> cacheDataStores = new 
IntHashMap<>();
+
+                    for (CacheDataStore store : offheap.cacheDataStores()) {
+                        // Tree can be null for not yet initialized partitions.
+                        // This would mean that these partitions are empty.
+                        assert store.tree() == null || store.tree().groupId() 
== grpId;
+
+                        if (store.tree() != null)
+                            cacheDataStores.put(store.partId(), store);
+                    }
+
+                    //TODO ensure that there are no races.
+                    
dbMgr.checkpointedDataRegions().remove(oldGrpCtx.dataRegion());
+
+                    // Another cheat. Ttl cleanup manager knows too much shit.
+                    oldGrpCtx.caches().stream()
+                        .filter(cacheCtx -> cacheCtx.groupId() == grpId)
+                        .forEach(cacheCtx -> cacheCtx.ttl().unregister());
+
+                    // Technically wal is already disabled, but 
"PageHandler.isWalDeltaRecordNeeded" doesn't care and
+                    // WAL records will be allocated anyway just to be ignored 
later if we don't disable WAL for
+                    // cache group explicitly.
+                    oldGrpCtx.localWalEnabled(false, false);
+
+                    boolean encrypted = 
oldGrpCtx.config().isEncryptionEnabled();
+
+                    FilePageStoreFactory pageStoreFactory = 
filePageStoreMgr.getPageStoreFactory(grpId, encrypted);
+
+                    createIndexPageStore(grpId, workDir, pageStoreFactory, 
partDataRegion, val -> {
+                    }); //TODO Allocated tracker.
+
+                    GridCompoundFuture<Object, Object> cmpFut = new 
GridCompoundFuture<>();
+
+                    PageMemoryEx oldPageMem = 
(PageMemoryEx)oldGrpCtx.dataRegion().pageMemory();
+
+                    CacheGroupContext newGrpCtx = new CacheGroupContext(
+                        sharedCtx,
+                        grpId,
+                        oldGrpCtx.receivedFrom(),
+                        CacheType.USER,
+                        oldGrpCtx.config(),
+                        oldGrpCtx.affinityNode(),
+                        partDataRegion,
+                        oldGrpCtx.cacheObjectContext(),
+                        null,
+                        null,
+                        oldGrpCtx.localStartVersion(),
+                        true,
+                        false,
+                        true
+                    );
+
+                    
defragmentationCheckpoint.checkpointTimeoutLock().checkpointReadLock();
+
+                    try {
+                        // This will initialize partition meta in index 
partition - meta tree and reuse list.
+                        newGrpCtx.start();
+                    }
+                    finally {
+                        
defragmentationCheckpoint.checkpointTimeoutLock().checkpointReadUnlock();
+                    }
+
+                    IntMap<LinkMap> linkMapByPart = new IntHashMap<>();
+
+                    for (CacheDataStore oldCacheDataStore : 
oldCacheDataStores) {
+                        int partId = oldCacheDataStore.partId();
+
+                        PartitionContext partCtx = new PartitionContext(
+                            workDir,
+                            grpId,
+                            partId,
+                            partDataRegion,
+                            mappingDataRegion,
+                            oldGrpCtx,
+                            newGrpCtx,
+                            cacheDataStores.get(partId),
+                            pageStoreFactory
+                        );
+
+                        if (skipAlreadyDefragmentedPartition(workDir, grpId, 
partId, log)) {
+                            partCtx.createPageStore(
+                                () -> defragmentedPartMappingFile(workDir, 
partId).toPath(),
+                                partCtx.mappingPagesAllocated,
+                                partCtx.mappingPageMemory
+                            );
+
+                            linkMapByPart.put(partId, 
partCtx.createLinkMapTree(false));
+
+                            continue;
+                        }
+
+                        partCtx.createPageStore(
+                            () -> defragmentedPartMappingFile(workDir, 
partId).toPath(),
+                            partCtx.mappingPagesAllocated,
+                            partCtx.mappingPageMemory
+                        );
+
+                        linkMapByPart.put(partId, 
partCtx.createLinkMapTree(true));
+
+                        partCtx.createPageStore(
+                            () -> defragmentedPartTmpFile(workDir, 
partId).toPath(),
+                            partCtx.partPagesAllocated,
+                            partCtx.partPageMemory
+                        );
+
+                        copyPartitionData(partCtx, treeIter, offheap);
+
+                        IgniteInClosure<IgniteInternalFuture<?>> cpLsnr = fut 
-> {
+                            if (fut.error() != null)
+                                return;
+
+                            PageStore oldPageStore = null;
+
+                            try {
+                                oldPageStore = 
filePageStoreMgr.getStore(grpId, partId);
+                            }
+                            catch (IgniteCheckedException ignore) {
+                            }
+
+                            if (log.isDebugEnabled()) {
+                                log.debug(S.toString(
+                                    "Partition defragmented",
+                                    "grpId", grpId, false,
+                                    "partId", partId, false,
+                                    "oldPages", oldPageStore.pages(), false,
+                                    "newPages", 
partCtx.partPagesAllocated.get() + 1, false,
+                                    "mappingPages", 
partCtx.mappingPagesAllocated.get() + 1, false,
+                                    "pageSize", pageSize, false,
+                                    "partFile", defragmentedPartFile(workDir, 
partId).getName(), false,
+                                    "workDir", workDir, false
+                                ));
+                            }
+
+                            oldPageMem.invalidate(grpId, partId);
+
+                            partCtx.partPageMemory.invalidate(grpId, partId);
+
+                            DefragmentationPageReadWriteManager pageMgr = 
(DefragmentationPageReadWriteManager)partCtx.partPageMemory.pageManager();
+
+                            pageMgr.pageStoreMap().removePageStore(grpId, 
partId); // Yes, it'll be invalid in a second.
+
+                            renameTempPartitionFile(workDir, partId);
+                        };
+
+                        GridFutureAdapter<?> cpFut = defragmentationCheckpoint
+                            .forceCheckpoint("partition defragmented", null)
+                            .futureFor(CheckpointState.FINISHED);
+
+                        cpFut.listen(cpLsnr);
+
+                        cmpFut.add((IgniteInternalFuture<Object>)cpFut);
+                    }
+
+                    // A bit too general for now, but I like it more then 
saving only the last checkpoint future.
+                    cmpFut.markInitialized().get();
+
+                    idxDfrgFut = new GridFinishedFuture<>();
+
+                    if (filePageStoreMgr.hasIndexStore(grpId)) {
+                        defragmentIndexPartition(oldGrpCtx, newGrpCtx, 
linkMapByPart);
+
+                        idxDfrgFut = defragmentationCheckpoint
+                            .forceCheckpoint("index defragmented", null)
+                            .futureFor(CheckpointState.FINISHED);
+                    }
+
+                    idxDfrgFut.listen(fut -> {
+                        oldPageMem.invalidate(grpId, 
PageIdAllocator.INDEX_PARTITION);
+
+                        PageMemoryEx partPageMem = 
(PageMemoryEx)partDataRegion.pageMemory();
+
+                        partPageMem.invalidate(grpId, 
PageIdAllocator.INDEX_PARTITION);
+
+                        DefragmentationPageReadWriteManager pageMgr = 
(DefragmentationPageReadWriteManager)partPageMem.pageManager();
+
+                        pageMgr.pageStoreMap().removePageStore(grpId, 
PageIdAllocator.INDEX_PARTITION);
+
+                        PageMemoryEx mappingPageMem = 
(PageMemoryEx)mappingDataRegion.pageMemory();
+
+                        pageMgr = 
(DefragmentationPageReadWriteManager)mappingPageMem.pageManager();
+
+                        pageMgr.pageStoreMap().clear(grpId);
+
+                        renameTempIndexFile(workDir);
+
+                        
writeDefragmentationCompletionMarker(filePageStoreMgr.getPageStoreFileIoFactory(),
 workDir, log);
+
+                        batchRenameDefragmentedCacheGroupPartitions(workDir, 
log);
+                    });
+                }
+
+                // I guess we should wait for it?
+                if (idxDfrgFut != null)
+                    idxDfrgFut.get();
+            }
+
+            mntcReg.unregisterMaintenanceTask(DEFRAGMENTATION_MNTC_TASK_NAME);
+
+            log.info("Defragmentation completed. All partitions are 
defragmented.");
+        }
+        finally {
+            defragmentationCheckpoint.stop(true);
+        }
+    }
+
+    /** */
+    public void createIndexPageStore(
+        int grpId,
+        File workDir,
+        FilePageStoreFactory pageStoreFactory,
+        DataRegion partRegion,
+        LongConsumer allocatedTracker
+    ) throws IgniteCheckedException {
+        // Index partition file has to be deleted before we begin, otherwise 
there's a chance of reading corrupted file.
+        // There is a time period when index is already defragmented but 
marker file is not created yet. If node is
+        // failed in that time window then index will be deframented once 
again. That's fine, situation is rare but code
+        // to fix that would add unnecessary complications.
+        U.delete(defragmentedIndexTmpFile(workDir));
+
+        PageStore idxPageStore;
+
+        defragmentationCheckpoint.checkpointTimeoutLock().checkpointReadLock();
+        try {
+            idxPageStore = pageStoreFactory.createPageStore(
+                FLAG_IDX,
+                () -> defragmentedIndexTmpFile(workDir).toPath(),
+                allocatedTracker
+            );
+        }
+        finally {
+            
defragmentationCheckpoint.checkpointTimeoutLock().checkpointReadUnlock();
+        }
+
+        idxPageStore.sync();
+
+        PageMemoryEx partPageMem = (PageMemoryEx)partRegion.pageMemory();
+
+        DefragmentationPageReadWriteManager partMgr = 
(DefragmentationPageReadWriteManager)partPageMem.pageManager();
+
+        partMgr.pageStoreMap().addPageStore(grpId, 
PageIdAllocator.INDEX_PARTITION, idxPageStore);
+    }
+
+    /** */
+    public enum PartStages {
+        START,
+        CP_LOCK,
+        ITERATE,
+        READ_ROW,
+        INSERT_ROW,
+        STORE_MAP,
+        STORE_PK,
+        STORE_PENDING,
+        METADATA
+    }
+
+    /**
+     * Defragmentate partition.
+     *
+     * @param partCtx
+     * @param treeIter
+     * @param offheap
+     * @throws IgniteCheckedException If failed.
+     */
+    private void copyPartitionData(
+        PartitionContext partCtx,
+        TreeIterator treeIter,
+        GridCacheOffheapManager offheap
+    ) throws IgniteCheckedException {
+        partCtx.createNewCacheDataStore(offheap);

Review comment:
       Let's move it outside of this method.

##########
File path: 
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/defragmentation/DefragmentationPageReadWriteManager.java
##########
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package 
org.apache.ignite.internal.processors.cache.persistence.defragmentation;
+
+import org.apache.ignite.internal.GridKernalContext;
+import 
org.apache.ignite.internal.processors.cache.persistence.pagemem.PageReadWriteManagerImpl;
+
+/** */
+public class DefragmentationPageReadWriteManager extends 
PageReadWriteManagerImpl {

Review comment:
       Let's move everything here from `PageStoreMap` here? The 
DefragmentationPageReadWriteManager will also implement the 
`PageStoreCollection`.

##########
File path: 
modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/defragmentation/IndexingDefragmentation.java
##########
@@ -0,0 +1,467 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.h2.defragmentation;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.internal.pagemem.PageMemory;
+import org.apache.ignite.internal.processors.cache.CacheGroupContext;
+import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
+import 
org.apache.ignite.internal.processors.cache.persistence.CacheDataRowAdapter;
+import 
org.apache.ignite.internal.processors.cache.persistence.checkpoint.CheckpointTimeoutLock;
+import 
org.apache.ignite.internal.processors.cache.persistence.defragmentation.LinkMap;
+import 
org.apache.ignite.internal.processors.cache.persistence.defragmentation.TimeTracker;
+import 
org.apache.ignite.internal.processors.cache.persistence.defragmentation.TreeIterator;
+import 
org.apache.ignite.internal.processors.cache.persistence.pagemem.PageMemoryEx;
+import org.apache.ignite.internal.processors.cache.persistence.tree.BPlusTree;
+import org.apache.ignite.internal.processors.cache.persistence.tree.io.BPlusIO;
+import 
org.apache.ignite.internal.processors.cache.persistence.tree.io.BPlusInnerIO;
+import 
org.apache.ignite.internal.processors.cache.persistence.tree.io.BPlusLeafIO;
+import 
org.apache.ignite.internal.processors.cache.persistence.tree.io.BPlusMetaIO;
+import org.apache.ignite.internal.processors.cache.persistence.tree.io.PageIO;
+import 
org.apache.ignite.internal.processors.cache.persistence.tree.io.PageIoResolver;
+import 
org.apache.ignite.internal.processors.cache.persistence.tree.util.InsertLast;
+import org.apache.ignite.internal.processors.cache.tree.mvcc.data.MvccDataRow;
+import org.apache.ignite.internal.processors.query.h2.IgniteH2Indexing;
+import org.apache.ignite.internal.processors.query.h2.database.H2Tree;
+import org.apache.ignite.internal.processors.query.h2.database.H2TreeIndex;
+import 
org.apache.ignite.internal.processors.query.h2.database.InlineIndexColumn;
+import 
org.apache.ignite.internal.processors.query.h2.database.inlinecolumn.AbstractInlineIndexColumn;
+import 
org.apache.ignite.internal.processors.query.h2.database.io.AbstractH2ExtrasInnerIO;
+import 
org.apache.ignite.internal.processors.query.h2.database.io.AbstractH2ExtrasLeafIO;
+import 
org.apache.ignite.internal.processors.query.h2.database.io.AbstractH2InnerIO;
+import 
org.apache.ignite.internal.processors.query.h2.database.io.AbstractH2LeafIO;
+import org.apache.ignite.internal.processors.query.h2.database.io.H2RowLinkIO;
+import org.apache.ignite.internal.processors.query.h2.opt.GridH2RowDescriptor;
+import org.apache.ignite.internal.processors.query.h2.opt.GridH2Table;
+import org.apache.ignite.internal.processors.query.h2.opt.H2CacheRow;
+import org.apache.ignite.internal.processors.query.h2.opt.H2Row;
+import org.apache.ignite.internal.util.collection.IntMap;
+import org.h2.index.Index;
+import org.h2.value.Value;
+
+import static 
org.apache.ignite.internal.processors.query.h2.defragmentation.IndexingDefragmentation.IndexStages.CP_LOCK;
+import static 
org.apache.ignite.internal.processors.query.h2.defragmentation.IndexingDefragmentation.IndexStages.INIT_TREE;
+import static 
org.apache.ignite.internal.processors.query.h2.defragmentation.IndexingDefragmentation.IndexStages.INSERT_ROW;
+import static 
org.apache.ignite.internal.processors.query.h2.defragmentation.IndexingDefragmentation.IndexStages.ITERATE;
+import static 
org.apache.ignite.internal.processors.query.h2.defragmentation.IndexingDefragmentation.IndexStages.READ_MAP;
+import static 
org.apache.ignite.internal.processors.query.h2.defragmentation.IndexingDefragmentation.IndexStages.READ_ROW;
+
+/**
+ *
+ */
+public class IndexingDefragmentation {
+    /** Indexing. */
+    private final IgniteH2Indexing indexing;
+
+    /** Constructor. */
+    public IndexingDefragmentation(IgniteH2Indexing indexing) {
+        this.indexing = indexing;
+    }
+
+    /** */
+    public enum IndexStages {
+        START,
+        CP_LOCK,
+        INIT_TREE,
+        ITERATE,
+        READ_ROW,
+        READ_MAP,
+        INSERT_ROW
+    }
+
+    /**
+     * Defragment index partition.
+     *
+     * @param grpCtx Old group context.
+     * @param newCtx New group context.
+     * @param partPageMem Partition page memory.
+     * @param mappingByPartition Mapping page memory.
+     * @param cpLock Defragmentation checkpoint read lock.
+     * @param log Log.
+     *
+     * @throws IgniteCheckedException If failed.
+     */
+    public void defragment(
+        CacheGroupContext grpCtx,
+        CacheGroupContext newCtx,
+        PageMemoryEx partPageMem,
+        IntMap<LinkMap> mappingByPartition,
+        CheckpointTimeoutLock cpLock,
+        IgniteLogger log
+    ) throws IgniteCheckedException {
+        int pageSize = 
grpCtx.cacheObjectContext().kernalContext().grid().configuration().getDataStorageConfiguration().getPageSize();
+
+        TreeIterator treeIterator = new TreeIterator(pageSize);
+
+        PageMemoryEx oldCachePageMem = 
(PageMemoryEx)grpCtx.dataRegion().pageMemory();
+
+        PageMemory newCachePageMemory = partPageMem;
+
+        Collection<GridH2Table> tables = indexing.schemaManager().dataTables();
+
+        long cpLockThreshold = 150L;
+
+        TimeTracker<IndexStages> tracker = new 
TimeTracker<>(IndexStages.class);
+
+        cpLock.checkpointReadLock();
+        tracker.complete(CP_LOCK);
+
+        try {
+            AtomicLong lastCpLockTs = new 
AtomicLong(System.currentTimeMillis());
+
+            for (GridH2Table table : tables) {
+                GridCacheContext<?, ?> cctx = table.cacheContext();
+
+                if (cctx.groupId() != grpCtx.groupId())
+                    continue; // Not our index.
+
+                GridH2RowDescriptor rowDesc = table.rowDescriptor();
+
+                List<Index> indexes = table.getIndexes();
+                H2TreeIndex oldH2Idx = (H2TreeIndex)indexes.get(2);
+
+                int segments = oldH2Idx.segmentsCount();
+
+                H2Tree firstTree = oldH2Idx.treeForRead(0);
+
+                PageIoResolver pageIoRslvr = pageAddr -> {
+                    PageIO io = 
PageIoResolver.DEFAULT_PAGE_IO_RESOLVER.resolve(pageAddr);
+
+                    if (io instanceof BPlusMetaIO)
+                        return io;
+
+                    //noinspection unchecked,rawtypes,rawtypes
+                    return wrap((BPlusIO)io);
+                };
+
+                //TODO Create new proper GridCacheContext for it?

Review comment:
       Let's fix it.

##########
File path: 
modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java
##########
@@ -487,4 +493,25 @@ default long indexSize(String schemaName, String tblName, 
String idxName) throws
     default Map<String, Integer> secondaryIndexesInlineSize() {
         return Collections.emptyMap();
     }
+
+    /**
+     * Defragment index partition.
+     *
+     * @param grpCtx Old group context.
+     * @param newCtx New group context.
+     * @param partPageMem Partition page memory.
+     * @param mappingByPart Mapping page memory.
+     * @param cpLock Defragmentation checkpoint read lock.
+     * @param log Log.
+     *
+     * @throws IgniteCheckedException If failed.
+     */
+    void defragment(
+        CacheGroupContext grpCtx,
+        CacheGroupContext newCtx,
+        PageMemoryEx partPageMem,
+        IntMap<LinkMap> mappingByPart,
+        CheckpointTimeoutLock cpLock,

Review comment:
       `cpLock` and `log` should be part of API. You can obtain it from the 
`context` in implementation.

##########
File path: 
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/defragmentation/CachePartitionDefragmentationManager.java
##########
@@ -0,0 +1,861 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package 
org.apache.ignite.internal.processors.cache.persistence.defragmentation;
+
+import java.io.File;
+import java.nio.file.Path;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.LongConsumer;
+import java.util.stream.Collectors;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.configuration.DataPageEvictionMode;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.metric.IoStatisticsHolderNoOp;
+import org.apache.ignite.internal.pagemem.PageIdAllocator;
+import org.apache.ignite.internal.pagemem.PageIdUtils;
+import org.apache.ignite.internal.pagemem.store.PageStore;
+import org.apache.ignite.internal.processors.cache.CacheGroupContext;
+import org.apache.ignite.internal.processors.cache.CacheType;
+import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
+import 
org.apache.ignite.internal.processors.cache.IgniteCacheOffheapManager.CacheDataStore;
+import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
+import org.apache.ignite.internal.processors.cache.persistence.CheckpointState;
+import org.apache.ignite.internal.processors.cache.persistence.DataRegion;
+import 
org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager;
+import 
org.apache.ignite.internal.processors.cache.persistence.GridCacheOffheapManager;
+import 
org.apache.ignite.internal.processors.cache.persistence.GridCacheOffheapManager.GridCacheDataStore;
+import 
org.apache.ignite.internal.processors.cache.persistence.checkpoint.CheckpointManager;
+import 
org.apache.ignite.internal.processors.cache.persistence.checkpoint.CheckpointTimeoutLock;
+import 
org.apache.ignite.internal.processors.cache.persistence.checkpoint.LightweightCheckpointManager;
+import 
org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreFactory;
+import 
org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager;
+import 
org.apache.ignite.internal.processors.cache.persistence.freelist.AbstractFreeList;
+import 
org.apache.ignite.internal.processors.cache.persistence.freelist.SimpleDataRow;
+import 
org.apache.ignite.internal.processors.cache.persistence.pagemem.PageMemoryEx;
+import org.apache.ignite.internal.processors.cache.persistence.tree.io.PageIO;
+import 
org.apache.ignite.internal.processors.cache.persistence.tree.io.PagePartitionMetaIO;
+import 
org.apache.ignite.internal.processors.cache.persistence.tree.io.PagePartitionMetaIOV3;
+import org.apache.ignite.internal.processors.cache.tree.AbstractDataLeafIO;
+import org.apache.ignite.internal.processors.cache.tree.CacheDataTree;
+import org.apache.ignite.internal.processors.cache.tree.DataRow;
+import org.apache.ignite.internal.processors.cache.tree.PendingEntriesTree;
+import org.apache.ignite.internal.processors.cache.tree.PendingRow;
+import org.apache.ignite.internal.processors.query.GridQueryIndexing;
+import org.apache.ignite.internal.processors.query.GridQueryProcessor;
+import org.apache.ignite.internal.util.collection.IntHashMap;
+import org.apache.ignite.internal.util.collection.IntMap;
+import org.apache.ignite.internal.util.future.GridCompoundFuture;
+import org.apache.ignite.internal.util.future.GridFinishedFuture;
+import org.apache.ignite.internal.util.future.GridFutureAdapter;
+import org.apache.ignite.internal.util.typedef.internal.CU;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteInClosure;
+import org.apache.ignite.lang.IgniteOutClosure;
+import org.apache.ignite.maintenance.MaintenanceRegistry;
+
+import static java.util.stream.StreamSupport.stream;
+import static org.apache.ignite.internal.pagemem.PageIdAllocator.FLAG_DATA;
+import static org.apache.ignite.internal.pagemem.PageIdAllocator.FLAG_IDX;
+import static 
org.apache.ignite.internal.processors.cache.persistence.CheckpointState.FINISHED;
+import static 
org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager.DEFRAGMENTATION_MAPPING_REGION_NAME;
+import static 
org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager.DEFRAGMENTATION_PART_REGION_NAME;
+import static 
org.apache.ignite.internal.processors.cache.persistence.defragmentation.CachePartitionDefragmentationManager.PartStages.CP_LOCK;
+import static 
org.apache.ignite.internal.processors.cache.persistence.defragmentation.CachePartitionDefragmentationManager.PartStages.INSERT_ROW;
+import static 
org.apache.ignite.internal.processors.cache.persistence.defragmentation.CachePartitionDefragmentationManager.PartStages.ITERATE;
+import static 
org.apache.ignite.internal.processors.cache.persistence.defragmentation.CachePartitionDefragmentationManager.PartStages.METADATA;
+import static 
org.apache.ignite.internal.processors.cache.persistence.defragmentation.CachePartitionDefragmentationManager.PartStages.READ_ROW;
+import static 
org.apache.ignite.internal.processors.cache.persistence.defragmentation.CachePartitionDefragmentationManager.PartStages.STORE_MAP;
+import static 
org.apache.ignite.internal.processors.cache.persistence.defragmentation.CachePartitionDefragmentationManager.PartStages.STORE_PENDING;
+import static 
org.apache.ignite.internal.processors.cache.persistence.defragmentation.CachePartitionDefragmentationManager.PartStages.STORE_PK;
+import static 
org.apache.ignite.internal.processors.cache.persistence.defragmentation.DefragmentationFileUtils.batchRenameDefragmentedCacheGroupPartitions;
+import static 
org.apache.ignite.internal.processors.cache.persistence.defragmentation.DefragmentationFileUtils.defragmentedIndexTmpFile;
+import static 
org.apache.ignite.internal.processors.cache.persistence.defragmentation.DefragmentationFileUtils.defragmentedPartFile;
+import static 
org.apache.ignite.internal.processors.cache.persistence.defragmentation.DefragmentationFileUtils.defragmentedPartMappingFile;
+import static 
org.apache.ignite.internal.processors.cache.persistence.defragmentation.DefragmentationFileUtils.defragmentedPartTmpFile;
+import static 
org.apache.ignite.internal.processors.cache.persistence.defragmentation.DefragmentationFileUtils.renameTempIndexFile;
+import static 
org.apache.ignite.internal.processors.cache.persistence.defragmentation.DefragmentationFileUtils.renameTempPartitionFile;
+import static 
org.apache.ignite.internal.processors.cache.persistence.defragmentation.DefragmentationFileUtils.skipAlreadyDefragmentedCacheGroup;
+import static 
org.apache.ignite.internal.processors.cache.persistence.defragmentation.DefragmentationFileUtils.skipAlreadyDefragmentedPartition;
+import static 
org.apache.ignite.internal.processors.cache.persistence.defragmentation.DefragmentationFileUtils.writeDefragmentationCompletionMarker;
+import static 
org.apache.ignite.internal.processors.cache.persistence.defragmentation.TreeIterator.PageAccessType.ACCESS_READ;
+import static 
org.apache.ignite.internal.processors.cache.persistence.defragmentation.TreeIterator.PageAccessType.ACCESS_WRITE;
+import static 
org.apache.ignite.internal.processors.cache.persistence.defragmentation.TreeIterator.access;
+
+/**
+ * Defragmentation manager is the core class that contains main 
defragmentation procedure.
+ */
+public class CachePartitionDefragmentationManager {
+    /** */
+    public static final String DEFRAGMENTATION_MNTC_TASK_NAME = 
"defragmentationMaintenanceTask";
+
+    /** */
+    private final Set<Integer> cacheGroupsForDefragmentation;
+
+    /** Cache shared context. */
+    private final GridCacheSharedContext<?, ?> sharedCtx;
+
+    /** Maintenance registry. */
+    private final MaintenanceRegistry mntcReg;
+
+    /** Logger. */
+    private final IgniteLogger log;
+
+    /** Database shared manager. */
+    private final GridCacheDatabaseSharedManager dbMgr;
+
+    /** File page store manager. */
+    private final FilePageStoreManager filePageStoreMgr;
+
+    /**
+     * Checkpoint for specific defragmentation regions which would store the 
data to new partitions
+     * during the defragmentation.
+     */
+    private final LightweightCheckpointManager defragmentationCheckpoint;
+
+    /** Default checkpoint for current node. */
+    private final CheckpointManager nodeCheckpoint;
+
+    /** Page size. */
+    private final int pageSize;
+
+    /** */
+    private final DataRegion partDataRegion;
+
+    /** */
+    private final DataRegion mappingDataRegion;
+
+    /**
+     * @param cacheGrpIds
+     * @param sharedCtx Cache shared context.
+     * @param dbMgr Database manager.
+     * @param filePageStoreMgr File page store manager.
+     * @param nodeCheckpoint Default checkpoint for this node.
+     * @param defragmentationCheckpoint Specific checkpoint for 
defragmentation.
+     * @param pageSize Page size.
+     */
+    public CachePartitionDefragmentationManager(
+        List<Integer> cacheGrpIds,
+        GridCacheSharedContext<?, ?> sharedCtx,
+        GridCacheDatabaseSharedManager dbMgr,
+        FilePageStoreManager filePageStoreMgr,
+        CheckpointManager nodeCheckpoint,
+        LightweightCheckpointManager defragmentationCheckpoint,
+        int pageSize
+    ) throws IgniteCheckedException {
+        cacheGroupsForDefragmentation = new HashSet<>(cacheGrpIds);
+
+        this.dbMgr = dbMgr;
+        this.filePageStoreMgr = filePageStoreMgr;
+        this.pageSize = pageSize;
+        this.sharedCtx = sharedCtx;
+
+        this.mntcReg = sharedCtx.kernalContext().maintenanceRegistry();
+        this.log = sharedCtx.logger(getClass());
+        this.defragmentationCheckpoint = defragmentationCheckpoint;
+        this.nodeCheckpoint = nodeCheckpoint;
+
+        partDataRegion = dbMgr.dataRegion(DEFRAGMENTATION_PART_REGION_NAME);
+        mappingDataRegion = 
dbMgr.dataRegion(DEFRAGMENTATION_MAPPING_REGION_NAME);
+    }
+
+    /** */
+    public void executeDefragmentation() throws IgniteCheckedException {
+        log.info("Defragmentation started.");
+
+        try {
+            // Checkpointer must be enabled so all pages on disk are in their 
latest valid state.
+            dbMgr.resumeWalLogging();
+
+            dbMgr.onStateRestored(null);
+
+            nodeCheckpoint.forceCheckpoint("beforeDefragmentation", 
null).futureFor(FINISHED).get();
+
+            sharedCtx.wal().onDeActivate(sharedCtx.kernalContext());
+
+            // Now the actual process starts.
+            TreeIterator treeIter = new TreeIterator(pageSize);
+
+            IgniteInternalFuture<?> idxDfrgFut = null;
+            DataPageEvictionMode prevPageEvictionMode = null;
+
+            for (CacheGroupContext oldGrpCtx : 
sharedCtx.cache().cacheGroups()) {
+                if (!oldGrpCtx.userCache())
+                    continue;
+
+                int grpId = oldGrpCtx.groupId();
+
+                if (!cacheGroupsForDefragmentation.isEmpty() && 
!cacheGroupsForDefragmentation.contains(grpId))
+                    continue;
+
+                File workDir = 
filePageStoreMgr.cacheWorkDir(oldGrpCtx.sharedGroup(), 
oldGrpCtx.cacheOrGroupName());
+
+                if (skipAlreadyDefragmentedCacheGroup(workDir, grpId, log))
+                    continue;
+
+                GridCacheOffheapManager offheap = 
(GridCacheOffheapManager)oldGrpCtx.offheap();
+
+                List<CacheDataStore> oldCacheDataStores = 
stream(offheap.cacheDataStores().spliterator(), false)
+                    .filter(store -> {
+                        try {
+                            return filePageStoreMgr.exists(grpId, 
store.partId());
+                        }
+                        catch (IgniteCheckedException e) {
+                            throw new IgniteException(e);
+                        }
+                    })
+                    .collect(Collectors.toList());
+
+                if (workDir != null && !oldCacheDataStores.isEmpty()) {
+                    // We can't start defragmentation of new group on the 
region that has wrong eviction mode.
+                    // So waiting of the previous cache group defragmentation 
is inevitable.
+                    DataPageEvictionMode curPageEvictionMode = 
oldGrpCtx.dataRegion().config().getPageEvictionMode();
+
+                    if (prevPageEvictionMode == null || prevPageEvictionMode 
!= curPageEvictionMode) {
+                        prevPageEvictionMode = curPageEvictionMode;
+
+                        
partDataRegion.config().setPageEvictionMode(curPageEvictionMode);
+
+                        if (idxDfrgFut != null)
+                            idxDfrgFut.get();
+                    }
+
+                    IntMap<CacheDataStore> cacheDataStores = new 
IntHashMap<>();
+
+                    for (CacheDataStore store : offheap.cacheDataStores()) {
+                        // Tree can be null for not yet initialized partitions.
+                        // This would mean that these partitions are empty.
+                        assert store.tree() == null || store.tree().groupId() 
== grpId;
+
+                        if (store.tree() != null)
+                            cacheDataStores.put(store.partId(), store);
+                    }
+
+                    //TODO ensure that there are no races.
+                    
dbMgr.checkpointedDataRegions().remove(oldGrpCtx.dataRegion());
+
+                    // Another cheat. Ttl cleanup manager knows too much shit.
+                    oldGrpCtx.caches().stream()
+                        .filter(cacheCtx -> cacheCtx.groupId() == grpId)
+                        .forEach(cacheCtx -> cacheCtx.ttl().unregister());
+
+                    // Technically wal is already disabled, but 
"PageHandler.isWalDeltaRecordNeeded" doesn't care and
+                    // WAL records will be allocated anyway just to be ignored 
later if we don't disable WAL for
+                    // cache group explicitly.
+                    oldGrpCtx.localWalEnabled(false, false);
+
+                    boolean encrypted = 
oldGrpCtx.config().isEncryptionEnabled();
+
+                    FilePageStoreFactory pageStoreFactory = 
filePageStoreMgr.getPageStoreFactory(grpId, encrypted);
+
+                    createIndexPageStore(grpId, workDir, pageStoreFactory, 
partDataRegion, val -> {
+                    }); //TODO Allocated tracker.

Review comment:
       Let's fix this.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to