ibessonov commented on code in PR #1325:
URL: https://github.com/apache/ignite-3/pull/1325#discussion_r1029043178


##########
modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/PartitionProcessingCounter.java:
##########
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.pagememory.persistence;
+
+import java.lang.invoke.MethodHandles;
+import java.lang.invoke.VarHandle;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Helper class for tracking the completion of partition processing.
+ *
+ * <p>At the start of partition processing, you need to call {@link #onStartPartitionProcessing()}, at the end
+ * {@link #onFinishPartitionProcessing()}. When all partition processing is completed, the {@link #future()} will be completed.
+ *
+ * <p>It is recommended to use external synchronization for the correct operation of the {@link #counter partition processing counter} and
+ * the {@link #future future}.
+ */
+public class PartitionProcessingCounter {
+    private static final VarHandle COUNTER;
+
+    static {
+        try {
+            COUNTER = MethodHandles.lookup().findVarHandle(PartitionProcessingCounter.class, "counter", int.class);
+        } catch (ReflectiveOperationException e) {
+            throw new ExceptionInInitializerError(e);
+        }
+    }
+
+    /** Partition processing counter must be greater than or equal to zero. */
+    @SuppressWarnings("unused")
+    private volatile int counter;
+
+    /** Future that will be completed when the {@link #counter} is zero. */
+    private final CompletableFuture<Void> future = new CompletableFuture<>();
+
+    /**
+     * Callback at the start of partition processing.

Review Comment:
   Technically, it's not a callback. It's just a method that one should call explicitly.



##########
modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointPagesWriterFactory.java:
##########
@@ -106,7 +104,7 @@ CheckpointPagesWriter build(
             BooleanSupplier shutdownNow
     ) {
         return new CheckpointPagesWriter(
-                log,
+                LOG,

Review Comment:
   Should the writer also have its own logger?



##########
modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointManager.java:
##########
@@ -307,11 +312,23 @@ static int[] pageIndexesForDeltaFilePageStore(CheckpointDirtyPagesView partition
     }
 
     /**
-     * Adds the number of delta files to compact.
+     * Callback on adding delta files so we can start compacting them.
+     */
+    public void onAddingDeltaFiles() {

Review Comment:
   I would prefer another name, but I don't have a good example. "on***" methods are good in listeners, but this call must be explicit, so I expect an explicit action in the name, like "triggerCompaction" or something similar.



##########
modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/Checkpointer.java:
##########
@@ -452,9 +456,21 @@ private void syncUpdatedPageStores(
                     return;
                 }
 
-                fsyncDeltaFilePageStoreOnCheckpointThread(entry.getKey(), entry.getValue());
+                GroupPartitionId partitionId = entry.getKey();
+
+                FilePageStore filePageStore = filePageStoreManager.getStore(partitionId.getGroupId(), partitionId.getPartitionId());
+
+                if (filePageStore == null || filePageStore.isMarkedToDestroy()) {
+                    continue;
+                }
+
+                currentCheckpointProgress.onStartPartitionProcessing(partitionId.getGroupId(), partitionId.getPartitionId());
+
+                fsyncDeltaFilePageStoreOnCheckpointThread(filePageStore, entry.getValue());
 
-                renameDeltaFileOnCheckpointThread(entry.getKey());
+                renameDeltaFileOnCheckpointThread(filePageStore, partitionId);
+
+                currentCheckpointProgress.onFinishPartitionProcessing(partitionId.getGroupId(), partitionId.getPartitionId());

Review Comment:
   Is there a chance of having a future that will never be completed if something goes wrong? I don't see a try/finally section here, and I don't see one in the other places that use these methods either. There may be bugs lurking in your code because of that.
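
   To make the concern concrete, here is a minimal sketch assuming a simplified, hypothetical `ProcessingCounter` (an `AtomicInteger` plus a `CompletableFuture`; it is not the PR's `PartitionProcessingCounter`). Wrapping the work in try/finally guarantees that the finish call, and therefore the future completion, happens even when fsync or rename throws.

   ```java
   import java.util.concurrent.CompletableFuture;
   import java.util.concurrent.atomic.AtomicInteger;

   /** Hypothetical simplified counter: its future completes when the number of in-flight partitions drops to zero. */
   class ProcessingCounter {
       private final AtomicInteger counter = new AtomicInteger();

       private final CompletableFuture<Void> future = new CompletableFuture<>();

       void onStart() {
           counter.incrementAndGet();
       }

       void onFinish() {
           // The last partition to finish completes the future.
           if (counter.decrementAndGet() == 0) {
               future.complete(null);
           }
       }

       CompletableFuture<Void> future() {
           return future;
       }
   }

   class PartitionProcessing {
       /** Runs the work between onStart() and onFinish(); finally guarantees onFinish() even if the work throws. */
       static void process(ProcessingCounter counter, Runnable work) {
           counter.onStart();

           try {
               work.run();
           } finally {
               counter.onFinish();
           }
       }
   }
   ```

   In this sketch the future can be completed only once; a counter reused across checkpoints would need a fresh future per round.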



##########
modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/compaction/Compactor.java:
##########
@@ -164,77 +170,90 @@ void waitDeltaFiles() {
     }
 
     /**
-     * Adds the number of delta files to compact.
-     *
-     * @param count Number of delta files.
+     * Callback on adding delta files so we can start compacting them.
      */
-    public void addDeltaFiles(int count) {
-        assert count >= 0;
-
-        if (count > 0) {
-            deltaFileCount.addAndGet(count);
+    public void onAddingDeltaFiles() {
+        synchronized (mux) {
+            addedDeltaFiles = true;
 
-            synchronized (mux) {
-                mux.notifyAll();
-            }
+            mux.notifyAll();
         }
     }
 
     /**
      * Merges delta files with partition files.
      */
     void doCompaction() {
-        // Let's collect one delta file for each partition.
-        Queue<IgniteBiTuple<FilePageStore, DeltaFilePageStoreIo>> queue = filePageStoreManager.allPageStores().stream()
-                .flatMap(List::stream)
-                .map(filePageStore -> {
+        while (true) {

Review Comment:
   Please explain this loop. It's not obvious.



##########
modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/compaction/Compactor.java:
##########
@@ -338,6 +360,10 @@ void mergeDeltaFileToMainFile(
                 return;
             }
 
+            if (filePageStore.isMarkedToDestroy()) {

Review Comment:
   Why do we double-check it?
   Is there any synchronization? What would happen if you removed this particular check and the store were marked for destruction?



##########
modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/store/FilePageStoreManager.java:
##########
@@ -139,31 +149,63 @@ public void start() throws IgniteInternalCheckedException {
             throw new IgniteInternalCheckedException("Could not create work directory for page stores: " + dbDir, e);
         }
 
-        if (log.isWarnEnabled()) {
+        if (LOG.isWarnEnabled()) {
             String tmpDir = System.getProperty("java.io.tmpdir");
 
             if (tmpDir != null && this.dbDir.startsWith(tmpDir)) {
-                log.warn("Persistence store directory is in the temp directory and may be cleaned. "
+                LOG.warn("Persistence store directory is in the temp directory and may be cleaned. "
                         + "To avoid this change location of persistence directories [currentDir={}]", this.dbDir);
             }
         }
 
+        List<Path> toDelete = new ArrayList<>();
+
         try (Stream<Path> tmpFileStream = Files.find(
                 dbDir,
                 Integer.MAX_VALUE,
-                (path, basicFileAttributes) -> path.getFileName().toString().endsWith(TMP_FILE_SUFFIX)
-        )) {
-            List<Path> tmpFiles = tmpFileStream.collect(toList());
+                (path, basicFileAttributes) -> path.getFileName().toString().endsWith(TMP_FILE_SUFFIX))
+        ) {
+            toDelete.addAll(tmpFileStream.collect(toList()));
+        } catch (IOException e) {
+            throw new IgniteInternalCheckedException("Error while searching temporary files:" + dbDir, e);
+        }
+
+        Pattern delPartitionFilePatter = Pattern.compile(DEL_PART_FILE_REGEXP);
+
+        try (Stream<Path> delFileStream = Files.find(
+                dbDir,
+                Integer.MAX_VALUE,
+                (path, basicFileAttributes) -> path.getFileName().toString().endsWith(DEL_FILE_SUFFIX))
+        ) {
+            delFileStream.forEach(delFilePath -> {
+                Matcher matcher = delPartitionFilePatter.matcher(delFilePath.getFileName().toString());
 
-            if (!tmpFiles.isEmpty()) {
-                if (log.isInfoEnabled()) {
-                    log.info("Temporary files to be deleted: {}", tmpFiles.size());
+                if (!matcher.matches()) {
+                    throw new IgniteInternalException("Unknown file: " + delFilePath);
                 }
 
-                tmpFiles.forEach(IgniteUtils::deleteIfExists);
-            }
+                Path tableWorkDir = delFilePath.getParent();
+
+                int partitionId = Integer.parseInt(matcher.group(1));
+
+                toDelete.add(tableWorkDir.resolve(String.format(PART_FILE_TEMPLATE, partitionId)));
+
+                try {
+                    toDelete.addAll(List.of(findPartitionDeltaFiles(tableWorkDir, partitionId)));
+                } catch (IgniteInternalCheckedException e) {
+                    throw new IgniteInternalException("Error when searching delta files for partition:" + delFilePath, e);
+                }
+
+                toDelete.add(delFilePath);
+            });
         } catch (IOException e) {
-            throw new IgniteInternalCheckedException("Could not create work directory for page stores: " + dbDir, e);
+            throw new IgniteInternalCheckedException("Error while searching temporary files:" + dbDir, e);
+        }
+
+        if (!toDelete.isEmpty()) {
+            LOG.info("Files to be deleted: {}", toDelete);

Review Comment:
   Please add a line explaining why these files are deleted. Something generic, like "Cleaning obsolete files for table x".



##########
modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/store/FilePageStoreManager.java:
##########
@@ -443,4 +461,50 @@ public Path tmpDeltaFilePageStorePath(int groupId, int partitionId, int index) {
     public Path deltaFilePageStorePath(int groupId, int partitionId, int index) {
         return dbDir.resolve(GROUP_DIR_PREFIX + groupId).resolve(String.format(PART_DELTA_FILE_TEMPLATE, partitionId, index));
     }
+
+    /**
+     * Callback on destruction of the partition of the corresponding group.
+     *
+     * <p>Deletes the partition pages tore and all its delta files. Before that, it creates a marker file (for example,

Review Comment:
   ```suggestion
        * <p>Deletes the partition pages store and all its delta files. Before that, it creates a marker file (for example,
   ```



##########
modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/store/GroupPageStoresMap.java:
##########
@@ -47,52 +50,156 @@ public GroupPageStoresMap(LongOperationAsyncExecutor longOperationAsyncExecutor)
     }
 
     /**
-     * Puts the page stores for the group.
+     * Puts the page store for the group partition.
+     *
+     * @param groupId Group ID.
+     * @param partitionId Partition ID.
+     * @param pageStore Page store.
+     * @return Previous page store.
+     */
+    public @Nullable T put(Integer groupId, Integer partitionId, T pageStore) {
+        return longOperationAsyncExecutor.afterAsyncCompletion(() -> {
+                    PartitionPageStore<T> previous = groupIdPageStores
+                            .computeIfAbsent(groupId, id -> new GroupPageStores<>(groupId))
+                            .partitionIdPageStore
+                            .put(partitionId, new PartitionPageStore<>(partitionId, pageStore));
+
+                    return previous == null ? null : previous.pageStore;
+                }
+        );
+    }
+
+    /**
+     * Removes the page store for the group partition.
      *
-     * @param grpId Group ID.
-     * @param pageStores Page stores.
-     * @return Previous page stores.
+     * @param groupId Group ID.
+     * @param partitionId Partition ID.
+     * @return Removed page store.
      */
-    public @Nullable List<T> put(Integer grpId, List<T> pageStores) {
-        return longOperationAsyncExecutor.afterAsyncCompletion(() -> groupPageStores.put(grpId, pageStores));
+    public @Nullable T remove(Integer groupId, Integer partitionId) {
+        AtomicReference<PartitionPageStore<T>> partitionPageStoreRef = new AtomicReference<>();
+
+        groupIdPageStores.compute(groupId, (id, groupPageStores) -> {
+            if (groupPageStores == null) {
+                return null;
+            }
+
+            partitionPageStoreRef.set(groupPageStores.partitionIdPageStore.remove(partitionId));
+
+            if (groupPageStores.partitionIdPageStore.isEmpty()) {
+                return null;
+            }
+
+            return groupPageStores;
+        });
+
+        PartitionPageStore<T> partitionPageStore = partitionPageStoreRef.get();
+
+        return partitionPageStore == null ? null : partitionPageStore.pageStore;
     }
 
     /**
      * Returns the page stores for the group.
      *
-     * @param grpId Group ID.
+     * @param groupId Group ID.
      */
-    public @Nullable List<T> get(Integer grpId) {
-        return groupPageStores.get(grpId);
+    public @Nullable GroupPageStores<T> get(Integer groupId) {
+        return groupIdPageStores.get(groupId);
     }
 
     /**
-     * Returns {@code true} if a page stores exists for the group.
+     * Returns {@code true} if a page store exists for the group partition.
      *
-     * @param grpId Group ID.
+     * @param groupId Group ID.
+     * @param partitionId Partition ID.
      */
-    public boolean containsPageStores(Integer grpId) {
-        return groupPageStores.containsKey(grpId);
+    public boolean contains(Integer groupId, Integer partitionId) {
+        GroupPageStores<T> groupPageStores = groupIdPageStores.get(groupId);
+
+        return groupPageStores != null && groupPageStores.partitionIdPageStore.containsKey(partitionId);
     }
 
     /**
-     * Returns all page stores of all groups.
+     * Returns a view of all page stores of all groups.
      */
-    public Collection<List<T>> allPageStores() {
-        return groupPageStores.values();
+    public Collection<GroupPageStores<T>> getAll() {
+        return unmodifiableCollection(groupIdPageStores.values());
     }
 
     /**
      * Clears all page stores of all groups.
      */
     public void clear() {
-        groupPageStores.clear();
+        groupIdPageStores.clear();
     }
 
     /**
      * Returns the number of groups for which there are page stores.
      */
     public int groupCount() {
-        return groupPageStores.size();
+        return groupIdPageStores.size();
+    }
+
+    /**
+     * Group partition page stores.
+     */
+    public static class GroupPageStores<T extends PageStore> {
+        private final int groupId;
+
+        private final ConcurrentMap<Integer, PartitionPageStore<T>> partitionIdPageStore = new ConcurrentHashMap<>();
+
+        private GroupPageStores(int groupId) {
+            this.groupId = groupId;
+        }
+
+        /**
+         * Returns the group ID.
+         */
+        public int groupId() {
+            return groupId;
+        }
+
+        /**
+         * Returns the partition's page store.
+         */
+        @Nullable
+        public PartitionPageStore<T> get(Integer partitionId) {
+            return partitionIdPageStore.get(partitionId);
+        }
+
+        /**
+         * Returns a view of the group's partition page stores.
+         */
+        public Collection<PartitionPageStore<T>> getAll() {
+            return unmodifiableCollection(partitionIdPageStore.values());
+        }
+    }
+
+    /**
+     * Partition page store.
+     */
+    public static class PartitionPageStore<T extends PageStore> {
+        private final int partitionId;
+
+        private final T pageStore;

Review Comment:
   Does PageStore have a getter for partitionId? Maybe it should; please validate this idea.



##########
modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/PersistentPageMemoryTableStorage.java:
##########
@@ -86,41 +97,29 @@ public PersistentPageMemoryDataRegion dataRegion() {
         return dataRegion;
     }
 
-    /** {@inheritDoc} */
     @Override
     public boolean isVolatile() {
         return false;
     }
 
-    /** {@inheritDoc} */
-    @Override
-    public void start() throws StorageException {
-        super.start();
-
-        TableView tableView = tableCfg.value();
-
-        try {
-            dataRegion.filePageStoreManager().initialize(tableView.name(), tableView.tableId(), tableView.partitions());
-
-            int deltaFileCount = dataRegion.filePageStoreManager().getStores(tableView.tableId()).stream()
-                    .mapToInt(FilePageStore::deltaFileCount)
-                    .sum();
-
-            dataRegion.checkpointManager().addDeltaFileCountForCompaction(deltaFileCount);
-        } catch (IgniteInternalCheckedException e) {
-            throw new StorageException("Error initializing file page stores for table: " + tableView.name(), e);
-        }
-    }
-
-    /** {@inheritDoc} */
     @Override
     public void destroy() throws StorageException {
         close(true);
     }
 
-    /** {@inheritDoc} */
     @Override
     public PersistentPageMemoryMvPartitionStorage createMvPartitionStorage(int partitionId) {
+        CompletableFuture<Void> partitionDestroyFuture = destroyFutureByPartitionId.get(partitionId);
+
+        if (partitionDestroyFuture != null) {
+            try {
+                // Time is chosen randomly (long enough) so as not to call #join().
+                partitionDestroyFuture.get(10, TimeUnit.SECONDS);

Review Comment:
   Again, "randomly" and "long enough": I don't like it. This particular method is trickier; we should think about what to do here and how to design it.



##########
modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/compaction/Compactor.java:
##########
@@ -164,77 +170,90 @@ void waitDeltaFiles() {
     }
 
     /**
-     * Adds the number of delta files to compact.
-     *
-     * @param count Number of delta files.
+     * Callback on adding delta files so we can start compacting them.
      */
-    public void addDeltaFiles(int count) {
-        assert count >= 0;
-
-        if (count > 0) {
-            deltaFileCount.addAndGet(count);
+    public void onAddingDeltaFiles() {
+        synchronized (mux) {
+            addedDeltaFiles = true;
 
-            synchronized (mux) {
-                mux.notifyAll();
-            }
+            mux.notifyAll();
         }
     }
 
     /**
      * Merges delta files with partition files.
      */
     void doCompaction() {

Review Comment:
   Who calls this method? The description is pretty short for a method this big. I see that it waits for some futures at the end, which means that it blocks the caller thread. Is this necessary? Please describe what's going on; the code itself is hard to understand.
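
   One way to avoid blocking the caller, sketched under the assumption that each delta file can be merged independently by a hypothetical `merge` consumer: submit one task per file and return `CompletableFuture.allOf(...)`, so the caller decides whether and how to wait.

   ```java
   import java.util.ArrayList;
   import java.util.List;
   import java.util.concurrent.CompletableFuture;
   import java.util.concurrent.ExecutorService;
   import java.util.function.Consumer;

   class NonBlockingCompaction {
       /**
        * Submits one task per delta file and returns a future that completes when all of them are done,
        * instead of blocking the calling thread with get() or join().
        */
       static <D> CompletableFuture<Void> compactAll(List<D> deltaFiles, Consumer<D> merge, ExecutorService pool) {
           List<CompletableFuture<Void>> tasks = new ArrayList<>();

           for (D deltaFile : deltaFiles) {
               tasks.add(CompletableFuture.runAsync(() -> merge.accept(deltaFile), pool));
           }

           // allOf completes when every task completes (exceptionally if any of them fails).
           return CompletableFuture.allOf(tasks.toArray(new CompletableFuture<?>[0]));
       }
   }
   ```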



##########
modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/Checkpointer.java:
##########
@@ -775,4 +799,41 @@ private void renameDeltaFileOnCheckpointThread(GroupPartitionId partitionId) thr
     void updateLastProgressAfterReleaseWriteLock() {
         afterReleaseWriteLockCheckpointProgress = currentCheckpointProgress;
     }
+
+    /**
+     * Callback on destruction of the partition of the corresponding group.
+     *
+     * <p>If the checkpoint is in progress, then wait until it finishes processing the partition that we are going to destroy, in order to
+     * prevent the situation when we want to destroy the partition file along with its delta files, and at this time the checkpoint performs
+     * I/O operations on them.
+     *
+     * @param groupId Group ID.
+     * @param partitionId Partition ID.
+     * @throws IgniteInternalCheckedException If there are errors while processing the callback.
+     */
+    void onPartitionDestruction(int groupId, int partitionId) throws IgniteInternalCheckedException {
+        CheckpointProgressImpl currentCheckpointProgress = this.currentCheckpointProgress;
+
+        if (currentCheckpointProgress == null || !currentCheckpointProgress.inProgress()) {
+            return;
+        }
+
+        CompletableFuture<Void> processedPartitionFuture = currentCheckpointProgress.getProcessedPartitionFuture(groupId, partitionId);
+
+        if (processedPartitionFuture != null) {
+            try {
+                // Time is taken arbitrarily, but long enough to allow time for the future to complete.
+                processedPartitionFuture.get(10, SECONDS);

Review Comment:
   Wait, what?
   First of all, please don't hard-code such constants.
   Second, we constantly suffer from other people calling "get" or "join" on futures, and we keep saying that it's a horrible practice. And now I see this, without a proper explanation of why it needs to be here.
   So, either explain it thoroughly, or, even better, rewrite this code to avoid the explicit await.
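
   A sketch of the non-blocking alternative, assuming hypothetical `checkpointDone` and `compactionDone` futures (for example, whatever `getProcessedPartitionFuture` returns, or `CompletableFuture.completedFuture(null)` when nothing is in progress): `onPartitionDestruction` returns a future and the file deletion is chained onto it, so there is no timeout to pick and no thread parked in `get`.

   ```java
   import java.util.concurrent.CompletableFuture;

   class PartitionDestruction {
       /**
        * Hypothetical sketch: instead of waiting up to a hard-coded timeout for the checkpointer and the compactor,
        * return a future and run the deletion once both have released the partition.
        */
       static CompletableFuture<Void> destroyPartition(
               CompletableFuture<Void> checkpointDone,
               CompletableFuture<Void> compactionDone,
               Runnable deleteFiles
       ) {
           // No thread blocks here; deleteFiles runs in whichever thread completes the last of the two futures.
           return CompletableFuture.allOf(checkpointDone, compactionDone).thenRun(deleteFiles);
       }
   }
   ```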



##########
modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/PartitionProcessingCounter.java:
##########
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.pagememory.persistence;
+
+import java.lang.invoke.MethodHandles;
+import java.lang.invoke.VarHandle;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Helper class for tracking the completion of partition processing.
+ *
+ * <p>At the start of partition processing, you need to call {@link #onStartPartitionProcessing()}, at the end
+ * {@link #onFinishPartitionProcessing()}. When all partition processing is completed, the {@link #future()} will be completed.
+ *
+ * <p>It is recommended to use external synchronization for the correct operation of the {@link #counter partition processing counter} and
+ * the {@link #future future}.
+ */
+public class PartitionProcessingCounter {

Review Comment:
   Should this class be package-private?



##########
modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/compaction/Compactor.java:
##########
@@ -65,13 +66,17 @@ public class Compactor extends IgniteWorker {
 
     private final @Nullable ThreadPoolExecutor threadPoolExecutor;
 
-    private final AtomicInteger deltaFileCount = new AtomicInteger();
+    /** Guarded by {@link #mux}. */
+    private boolean addedDeltaFiles;
 
     private final FilePageStoreManager filePageStoreManager;
 
     /** Thread local with buffers for the compaction threads. */
     private final ThreadLocal<ByteBuffer> threadBuf;

Review Comment:
   By the way, why isn't it "static"? Is it possible that several compactor workers operate in a single thread? I don't think so.
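
   If a thread indeed runs only one compaction task at a time, the buffer can be shared through a `static` field. A minimal sketch, assuming a single hypothetical `PAGE_SIZE` for all compactor instances (if page sizes could differ between data regions, a per-instance field would still be needed):

   ```java
   import java.nio.ByteBuffer;
   import java.nio.ByteOrder;

   class CompactionBuffers {
       /** Hypothetical page size; the real value would come from the page memory configuration. */
       static final int PAGE_SIZE = 4096;

       // Assumption: a thread runs at most one compaction task at a time, so one buffer per thread suffices
       // and it can be shared by all Compactor instances.
       private static final ThreadLocal<ByteBuffer> THREAD_BUF = ThreadLocal.withInitial(
               () -> ByteBuffer.allocateDirect(PAGE_SIZE).order(ByteOrder.nativeOrder())
       );

       /** Returns this thread's buffer, cleared and ready for reuse. */
       static ByteBuffer buffer() {
           ByteBuffer buf = THREAD_BUF.get();

           buf.clear();

           return buf;
       }
   }
   ```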



##########
modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointManager.java:
##########
@@ -307,11 +312,23 @@ static int[] pageIndexesForDeltaFilePageStore(CheckpointDirtyPagesView partition
     }
 
     /**
-     * Adds the number of delta files to compact.
+     * Callback on adding delta files so we can start compacting them.
+     */
+    public void onAddingDeltaFiles() {
+        compactor.onAddingDeltaFiles();
+    }
+
+    /**
+     * Callback on destruction of the partition of the corresponding group.
+     *
+     * <p>Prepares the checkpointer and compactor for partition destruction.
      *
-     * @param count Number of delta files.
+     * @param groupId Group ID.
+     * @param partitionId Partition ID.
+     * @throws IgniteInternalCheckedException If there are errors while processing the callback.
      */
-    public void addDeltaFileCountForCompaction(int count) {
-        compactor.addDeltaFiles(count);
+    public void onPartitionDestruction(int groupId, int partitionId) throws IgniteInternalCheckedException {

Review Comment:
   This one doesn't return any future. I guess we get them differently?



##########
modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/compaction/Compactor.java:
##########
@@ -164,77 +170,90 @@ void waitDeltaFiles() {
     }
 
     /**
-     * Adds the number of delta files to compact.
-     *
-     * @param count Number of delta files.
+     * Callback on adding delta files so we can start compacting them.
      */
-    public void addDeltaFiles(int count) {
-        assert count >= 0;
-
-        if (count > 0) {
-            deltaFileCount.addAndGet(count);
+    public void onAddingDeltaFiles() {
+        synchronized (mux) {
+            addedDeltaFiles = true;
 
-            synchronized (mux) {
-                mux.notifyAll();
-            }
+            mux.notifyAll();
         }
     }
 
     /**
      * Merges delta files with partition files.
      */
     void doCompaction() {
-        // Let's collect one delta file for each partition.
-        Queue<IgniteBiTuple<FilePageStore, DeltaFilePageStoreIo>> queue = filePageStoreManager.allPageStores().stream()
-                .flatMap(List::stream)
-                .map(filePageStore -> {
+        while (true) {
+            // Let's collect one delta file for each partition.
+            ConcurrentLinkedQueue<DeltaFileToCompaction> queue = new ConcurrentLinkedQueue<>();
+
+            for (GroupPageStores<FilePageStore> groupPageStores : filePageStoreManager.allPageStores()) {
+                for (PartitionPageStore<FilePageStore> partitionPageStore : groupPageStores.getAll()) {
+                    FilePageStore filePageStore = partitionPageStore.pageStore();
+
                     DeltaFilePageStoreIo deltaFileToCompaction = filePageStore.getDeltaFileToCompaction();
 
-                    return deltaFileToCompaction == null ? null : new IgniteBiTuple<>(filePageStore, deltaFileToCompaction);
-                })
-                .filter(Objects::nonNull)
-                .collect(toCollection(ConcurrentLinkedQueue::new));
+                    if (!filePageStore.isMarkedToDestroy() && deltaFileToCompaction != null) {
+                        queue.add(new DeltaFileToCompaction(
+                                groupPageStores.groupId(),
+                                partitionPageStore.partitionId(),
+                                filePageStore,
+                                deltaFileToCompaction
+                        ));
+                    }
+                }
+            }
+
+            if (queue.isEmpty()) {

Review Comment:
   So, with this queue you're supervising the thread pool, right?
   You read a batch of delta files, distribute them across the pool, then read the next batch, and so on.
   If what I described is correct, you're under-utilizing resources: all threads must wait until the last thread finishes its delta file, and only then is the next batch created.
   Why did you decide to use this approach? Was it intentional? Did you realize that threads are under-utilized in this implementation?
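
   For comparison, a sketch of the alternative where workers do not wait for each other: every worker keeps polling a shared queue until it is empty. Names are hypothetical, and unlike the real compactor this sketch does not pick up delta files that appear while it runs (a worker could, for instance, enqueue the same partition's next delta file after merging the current one, keeping one file per partition in flight).

   ```java
   import java.util.ArrayList;
   import java.util.Collection;
   import java.util.List;
   import java.util.Queue;
   import java.util.concurrent.CompletableFuture;
   import java.util.concurrent.ConcurrentLinkedQueue;
   import java.util.concurrent.ExecutorService;
   import java.util.function.Consumer;

   class ContinuousCompaction {
       /**
        * Each worker keeps polling a shared queue, so a fast worker picks up the next delta file immediately
        * instead of idling until the slowest worker of the current batch finishes.
        */
       static <D> CompletableFuture<Void> drain(Collection<D> deltaFiles, Consumer<D> merge, ExecutorService pool, int threads) {
           Queue<D> queue = new ConcurrentLinkedQueue<>(deltaFiles);

           List<CompletableFuture<Void>> workers = new ArrayList<>();

           for (int i = 0; i < threads; i++) {
               workers.add(CompletableFuture.runAsync(() -> {
                   D next;

                   // poll() is atomic, so each delta file is merged by exactly one worker.
                   while ((next = queue.poll()) != null) {
                       merge.accept(next);
                   }
               }, pool));
           }

           return CompletableFuture.allOf(workers.toArray(new CompletableFuture<?>[0]));
       }
   }
   ```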



##########
modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/compaction/Compactor.java:
##########
@@ -146,7 +150,7 @@ protected void body() throws InterruptedException {
     void waitDeltaFiles() {

Review Comment:
   How many threads execute this method? If more than one, then I see a huge problem.
   The first thread to wake up will set "addedDeltaFiles" to false. Other threads will see that and continue the loop indefinitely (or, to be precise, until cancellation).
   Basically, this change ruins multi-threading in the compactor; please be careful.
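
   One possible fix, sketched with hypothetical names: replace the boolean flag with a generation counter that is only ever incremented. Each worker records the generation before scanning for delta files and then waits for a newer one, so no worker consumes the signal meant for the others, and a signal that arrives during the scan is not lost.

   ```java
   class DeltaFileSignal {
       private final Object mux = new Object();

       /** Incremented on every notification and never reset, so every waiter can observe it. Guarded by mux. */
       private long generation;

       /** Called after new delta files have been created (hypothetical name). */
       void signalDeltaFilesAdded() {
           synchronized (mux) {
               generation++;

               mux.notifyAll();
           }
       }

       /** Returns the current generation; a worker records it before scanning for delta files. */
       long currentGeneration() {
           synchronized (mux) {
               return generation;
           }
       }

       /** Blocks until a signal newer than seenGeneration arrives and returns the new generation. */
       long awaitNewerThan(long seenGeneration) throws InterruptedException {
           synchronized (mux) {
               // The loop also protects against spurious wakeups.
               while (generation == seenGeneration) {
                   mux.wait();
               }

               return generation;
           }
       }
   }
   ```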



##########
modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/compaction/Compactor.java:
##########
@@ -164,77 +170,90 @@ void waitDeltaFiles() {
     }
 
     /**
-     * Adds the number of delta files to compact.
-     *
-     * @param count Number of delta files.
+     * Callback on adding delta files so we can start compacting them.
      */
-    public void addDeltaFiles(int count) {
-        assert count >= 0;
-
-        if (count > 0) {
-            deltaFileCount.addAndGet(count);
+    public void onAddingDeltaFiles() {
+        synchronized (mux) {
+            addedDeltaFiles = true;
 
-            synchronized (mux) {
-                mux.notifyAll();
-            }
+            mux.notifyAll();
         }
     }
 
     /**
      * Merges delta files with partition files.
      */
     void doCompaction() {
-        // Let's collect one delta file for each partition.
-        Queue<IgniteBiTuple<FilePageStore, DeltaFilePageStoreIo>> queue = filePageStoreManager.allPageStores().stream()
-                .flatMap(List::stream)
-                .map(filePageStore -> {
+        while (true) {
+            // Let's collect one delta file for each partition.
+            ConcurrentLinkedQueue<DeltaFileToCompaction> queue = new ConcurrentLinkedQueue<>();
+
+            for (GroupPageStores<FilePageStore> groupPageStores : filePageStoreManager.allPageStores()) {
+                for (PartitionPageStore<FilePageStore> partitionPageStore : groupPageStores.getAll()) {
+                    FilePageStore filePageStore = partitionPageStore.pageStore();
+
                     DeltaFilePageStoreIo deltaFileToCompaction = filePageStore.getDeltaFileToCompaction();
 
-                    return deltaFileToCompaction == null ? null : new IgniteBiTuple<>(filePageStore, deltaFileToCompaction);
-                })
-                .filter(Objects::nonNull)
-                .collect(toCollection(ConcurrentLinkedQueue::new));
+                    if (!filePageStore.isMarkedToDestroy() && deltaFileToCompaction != null) {
+                        queue.add(new DeltaFileToCompaction(
+                                groupPageStores.groupId(),
+                                partitionPageStore.partitionId(),
+                                filePageStore,
+                                deltaFileToCompaction
+                        ));
+                    }
+                }
+            }
+
+            if (queue.isEmpty()) {
+                break;
+            }
+
+            updateHeartbeat();
 
-        assert !queue.isEmpty();
+            int threads = threadPoolExecutor == null ? 1 : threadPoolExecutor.getMaximumPoolSize();

Review Comment:
   How can it be null? Is it possible to set it up in this way?



##########
modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/compaction/Compactor.java:
##########
@@ -357,14 +387,70 @@ void mergeDeltaFileToMainFile(
             return;
         }
 
+        if (filePageStore.isMarkedToDestroy()) {
+            return;
+        }
+
         boolean removed = filePageStore.removeDeltaFile(deltaFilePageStore);
 
         assert removed : filePageStore.filePath();
 
         deltaFilePageStore.markMergedToFilePageStore();
 
         deltaFilePageStore.stop(true);
+    }
+
+    /**
+     * Callback on destruction of the partition of the corresponding group.
+     *
+     * <p>If the partition compaction is in progress, then we will wait until it is completed so that there are no errors when we want to
+     * destroy the partition file and its delta file, and at this time its compaction occurs.
+     *
+     * @param groupId Group ID.
+     * @param partitionId Partition ID.
+     */
+    public void onPartitionDestruction(int groupId, int partitionId) throws IgniteInternalCheckedException {
+        CompletableFuture<Void> partitionProcessingFuture = processedPartitionMap.getProcessedPartitionFuture(groupId, partitionId);
+
+        if (partitionProcessingFuture != null) {
+            try {
+                // Time is taken arbitrarily, but long enough to allow time for the future to complete.
+                partitionProcessingFuture.get(10, SECONDS);

Review Comment:
   Again, I don't like this code. I really, really don't like it.
   For some clients, checkpoints may take a minute or so, so 10 seconds isn't enough.
   And even if it is enough, having all these "get" calls is bad practice.
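   A non-blocking alternative to `get(10, SECONDS)`, sketched under the assumption that callers of `onPartitionDestruction` can accept a `CompletableFuture`: chain the file removal onto the in-flight processing future instead of waiting on it with an arbitrary timeout. `AsyncDestructionSketch`, `inFlight`, `key` and `destroyFiles` are hypothetical stand-ins for `processedPartitionMap` and the actual cleanup. The check-then-act window when no future is registered yet is not addressed here.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

final class AsyncDestructionSketch {
    /** Hypothetical stand-in for processedPartitionMap: in-flight processing future per (group, partition). */
    final ConcurrentMap<Long, CompletableFuture<Void>> inFlight = new ConcurrentHashMap<>();

    static long key(int groupId, int partitionId) {
        return ((long) groupId << 32) | (partitionId & 0xFFFFFFFFL);
    }

    /** Returns a future that completes after processing has finished and the files have been destroyed; never blocks. */
    CompletableFuture<Void> onPartitionDestruction(int groupId, int partitionId, Runnable destroyFiles) {
        CompletableFuture<Void> processing = inFlight.get(key(groupId, partitionId));

        CompletableFuture<Void> ready = processing == null ? CompletableFuture.completedFuture(null) : processing;

        // destroyFiles runs on the thread that completes 'processing', or immediately if it is already done.
        // If processing fails, destroyFiles is skipped and the returned future fails as well.
        return ready.thenRun(destroyFiles);
    }
}
```

   No timeout is needed: however long the checkpoint takes, destruction simply happens after it.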



##########
modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/compaction/Compactor.java:
##########
@@ -357,14 +387,70 @@ void mergeDeltaFileToMainFile(
             return;
         }
 
+        if (filePageStore.isMarkedToDestroy()) {
+            return;
+        }
+
         boolean removed = filePageStore.removeDeltaFile(deltaFilePageStore);
 
         assert removed : filePageStore.filePath();
 
         deltaFilePageStore.markMergedToFilePageStore();
 
         deltaFilePageStore.stop(true);
+    }
+
+    /**
+     * Callback on destruction of the partition of the corresponding group.
+     *
+     * <p>If the partition compaction is in progress, then we will wait until it is completed so that there are no errors when we want to
+     * destroy the partition file and its delta file, and at this time its compaction occurs.
+     *
+     * @param groupId Group ID.
+     * @param partitionId Partition ID.
+     */
+    public void onPartitionDestruction(int groupId, int partitionId) throws IgniteInternalCheckedException {
+        CompletableFuture<Void> partitionProcessingFuture = processedPartitionMap.getProcessedPartitionFuture(groupId, partitionId);
+
+        if (partitionProcessingFuture != null) {
+            try {
+                // Time is taken arbitrarily, but long enough to allow time for the future to complete.
+                partitionProcessingFuture.get(10, SECONDS);
+            } catch (Exception e) {
+                throw new IgniteInternalCheckedException(
+                        IgniteStringFormatter.format(
+                                "Error waiting for partition processing to complete on compaction: [groupId={}, partitionId={}]",
+                                groupId,
+                                partitionId
+                        ),
+                        e
+                );
+            }
+        }
+    }
+
+    /**
+     * Delta file for compaction.
+     */
+    private static class DeltaFileToCompaction {

Review Comment:
   It should be either "for compaction" or "to compact"; there's no other option.



##########
modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/store/FilePageStoreManager.java:
##########
@@ -320,9 +369,9 @@ void stopAllGroupFilePageStores(boolean cleanFiles) {
             try {
                 stopGroupFilePageStores(partitionPageStores, cleanFiles);
 
-                log.info("Cleanup cache stores [total={}, cleanFiles={}]", partitionPageStores.size(), cleanFiles);
+                LOG.info("Cleanup cache stores [total={}, cleanFiles={}]", partitionPageStores.size(), cleanFiles);
             } catch (Exception e) {
-                log.info("Failed to gracefully stop page store managers", e);
+                LOG.info("Failed to gracefully stop page store managers", e);

Review Comment:
   Are you sure this is an "info"-level event?



##########
modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/store/GroupPageStoresMap.java:
##########
@@ -47,52 +50,156 @@ public GroupPageStoresMap(LongOperationAsyncExecutor longOperationAsyncExecutor)
     }
 
     /**
-     * Puts the page stores for the group.
+     * Puts the page store for the group partition.
+     *
+     * @param groupId Group ID.
+     * @param partitionId Partition ID.
+     * @param pageStore Page store.
+     * @return Previous page store.
+     */
+    public @Nullable T put(Integer groupId, Integer partitionId, T pageStore) {
+        return longOperationAsyncExecutor.afterAsyncCompletion(() -> {
+                    PartitionPageStore<T> previous = groupIdPageStores
+                            .computeIfAbsent(groupId, id -> new GroupPageStores<>(groupId))
+                            .partitionIdPageStore
+                            .put(partitionId, new PartitionPageStore<>(partitionId, pageStore));
+
+                    return previous == null ? null : previous.pageStore;
+                }
+        );
+    }
+
+    /**
+     * Removes the page store for the group partition.
      *
-     * @param grpId Group ID.
-     * @param pageStores Page stores.
-     * @return Previous page stores.
+     * @param groupId Group ID.
+     * @param partitionId Partition ID.
+     * @return Removed page store.
      */
-    public @Nullable List<T> put(Integer grpId, List<T> pageStores) {
-        return longOperationAsyncExecutor.afterAsyncCompletion(() -> groupPageStores.put(grpId, pageStores));
+    public @Nullable T remove(Integer groupId, Integer partitionId) {
+        AtomicReference<PartitionPageStore<T>> partitionPageStoreRef = new AtomicReference<>();
+
+        groupIdPageStores.compute(groupId, (id, groupPageStores) -> {

Review Comment:
   Should we use a map with primitive keys?



##########
modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/store/GroupPageStoresMapTest.java:
##########
@@ -52,125 +57,265 @@ void setUp() {
         when(longOperationAsyncExecutor.afterAsyncCompletion(any(Supplier.class)))
                 .then(answer -> ((Supplier<?>) answer.getArgument(0)).get());
 
-        groupPageStores = new GroupPageStoresMap<>(longOperationAsyncExecutor);
+        groupPageStoresMap = new GroupPageStoresMap<>(longOperationAsyncExecutor);
     }
 
     @Test
     void testGroupCount() {
-        assertEquals(0, groupPageStores.groupCount());
+        assertEquals(0, groupPageStoresMap.groupCount());
 
-        groupPageStores.put(0, List.of());
+        groupPageStoresMap.put(0, 0, mock(FilePageStore.class));
 
-        assertEquals(1, groupPageStores.groupCount());
+        assertEquals(1, groupPageStoresMap.groupCount());
 
-        groupPageStores.put(0, List.of());
+        groupPageStoresMap.put(0, 0, mock(FilePageStore.class));
 
-        assertEquals(1, groupPageStores.groupCount());
+        assertEquals(1, groupPageStoresMap.groupCount());
 
-        groupPageStores.put(1, List.of());
+        groupPageStoresMap.put(1, 0, mock(FilePageStore.class));
 
-        assertEquals(2, groupPageStores.groupCount());
+        assertEquals(2, groupPageStoresMap.groupCount());
 
-        groupPageStores.clear();
+        groupPageStoresMap.clear();
 
-        assertEquals(0, groupPageStores.groupCount());
+        assertEquals(0, groupPageStoresMap.groupCount());
     }
 
     @Test
     void testPut() {
-        List<PageStore> holder0 = List.of();
+        FilePageStore filePageStore0 = mock(FilePageStore.class);
 
-        assertNull(groupPageStores.put(0, holder0));
+        assertNull(groupPageStoresMap.put(0, 0, filePageStore0));
         checkInvokeAfterAsyncCompletion(1);
 
-        List<PageStore> holder1 = List.of();
+        FilePageStore filePageStore1 = mock(FilePageStore.class);
 
-        assertSame(holder0, groupPageStores.put(0, holder1));
+        assertSame(filePageStore0, groupPageStoresMap.put(0, 0, filePageStore1));
         checkInvokeAfterAsyncCompletion(2);
 
-        assertSame(holder1, groupPageStores.get(0));
-        assertNull(groupPageStores.get(1));
+        assertNull(groupPageStoresMap.get(1));
+
+        assertThat(getAll(groupPageStoresMap.get(0)), containsInAnyOrder(new TestPartitionPageStore<>(0, filePageStore1)));
     }
 
     @Test
     void testGet() {
-        assertNull(groupPageStores.get(0));
-        assertNull(groupPageStores.get(1));
+        assertNull(groupPageStoresMap.get(0));
+        assertNull(groupPageStoresMap.get(1));
 
-        List<PageStore> pageStores0 = List.of();
+        FilePageStore filePageStore0 = mock(FilePageStore.class);
 
-        groupPageStores.put(0, pageStores0);
+        groupPageStoresMap.put(0, 0, filePageStore0);
 
-        assertSame(pageStores0, groupPageStores.get(0));
-        assertNull(groupPageStores.get(1));
+        GroupPageStores<PageStore> groupPageStores0 = groupPageStoresMap.get(0);
 
-        List<PageStore> pageStores1 = List.of();
+        assertThat(getAll(groupPageStores0), containsInAnyOrder(new TestPartitionPageStore<>(0, filePageStore0)));
+        assertNull(groupPageStoresMap.get(1));
 
-        groupPageStores.put(1, pageStores1);
+        FilePageStore filePageStore1 = mock(FilePageStore.class);
 
-        assertSame(pageStores0, groupPageStores.get(0));
-        assertSame(pageStores1, groupPageStores.get(1));
+        groupPageStoresMap.put(1, 1, filePageStore1);
 
-        groupPageStores.clear();
+        GroupPageStores<PageStore> groupPageStores1 = groupPageStoresMap.get(1);
 
-        assertNull(groupPageStores.get(0));
-        assertNull(groupPageStores.get(1));
+        assertThat(getAll(groupPageStores0), containsInAnyOrder(new TestPartitionPageStore<>(0, filePageStore0)));
+        assertThat(getAll(groupPageStores1), containsInAnyOrder(new TestPartitionPageStore<>(1, filePageStore1)));
+
+        assertEquals(TestPartitionPageStore.of(groupPageStores0.get(0)), new TestPartitionPageStore<>(0, filePageStore0));
+        assertEquals(TestPartitionPageStore.of(groupPageStores1.get(1)), new TestPartitionPageStore<>(1, filePageStore1));
+
+        assertNull(groupPageStores0.get(2));
+        assertNull(groupPageStores1.get(2));
+
+        groupPageStoresMap.clear();
+
+        assertNull(groupPageStoresMap.get(0));
+        assertNull(groupPageStoresMap.get(1));
     }
 
     @Test
     void testContainsPageStores() {
-        assertFalse(groupPageStores.containsPageStores(0));
-        assertFalse(groupPageStores.containsPageStores(1));
+        assertFalse(groupPageStoresMap.contains(0, 0));
+        assertFalse(groupPageStoresMap.contains(1, 0));
+
+        FilePageStore filePageStore0 = mock(FilePageStore.class);
 
-        groupPageStores.put(0, List.of());
+        groupPageStoresMap.put(0, 0, filePageStore0);
 
-        assertTrue(groupPageStores.containsPageStores(0));
-        assertFalse(groupPageStores.containsPageStores(1));
+        assertTrue(groupPageStoresMap.contains(0, 0));
+        assertFalse(groupPageStoresMap.contains(1, 0));
+        assertFalse(groupPageStoresMap.contains(0, 1));
 
-        List<PageStore> pageStores1 = List.of();
+        FilePageStore filePageStore1 = mock(FilePageStore.class);
 
-        groupPageStores.put(1, List.of());
+        groupPageStoresMap.put(1, 0, filePageStore1);
 
-        assertTrue(groupPageStores.containsPageStores(0));
-        assertTrue(groupPageStores.containsPageStores(1));
+        assertTrue(groupPageStoresMap.contains(0, 0));
+        assertTrue(groupPageStoresMap.contains(1, 0));
 
-        groupPageStores.clear();
+        groupPageStoresMap.clear();
 
-        assertFalse(groupPageStores.containsPageStores(0));
-        assertFalse(groupPageStores.containsPageStores(1));
+        assertFalse(groupPageStoresMap.contains(0, 0));
+        assertFalse(groupPageStoresMap.contains(1, 0));
     }
 
     @Test
-    void testAllPageStores() {
-        assertThat(groupPageStores.allPageStores(), empty());
+    void testGetAll() {
+        assertThat(groupPageStoresMap.getAll(), empty());
 
-        List<PageStore> pageStores0 = List.of();
-        List<PageStore> pageStores1 = List.of();
+        FilePageStore filePageStore0 = mock(FilePageStore.class);
+        FilePageStore filePageStore1 = mock(FilePageStore.class);
 
-        groupPageStores.put(0, pageStores0);
+        groupPageStoresMap.put(0, 0, filePageStore0);
 
-        assertThat(groupPageStores.allPageStores(), containsInAnyOrder(pageStores0));
+        assertThat(
+                getAll(groupPageStoresMap),
+                containsInAnyOrder(new TestGroupPartitionPageStore<>(0, 0, filePageStore0))
+        );
 
-        groupPageStores.put(1, pageStores1);
+        groupPageStoresMap.put(1, 1, filePageStore1);
 
-        assertThat(groupPageStores.allPageStores(), containsInAnyOrder(pageStores0, pageStores1));
+        assertThat(
+                getAll(groupPageStoresMap),
+                containsInAnyOrder(
+                        new TestGroupPartitionPageStore<>(0, 0, filePageStore0),
+                        new TestGroupPartitionPageStore<>(1, 1, filePageStore1)
+                )
+        );
 
-        groupPageStores.clear();
+        groupPageStoresMap.clear();
 
-        assertThat(groupPageStores.allPageStores(), empty());
+        assertThat(groupPageStoresMap.getAll(), empty());
     }
 
     @Test
     void testClear() {
-        assertDoesNotThrow(groupPageStores::clear);
+        assertDoesNotThrow(groupPageStoresMap::clear);
+
+        groupPageStoresMap.put(0, 0, mock(FilePageStore.class));
+
+        assertDoesNotThrow(groupPageStoresMap::clear);
+        assertEquals(0, groupPageStoresMap.groupCount());
+
+        assertNull(groupPageStoresMap.get(0));
+        assertFalse(groupPageStoresMap.contains(0, 0));
+    }
+
+    @Test
+    void testRemove() {
+        assertNull(groupPageStoresMap.remove(0, 0));
+
+        FilePageStore filePageStore = mock(FilePageStore.class);
+
+        groupPageStoresMap.put(0, 0, filePageStore);
+
+        GroupPageStores<PageStore> groupPageStores = groupPageStoresMap.get(0);
 
-        groupPageStores.put(0, List.of());
+        assertNull(groupPageStoresMap.remove(0, 1));
+        assertNull(groupPageStoresMap.remove(1, 0));
 
-        assertDoesNotThrow(groupPageStores::clear);
-        assertEquals(0, groupPageStores.groupCount());
+        assertSame(filePageStore, groupPageStoresMap.remove(0, 0));
+
+        assertNull(groupPageStoresMap.get(0));
+        assertNull(groupPageStores.get(0));
+
+        assertFalse(groupPageStoresMap.contains(0, 0));
+        assertEquals(0, groupPageStoresMap.groupCount());
     }
 
     private void checkInvokeAfterAsyncCompletion(int times) {
         verify(longOperationAsyncExecutor, times(times)).afterAsyncCompletion(any(Supplier.class));
     }
+
+    private static <T extends PageStore> Collection<TestGroupPartitionPageStore<T>> getAll(GroupPageStoresMap<T> groupPageStoresMap) {
+        return groupPageStoresMap.getAll().stream()
+                .flatMap(groupPageStores -> groupPageStores.getAll().stream()
+                        .map(partitionPageStore -> new TestGroupPartitionPageStore<>(
+                                groupPageStores.groupId(),
+                                partitionPageStore.partitionId(),
+                                partitionPageStore.pageStore()
+                        ))
+                )
+                .collect(toList());
+    }
+
+    private static <T extends PageStore> Collection<TestPartitionPageStore<T>> getAll(GroupPageStores<T> groupPageStores) {
+        return groupPageStores.getAll().stream()
+                .map(partitionPageStore -> new TestPartitionPageStore<>(partitionPageStore.partitionId(), partitionPageStore.pageStore()))
+                .collect(toList());
+    }
+
+    /**
+     * Inner class for tests.
+     */
+    private static class TestGroupPartitionPageStore<T extends PageStore> extends TestPartitionPageStore<T> {
+        @IgniteToStringInclude
+        final int groupId;
+
+        private TestGroupPartitionPageStore(int groupId, int partitionId, T filePageStore) {
+            super(partitionId, filePageStore);
+
+            this.groupId = groupId;
+        }
+
+        @Override
+        public boolean equals(Object obj) {
+            if (obj == this) {
+                return true;
+            }
+
+            if (obj instanceof TestGroupPartitionPageStore) {
+                TestGroupPartitionPageStore that = (TestGroupPartitionPageStore) obj;
+
+                return groupId == that.groupId && partitionId == that.partitionId && filePageStore == that.filePageStore;
+            }
+
+            return false;
+        }
+
+        @Override
+        public String toString() {
+            return S.toString(TestGroupPartitionPageStore.class, this, "partitionId", partitionId, "filePageStore", filePageStore);
+        }
+    }
+
+    /**
+     * Inner class for tests.
+     */
+    private static class TestPartitionPageStore<T extends PageStore> {
+        @IgniteToStringInclude
+        final int partitionId;
+
+        @IgniteToStringInclude
+        final T filePageStore;
+
+        private TestPartitionPageStore(int partitionId, T filePageStore) {
+            this.partitionId = partitionId;
+            this.filePageStore = filePageStore;
+        }
+
+        @Override
+        public boolean equals(Object obj) {
+            if (obj == this) {
+                return true;
+            }
+
+            if (obj instanceof TestPartitionPageStore) {
+                TestPartitionPageStore that = (TestPartitionPageStore) obj;
+
+                return partitionId == that.partitionId && filePageStore == that.filePageStore;

Review Comment:
   Why do you use reference equality here?
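   For context: Mockito mocks compare by identity in `equals`, so `==` and `Objects.equals` give the same answer for the mocked stores here. Two other things are worth noting, though: both test helpers override `equals` without `hashCode`, which breaks the `Object` contract, and because `TestGroupPartitionPageStore` extends `TestPartitionPageStore` while both use `instanceof`, `equals` is not symmetric between them (the parent can equal the child, but not vice versa). A minimal sketch of a symmetric version that honors the contract; `PartitionEntry` and `GroupPartitionEntry` are hypothetical names:

```java
import java.util.Objects;

class PartitionEntry<T> {
    final int partitionId;
    final T pageStore;

    PartitionEntry(int partitionId, T pageStore) {
        this.partitionId = partitionId;
        this.pageStore = pageStore;
    }

    @Override
    public boolean equals(Object obj) {
        if (obj == this) {
            return true;
        }

        // getClass() instead of instanceof keeps equals symmetric when a subclass adds fields (such as a group ID).
        if (obj == null || obj.getClass() != getClass()) {
            return false;
        }

        PartitionEntry<?> that = (PartitionEntry<?>) obj;

        return partitionId == that.partitionId && Objects.equals(pageStore, that.pageStore);
    }

    @Override
    public int hashCode() {
        // Must agree with equals so instances behave correctly in hash-based collections.
        return Objects.hash(partitionId, pageStore);
    }
}

final class GroupPartitionEntry<T> extends PartitionEntry<T> {
    final int groupId;

    GroupPartitionEntry(int groupId, int partitionId, T pageStore) {
        super(partitionId, pageStore);
        this.groupId = groupId;
    }

    @Override
    public boolean equals(Object obj) {
        // super.equals already guarantees obj has exactly this class, so the cast is safe.
        return super.equals(obj) && groupId == ((GroupPartitionEntry<?>) obj).groupId;
    }

    @Override
    public int hashCode() {
        return 31 * super.hashCode() + groupId;
    }
}
```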



##########
modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/store/GroupPageStoresMap.java:
##########
@@ -47,52 +50,156 @@ public GroupPageStoresMap(LongOperationAsyncExecutor longOperationAsyncExecutor)
     }
 
     /**
-     * Puts the page stores for the group.
+     * Puts the page store for the group partition.
+     *
+     * @param groupId Group ID.
+     * @param partitionId Partition ID.
+     * @param pageStore Page store.
+     * @return Previous page store.
+     */
+    public @Nullable T put(Integer groupId, Integer partitionId, T pageStore) {
+        return longOperationAsyncExecutor.afterAsyncCompletion(() -> {
+                    PartitionPageStore<T> previous = groupIdPageStores
+                            .computeIfAbsent(groupId, id -> new GroupPageStores<>(groupId))

Review Comment:
   I'd rather pass `id` into the constructor instead of capturing the external variable.
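   The suggestion, sketched with hypothetical minimal types (`GroupStores`, `GroupStoresMapSketch`, and `String` standing in for a page store): the mapping function of `computeIfAbsent` already receives the key, so a constructor reference (or `id -> new GroupStores(id)`) needs nothing from the enclosing scope. A capturing lambda generally needs a new instance on each call, while a non-capturing one can be reused.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

/** Hypothetical minimal stand-in for GroupPageStores. */
final class GroupStores {
    final int groupId;
    final ConcurrentMap<Integer, String> partitions = new ConcurrentHashMap<>();

    GroupStores(int groupId) {
        this.groupId = groupId;
    }
}

final class GroupStoresMapSketch {
    final ConcurrentMap<Integer, GroupStores> groups = new ConcurrentHashMap<>();

    /** Returns the previous store for the partition, or null. */
    String put(int groupId, int partitionId, String store) {
        // computeIfAbsent passes the key to the mapping function, so the constructor
        // reference receives the group ID without capturing the 'groupId' parameter.
        return groups.computeIfAbsent(groupId, GroupStores::new).partitions.put(partitionId, store);
    }
}
```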



##########
modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/store/GroupPageStoresMap.java:
##########
@@ -47,52 +50,156 @@ public GroupPageStoresMap(LongOperationAsyncExecutor longOperationAsyncExecutor)
     }
 
     /**
-     * Puts the page stores for the group.
+     * Puts the page store for the group partition.
+     *
+     * @param groupId Group ID.
+     * @param partitionId Partition ID.
+     * @param pageStore Page store.
+     * @return Previous page store.
+     */
+    public @Nullable T put(Integer groupId, Integer partitionId, T pageStore) {
+        return longOperationAsyncExecutor.afterAsyncCompletion(() -> {
+                    PartitionPageStore<T> previous = groupIdPageStores

Review Comment:
   Isn't this padding too big?



##########
modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/compaction/Compactor.java:
##########
@@ -164,77 +170,90 @@ void waitDeltaFiles() {
     }
 
     /**
-     * Adds the number of delta files to compact.
-     *
-     * @param count Number of delta files.
+     * Callback on adding delta files so we can start compacting them.
      */
-    public void addDeltaFiles(int count) {
-        assert count >= 0;
-
-        if (count > 0) {
-            deltaFileCount.addAndGet(count);
+    public void onAddingDeltaFiles() {
+        synchronized (mux) {
+            addedDeltaFiles = true;
 
-            synchronized (mux) {
-                mux.notifyAll();
-            }
+            mux.notifyAll();
         }
     }
 
     /**
      * Merges delta files with partition files.
      */
     void doCompaction() {
-        // Let's collect one delta file for each partition.
-        Queue<IgniteBiTuple<FilePageStore, DeltaFilePageStoreIo>> queue = filePageStoreManager.allPageStores().stream()
-                .flatMap(List::stream)
-                .map(filePageStore -> {
+        while (true) {
+            // Let's collect one delta file for each partition.
+            ConcurrentLinkedQueue<DeltaFileToCompaction> queue = new ConcurrentLinkedQueue<>();
+
+            for (GroupPageStores<FilePageStore> groupPageStores : filePageStoreManager.allPageStores()) {
+                for (PartitionPageStore<FilePageStore> partitionPageStore : groupPageStores.getAll()) {
+                    FilePageStore filePageStore = partitionPageStore.pageStore();
+
                     DeltaFilePageStoreIo deltaFileToCompaction = filePageStore.getDeltaFileToCompaction();
 
-                    return deltaFileToCompaction == null ? null : new IgniteBiTuple<>(filePageStore, deltaFileToCompaction);
-                })
-                .filter(Objects::nonNull)
-                .collect(toCollection(ConcurrentLinkedQueue::new));
+                    if (!filePageStore.isMarkedToDestroy() && deltaFileToCompaction != null) {
+                        queue.add(new DeltaFileToCompaction(
+                                groupPageStores.groupId(),
+                                partitionPageStore.partitionId(),
+                                filePageStore,
+                                deltaFileToCompaction
+                        ));
+                    }
+                }
+            }
+
+            if (queue.isEmpty()) {

Review Comment:
   Maybe I'm worrying about this too much; we don't even know how fast or how often it will run.



##########
modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/compaction/Compactor.java:
##########
@@ -338,6 +360,10 @@ void mergeDeltaFileToMainFile(
                 return;
             }
 
+            if (filePageStore.isMarkedToDestroy()) {

Review Comment:
   I have a feeling that this code is not safe. We had similar code for stopping a cache group during a snapshot (in GG), and it never worked properly.
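   One likely reason such code fails is that `isMarkedToDestroy()` followed by the merge is check-then-act: the partition can be marked and its files deleted between the check and the file operations. A common remedy, sketched here with hypothetical names and not taken from the PR, is to make the check and the work atomic with respect to destruction, for example with a read/write lock: merges hold the read lock, destruction takes the write lock, so destruction waits only for merges already in progress and no new merge can start afterwards.

```java
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

final class GuardedPartitionFiles {
    private final ReadWriteLock lock = new ReentrantReadWriteLock();

    /** Written only under the write lock and read only under the read lock, so no volatile is needed. */
    private boolean destroyed;

    /** Runs the merge unless the partition is destroyed; returns whether the merge ran. */
    boolean tryMerge(Runnable merge) {
        lock.readLock().lock();

        try {
            if (destroyed) {
                return false;
            }

            // Destruction cannot start while the read lock is held, so the check above stays valid.
            merge.run();

            return true;
        } finally {
            lock.readLock().unlock();
        }
    }

    /** Waits for in-progress merges, then marks the partition destroyed and deletes its files. */
    void destroy(Runnable deleteFiles) {
        lock.writeLock().lock();

        try {
            destroyed = true;

            deleteFiles.run();
        } finally {
            lock.writeLock().unlock();
        }
    }
}
```

   This removes the arbitrary timeout, but `destroy` still blocks for as long as a merge runs; a fully asynchronous variant would need a different primitive.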



##########
modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvTableStorageTest.java:
##########
@@ -419,11 +505,69 @@ private static void createTestTable(TableConfiguration tableConfig) {
         assertThat(createTableFuture, willCompleteSuccessfully());
     }
 
-    private static <T> List<T> getAll(Cursor<T> cursor) {
+    private static <T> List<T> getAll(Cursor<T> cursor) throws Exception {
         try (cursor) {
             return cursor.stream().collect(Collectors.toList());
-        } catch (Exception e) {
-            throw new RuntimeException(e);
         }
     }
+
+    private void checkStorageDestroyed(MvPartitionStorage storage) {
+        int partId = PARTITION_ID;
+
+        assertThrows(StorageClosedException.class, () -> storage.runConsistently(() -> null));
+
+        assertThrows(StorageClosedException.class, storage::flush);
+
+        assertThrows(StorageClosedException.class, storage::lastAppliedIndex);
+        assertThrows(StorageClosedException.class, storage::lastAppliedTerm);

Review Comment:
   These two don't have to throw exceptions, but ok



##########
modules/storage-api/src/main/java/org/apache/ignite/internal/storage/engine/MvTableStorage.java:
##########
@@ -59,6 +60,10 @@ public interface MvTableStorage {
     /**
      * Destroys a partition and all associated indices.
      *
+     * <p>This method will do nothing if there is no partition by ID, when trying to call methods to read or write (as well as all previous
+     * open cursors) for {@link MvPartitionStorage}, {@link HashIndexStorage} and {@link SortedIndexStorage}, {@link StorageClosedException}
+     * will be thrown.
+     *

Review Comment:
   This description is nearly impossible to understand. Please restructure the text and expand it.



##########
modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/PartitionProcessingCounterMap.java:
##########
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.pagememory.persistence;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Helper class for thread-safe work with {@link PartitionProcessingCounter} for any partition of any group.
+ */
+public class PartitionProcessingCounterMap {
+    private final ConcurrentMap<GroupPartitionId, PartitionProcessingCounter> processedPartitions = new ConcurrentHashMap<>();
+
+    /**
+     * Callback at the beginning of checkpoint processing of a partition, for example, when writing dirty pages or executing a fsync.
+     *
+     * @param groupId Group ID.
+     * @param partitionId Partition ID.
+     */
+    public void onStartPartitionProcessing(int groupId, int partitionId) {
+        GroupPartitionId groupPartitionId = new GroupPartitionId(groupId, partitionId);
+
+        processedPartitions.compute(groupPartitionId, (id, partitionProcessingCounter) -> {
+            if (partitionProcessingCounter == null) {
+                PartitionProcessingCounter counter = new PartitionProcessingCounter();
+
+                counter.onStartPartitionProcessing();
+
+                return counter;
+            }
+
+            partitionProcessingCounter.onStartPartitionProcessing();
+
+            return partitionProcessingCounter;
+        });
+    }
+
+    /**
+     * Callback on completion of partition processing, for example, when writing dirty pages or executing a fsync.
+     *
+     * @param groupId Group ID.
+     * @param partitionId Partition ID.
+     */
+    public void onFinishPartitionProcessing(int groupId, int partitionId) {
+        GroupPartitionId groupPartitionId = new GroupPartitionId(groupId, partitionId);
+
+        processedPartitions.compute(groupPartitionId, (id, partitionProcessingCounter) -> {
+            assert partitionProcessingCounter != null : id;
+            assert !partitionProcessingCounter.future().isDone() : id;
+
+            partitionProcessingCounter.onFinishPartitionProcessing();
+
+            return partitionProcessingCounter.future().isDone() ? null : partitionProcessingCounter;
+        });
+    }
+
+    /**
+     * Returns the future if the partition according to the given parameters is currently being processed, for example, dirty pages are
+     * being written or fsync is being done, {@code null} if the partition is not currently being processed.
+     *
+     * <p>Future will be added on {@link #onStartPartitionProcessing(int, int)} call and completed on
+     * {@link #onFinishPartitionProcessing(int, int)} call (equal to the number of {@link #onFinishPartitionProcessing(int, int)} calls).
+     *
+     * @param groupId Group ID.
+     * @param partitionId Partition ID.
+     */
+    @Nullable
+    public CompletableFuture<Void> getProcessedPartitionFuture(int groupId, int partitionId) {

Review Comment:
   Is there a chance of a race condition when this method returns null? The caller assumes that processing of the partition is finished, but then another thread suddenly calls "start" before you know it, and then it's all doomed...
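   One way to close that window, sketched under the assumption that checkpoint and compaction code can skip a partition when told to: make "start" and "destroy" mutually exclusive per partition by doing both inside `ConcurrentHashMap.compute`, which is atomic for a given key. Once destruction is registered, later `tryStart` calls are rejected, and the returned future completes when the last in-flight processing finishes. `PartitionProcessingGate` and its methods are hypothetical, not the PR's `PartitionProcessingCounterMap` API, and cleanup of entries for destroyed partitions is omitted.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;

final class PartitionProcessingGate {
    /** Per-partition state; mutated only inside compute/computeIfPresent, which run atomically per key. */
    private static final class State {
        int active;
        boolean destroying;
        final CompletableFuture<Void> drained = new CompletableFuture<>();
    }

    private final ConcurrentMap<Long, State> states = new ConcurrentHashMap<>();

    private static long key(int groupId, int partitionId) {
        return ((long) groupId << 32) | (partitionId & 0xFFFFFFFFL);
    }

    /** Returns false if destruction has begun; the caller must then skip the partition and not call finish. */
    boolean tryStart(int groupId, int partitionId) {
        AtomicBoolean started = new AtomicBoolean();

        states.compute(key(groupId, partitionId), (k, s) -> {
            State state = s == null ? new State() : s;

            if (!state.destroying) {
                state.active++;
                started.set(true);
            }

            return state;
        });

        return started.get();
    }

    /** Must be called exactly once for every successful tryStart. */
    void finish(int groupId, int partitionId) {
        AtomicReference<CompletableFuture<Void>> toComplete = new AtomicReference<>();

        states.computeIfPresent(key(groupId, partitionId), (k, s) -> {
            if (--s.active > 0) {
                return s;
            }

            if (s.destroying) {
                toComplete.set(s.drained);
                return s; // Keep the entry so that later tryStart calls are still rejected.
            }

            return null; // Idle and not being destroyed: drop the entry.
        });

        // Complete outside compute so dependent callbacks never run while the map entry is locked.
        CompletableFuture<Void> future = toComplete.get();

        if (future != null) {
            future.complete(null);
        }
    }

    /** Rejects new processing and returns a future that completes once in-flight processing has drained. */
    CompletableFuture<Void> startDestruction(int groupId, int partitionId) {
        AtomicBoolean idle = new AtomicBoolean();

        State state = states.compute(key(groupId, partitionId), (k, s) -> {
            State st = s == null ? new State() : s;

            st.destroying = true;
            idle.set(st.active == 0);

            return st;
        });

        // No tryStart can succeed from now on, so an idle partition stays idle.
        if (idle.get()) {
            state.drained.complete(null);
        }

        return state.drained;
    }
}
```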



##########
modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/store/GroupPageStoresMap.java:
##########
@@ -47,52 +50,156 @@ public GroupPageStoresMap(LongOperationAsyncExecutor longOperationAsyncExecutor)
     }
 
     /**
-     * Puts the page stores for the group.
+     * Puts the page store for the group partition.
+     *
+     * @param groupId Group ID.
+     * @param partitionId Partition ID.
+     * @param pageStore Page store.
+     * @return Previous page store.
+     */
+    public @Nullable T put(Integer groupId, Integer partitionId, T pageStore) {
+        return longOperationAsyncExecutor.afterAsyncCompletion(() -> {
+                    PartitionPageStore<T> previous = groupIdPageStores
+                            .computeIfAbsent(groupId, id -> new GroupPageStores<>(groupId))
+                            .partitionIdPageStore
+                            .put(partitionId, new PartitionPageStore<>(partitionId, pageStore));
+
+                    return previous == null ? null : previous.pageStore;
+                }
+        );
+    }
+
+    /**
+     * Removes the page store for the group partition.
      *
-     * @param grpId Group ID.
-     * @param pageStores Page stores.
-     * @return Previous page stores.
+     * @param groupId Group ID.
+     * @param partitionId Partition ID.
+     * @return Removed page store.
      */
-    public @Nullable List<T> put(Integer grpId, List<T> pageStores) {
-        return longOperationAsyncExecutor.afterAsyncCompletion(() -> groupPageStores.put(grpId, pageStores));
+    public @Nullable T remove(Integer groupId, Integer partitionId) {
+        AtomicReference<PartitionPageStore<T>> partitionPageStoreRef = new AtomicReference<>();
+
+        groupIdPageStores.compute(groupId, (id, groupPageStores) -> {

Review Comment:
   Why don't we use a composite key (groupId, partitionId)? Is it because we need to be able to get all of a group's stores quickly?
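   To make the trade-off in this question concrete: a flat map with a composite key (the PR already uses `GroupPartitionId` this way in `PartitionProcessingCounterMap`) keeps single-partition lookups simple, but listing one group's stores becomes a scan over every partition of every group, which the nested group-to-partition layout avoids. The sketch below packs both IDs into one `long`, which also relates to the primitive-key question above, although `ConcurrentHashMap` still boxes it to `Long`. `FlatPageStoreMap` and the generic store type are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

final class FlatPageStoreMap<T> {
    private final ConcurrentMap<Long, T> stores = new ConcurrentHashMap<>();

    /** Packs (groupId, partitionId) into one long: group in the high 32 bits, partition in the low 32 bits. */
    static long key(int groupId, int partitionId) {
        return ((long) groupId << 32) | (partitionId & 0xFFFFFFFFL);
    }

    static int groupId(long key) {
        return (int) (key >>> 32);
    }

    static int partitionId(long key) {
        return (int) key;
    }

    T put(int groupId, int partitionId, T store) {
        return stores.put(key(groupId, partitionId), store);
    }

    T get(int groupId, int partitionId) {
        return stores.get(key(groupId, partitionId));
    }

    /** O(total partitions of all groups): the cost a nested group-to-partition map avoids. */
    List<T> storesOfGroup(int groupId) {
        List<T> result = new ArrayList<>();

        for (Map.Entry<Long, T> e : stores.entrySet()) {
            if (groupId(e.getKey()) == groupId) {
                result.add(e.getValue());
            }
        }

        return result;
    }
}
```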



##########
modules/storage-api/src/main/java/org/apache/ignite/internal/storage/StorageClosedException.java:
##########
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage;
+
+/**
+ *  Exception that will be thrown when the storage is closed.
+ */
+public class StorageClosedException extends StorageException {
+    /**
+     * Constructor.
+     *
+     * @param message Error message.
+     */
+    public StorageClosedException(String message) {
+        super(message);
+    }
+
+    /**
+     * Constructor.
+     *
+     * @param message Error message.
+     * @param cause The cause.
+     */
+    public StorageClosedException(String message, Throwable cause) {
+        super(message, cause);

Review Comment:
   Where's the error code? :)
   I guess we should add it soon.



-- 
This is an automated message from the Apache Git Service.