ibessonov commented on code in PR #1325: URL: https://github.com/apache/ignite-3/pull/1325#discussion_r1029043178
########## modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/PartitionProcessingCounter.java: ########## @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.pagememory.persistence; + +import java.lang.invoke.MethodHandles; +import java.lang.invoke.VarHandle; +import java.util.concurrent.CompletableFuture; + +/** + * Helper class for tracking the completion of partition processing. + * + * <p>At the start of partition processing, you need to call {@link #onStartPartitionProcessing()}, at the end + * {@link #onFinishPartitionProcessing()}. When all partition processing is completed, the {@link #future()} will be completed. + * + * <p>It is recommended to use external synchronization for the correct operation of the {@link #counter partition processing counter} and + * the {@link #future future}. 
+ */ +public class PartitionProcessingCounter { + private static final VarHandle COUNTER; + + static { + try { + COUNTER = MethodHandles.lookup().findVarHandle(PartitionProcessingCounter.class, "counter", int.class); + } catch (ReflectiveOperationException e) { + throw new ExceptionInInitializerError(e); + } + } + + /** Partition processing counter must be greater than or equal to zero. */ + @SuppressWarnings("unused") + private volatile int counter; + + /** Future that will be completed when the {@link #counter} is zero. */ + private final CompletableFuture<Void> future = new CompletableFuture<>(); + + /** + * Callback at the start of partition processing. Review Comment: Technically, it's not a callback. It's just a method that one should call ########## modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointPagesWriterFactory.java: ########## @@ -106,7 +104,7 @@ CheckpointPagesWriter build( BooleanSupplier shutdownNow ) { return new CheckpointPagesWriter( - log, + LOG, Review Comment: Should the writer also have its own logger? ########## modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointManager.java: ########## @@ -307,11 +312,23 @@ static int[] pageIndexesForDeltaFilePageStore(CheckpointDirtyPagesView partition } /** - * Adds the number of delta files to compact. + * Callback on adding delta files so we can start compacting them. + */ + public void onAddingDeltaFiles() { Review Comment: I would prefer another name, but I don't think that I have a good example. "on***" methods are good in listeners, but this call must be explicit. 
So I expect an explicit action in the name, like "triggerCompaction" or something. ########## modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/Checkpointer.java: ########## @@ -452,9 +456,21 @@ private void syncUpdatedPageStores( return; } - fsyncDeltaFilePageStoreOnCheckpointThread(entry.getKey(), entry.getValue()); + GroupPartitionId partitionId = entry.getKey(); + + FilePageStore filePageStore = filePageStoreManager.getStore(partitionId.getGroupId(), partitionId.getPartitionId()); + + if (filePageStore == null || filePageStore.isMarkedToDestroy()) { + continue; + } + + currentCheckpointProgress.onStartPartitionProcessing(partitionId.getGroupId(), partitionId.getPartitionId()); + + fsyncDeltaFilePageStoreOnCheckpointThread(filePageStore, entry.getValue()); - renameDeltaFileOnCheckpointThread(entry.getKey()); + renameDeltaFileOnCheckpointThread(filePageStore, partitionId); + + currentCheckpointProgress.onFinishPartitionProcessing(partitionId.getGroupId(), partitionId.getPartitionId()); Review Comment: Is there a chance of having a future that will never be completed if something goes wrong? I don't see a try/finally section here. I don't see it in other places that use these methods either. There may be bugs lurking in your code because of that. ########## modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/compaction/Compactor.java: ########## @@ -164,77 +170,90 @@ void waitDeltaFiles() { } /** - * Adds the number of delta files to compact. - * - * @param count Number of delta files. + * Callback on adding delta files so we can start compacting them. */ - public void addDeltaFiles(int count) { - assert count >= 0; - - if (count > 0) { - deltaFileCount.addAndGet(count); + public void onAddingDeltaFiles() { + synchronized (mux) { + addedDeltaFiles = true; - synchronized (mux) { - mux.notifyAll(); - } + mux.notifyAll(); } } /** * Merges delta files with partition files.
*/ void doCompaction() { - // Let's collect one delta file for each partition. - Queue<IgniteBiTuple<FilePageStore, DeltaFilePageStoreIo>> queue = filePageStoreManager.allPageStores().stream() - .flatMap(List::stream) - .map(filePageStore -> { + while (true) { Review Comment: Please explain this loop. It's not obvious ########## modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/compaction/Compactor.java: ########## @@ -338,6 +360,10 @@ void mergeDeltaFileToMainFile( return; } + if (filePageStore.isMarkedToDestroy()) { Review Comment: Why do we double-check it? Is there a synchronization? What would happen if you removed this particular check and store is marked for destruction? ########## modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/store/FilePageStoreManager.java: ########## @@ -139,31 +149,63 @@ public void start() throws IgniteInternalCheckedException { throw new IgniteInternalCheckedException("Could not create work directory for page stores: " + dbDir, e); } - if (log.isWarnEnabled()) { + if (LOG.isWarnEnabled()) { String tmpDir = System.getProperty("java.io.tmpdir"); if (tmpDir != null && this.dbDir.startsWith(tmpDir)) { - log.warn("Persistence store directory is in the temp directory and may be cleaned. " + LOG.warn("Persistence store directory is in the temp directory and may be cleaned. 
" + "To avoid this change location of persistence directories [currentDir={}]", this.dbDir); } } + List<Path> toDelete = new ArrayList<>(); + try (Stream<Path> tmpFileStream = Files.find( dbDir, Integer.MAX_VALUE, - (path, basicFileAttributes) -> path.getFileName().toString().endsWith(TMP_FILE_SUFFIX) - )) { - List<Path> tmpFiles = tmpFileStream.collect(toList()); + (path, basicFileAttributes) -> path.getFileName().toString().endsWith(TMP_FILE_SUFFIX)) + ) { + toDelete.addAll(tmpFileStream.collect(toList())); + } catch (IOException e) { + throw new IgniteInternalCheckedException("Error while searching temporary files:" + dbDir, e); + } + + Pattern delPartitionFilePatter = Pattern.compile(DEL_PART_FILE_REGEXP); + + try (Stream<Path> delFileStream = Files.find( + dbDir, + Integer.MAX_VALUE, + (path, basicFileAttributes) -> path.getFileName().toString().endsWith(DEL_FILE_SUFFIX)) + ) { + delFileStream.forEach(delFilePath -> { + Matcher matcher = delPartitionFilePatter.matcher(delFilePath.getFileName().toString()); - if (!tmpFiles.isEmpty()) { - if (log.isInfoEnabled()) { - log.info("Temporary files to be deleted: {}", tmpFiles.size()); + if (!matcher.matches()) { + throw new IgniteInternalException("Unknown file: " + delFilePath); } - tmpFiles.forEach(IgniteUtils::deleteIfExists); - } + Path tableWorkDir = delFilePath.getParent(); + + int partitionId = Integer.parseInt(matcher.group(1)); + + toDelete.add(tableWorkDir.resolve(String.format(PART_FILE_TEMPLATE, partitionId))); + + try { + toDelete.addAll(List.of(findPartitionDeltaFiles(tableWorkDir, partitionId))); + } catch (IgniteInternalCheckedException e) { + throw new IgniteInternalException("Error when searching delta files for partition:" + delFilePath, e); + } + + toDelete.add(delFilePath); + }); } catch (IOException e) { - throw new IgniteInternalCheckedException("Could not create work directory for page stores: " + dbDir, e); + throw new IgniteInternalCheckedException("Error while searching temporary files:" + 
dbDir, e); + } + + if (!toDelete.isEmpty()) { + LOG.info("Files to be deleted: {}", toDelete); Review Comment: Please add a line with reasons to delete these files. Something generic, like "Cleaning obsolete files for table x" ########## modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/store/FilePageStoreManager.java: ########## @@ -443,4 +461,50 @@ public Path tmpDeltaFilePageStorePath(int groupId, int partitionId, int index) { public Path deltaFilePageStorePath(int groupId, int partitionId, int index) { return dbDir.resolve(GROUP_DIR_PREFIX + groupId).resolve(String.format(PART_DELTA_FILE_TEMPLATE, partitionId, index)); } + + /** + * Callback on destruction of the partition of the corresponding group. + * + * <p>Deletes the partition pages tore and all its delta files. Before that, it creates a marker file (for example, Review Comment: ```suggestion * <p>Deletes the partition pages store and all its delta files. Before that, it creates a marker file (for example, ``` ########## modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/store/GroupPageStoresMap.java: ########## @@ -47,52 +50,156 @@ public GroupPageStoresMap(LongOperationAsyncExecutor longOperationAsyncExecutor) } /** - * Puts the page stores for the group. + * Puts the page store for the group partition. + * + * @param groupId Group ID. + * @param partitionId Partition ID. + * @param pageStore Page store. + * @return Previous page store. + */ + public @Nullable T put(Integer groupId, Integer partitionId, T pageStore) { + return longOperationAsyncExecutor.afterAsyncCompletion(() -> { + PartitionPageStore<T> previous = groupIdPageStores + .computeIfAbsent(groupId, id -> new GroupPageStores<>(groupId)) + .partitionIdPageStore + .put(partitionId, new PartitionPageStore<>(partitionId, pageStore)); + + return previous == null ? null : previous.pageStore; + } + ); + } + + /** + * Removes the page store for the group partition. 
* - * @param grpId Group ID. - * @param pageStores Page stores. - * @return Previous page stores. + * @param groupId Group ID. + * @param partitionId Partition ID. + * @return Removed page store. */ - public @Nullable List<T> put(Integer grpId, List<T> pageStores) { - return longOperationAsyncExecutor.afterAsyncCompletion(() -> groupPageStores.put(grpId, pageStores)); + public @Nullable T remove(Integer groupId, Integer partitionId) { + AtomicReference<PartitionPageStore<T>> partitionPageStoreRef = new AtomicReference<>(); + + groupIdPageStores.compute(groupId, (id, groupPageStores) -> { + if (groupPageStores == null) { + return null; + } + + partitionPageStoreRef.set(groupPageStores.partitionIdPageStore.remove(partitionId)); + + if (groupPageStores.partitionIdPageStore.isEmpty()) { + return null; + } + + return groupPageStores; + }); + + PartitionPageStore<T> partitionPageStore = partitionPageStoreRef.get(); + + return partitionPageStore == null ? null : partitionPageStore.pageStore; } /** * Returns the page stores for the group. * - * @param grpId Group ID. + * @param groupId Group ID. */ - public @Nullable List<T> get(Integer grpId) { - return groupPageStores.get(grpId); + public @Nullable GroupPageStores<T> get(Integer groupId) { + return groupIdPageStores.get(groupId); } /** - * Returns {@code true} if a page stores exists for the group. + * Returns {@code true} if a page store exists for the group partition. * - * @param grpId Group ID. + * @param groupId Group ID. + * @param partitionId Partition ID. */ - public boolean containsPageStores(Integer grpId) { - return groupPageStores.containsKey(grpId); + public boolean contains(Integer groupId, Integer partitionId) { + GroupPageStores<T> groupPageStores = groupIdPageStores.get(groupId); + + return groupPageStores != null && groupPageStores.partitionIdPageStore.containsKey(partitionId); } /** - * Returns all page stores of all groups. + * Returns a view of all page stores of all groups. 
*/ - public Collection<List<T>> allPageStores() { - return groupPageStores.values(); + public Collection<GroupPageStores<T>> getAll() { + return unmodifiableCollection(groupIdPageStores.values()); } /** * Clears all page stores of all groups. */ public void clear() { - groupPageStores.clear(); + groupIdPageStores.clear(); } /** * Returns the number of groups for which there are page stores. */ public int groupCount() { - return groupPageStores.size(); + return groupIdPageStores.size(); + } + + /** + * Group partition page stores. + */ + public static class GroupPageStores<T extends PageStore> { + private final int groupId; + + private final ConcurrentMap<Integer, PartitionPageStore<T>> partitionIdPageStore = new ConcurrentHashMap<>(); + + private GroupPageStores(int groupId) { + this.groupId = groupId; + } + + /** + * Returns the group ID. + */ + public int groupId() { + return groupId; + } + + /** + * Returns the partition's page store. + */ + @Nullable + public PartitionPageStore<T> get(Integer partitionId) { + return partitionIdPageStore.get(partitionId); + } + + /** + * Returns a view of the group's partition page stores. + */ + public Collection<PartitionPageStore<T>> getAll() { + return unmodifiableCollection(partitionIdPageStore.values()); + } + } + + /** + * Partition page store. + */ + public static class PartitionPageStore<T extends PageStore> { + private final int partitionId; + + private final T pageStore; Review Comment: Does PageStore have a getter for partitionId?
Maybe it should; please validate this idea. ########## modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/PersistentPageMemoryTableStorage.java: ########## @@ -86,41 +97,29 @@ public PersistentPageMemoryDataRegion dataRegion() { return dataRegion; } - /** {@inheritDoc} */ @Override public boolean isVolatile() { return false; } - /** {@inheritDoc} */ - @Override - public void start() throws StorageException { - super.start(); - - TableView tableView = tableCfg.value(); - - try { - dataRegion.filePageStoreManager().initialize(tableView.name(), tableView.tableId(), tableView.partitions()); - - int deltaFileCount = dataRegion.filePageStoreManager().getStores(tableView.tableId()).stream() - .mapToInt(FilePageStore::deltaFileCount) - .sum(); - - dataRegion.checkpointManager().addDeltaFileCountForCompaction(deltaFileCount); - } catch (IgniteInternalCheckedException e) { - throw new StorageException("Error initializing file page stores for table: " + tableView.name(), e); - } - } - - /** {@inheritDoc} */ @Override public void destroy() throws StorageException { close(true); } - /** {@inheritDoc} */ @Override public PersistentPageMemoryMvPartitionStorage createMvPartitionStorage(int partitionId) { + CompletableFuture<Void> partitionDestroyFuture = destroyFutureByPartitionId.get(partitionId); + + if (partitionDestroyFuture != null) { + try { + // Time is chosen randomly (long enough) so as not to call #join(). + partitionDestroyFuture.get(10, TimeUnit.SECONDS); Review Comment: Again, "randomly" and "long enough": I don't like it. This particular method is trickier; we should think about what to do here and how to design things. ########## modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/compaction/Compactor.java: ########## @@ -164,77 +170,90 @@ void waitDeltaFiles() { } /** - * Adds the number of delta files to compact. - * - * @param count Number of delta files.
+ * Callback on adding delta files so we can start compacting them. */ - public void addDeltaFiles(int count) { - assert count >= 0; - - if (count > 0) { - deltaFileCount.addAndGet(count); + public void onAddingDeltaFiles() { + synchronized (mux) { + addedDeltaFiles = true; - synchronized (mux) { - mux.notifyAll(); - } + mux.notifyAll(); } } /** * Merges delta files with partition files. */ void doCompaction() { Review Comment: Who calls this method? Pretty small description for a method this big. I see that it waits for some futures at the end. This means that it blocks a caller thread. Is this necessary? Please describe what's going on, code itself is hard to understand. ########## modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/Checkpointer.java: ########## @@ -775,4 +799,41 @@ private void renameDeltaFileOnCheckpointThread(GroupPartitionId partitionId) thr void updateLastProgressAfterReleaseWriteLock() { afterReleaseWriteLockCheckpointProgress = currentCheckpointProgress; } + + /** + * Callback on destruction of the partition of the corresponding group. + * + * <p>If the checkpoint is in progress, then wait until it finishes processing the partition that we are going to destroy, in order to + * prevent the situation when we want to destroy the partition file along with its delta files, and at this time the checkpoint performs + * I/O operations on them. + * + * @param groupId Group ID. + * @param partitionId Partition ID. + * @throws IgniteInternalCheckedException If there are errors while processing the callback. 
*/ + void onPartitionDestruction(int groupId, int partitionId) throws IgniteInternalCheckedException { + CheckpointProgressImpl currentCheckpointProgress = this.currentCheckpointProgress; + + if (currentCheckpointProgress == null || !currentCheckpointProgress.inProgress()) { + return; + } + + CompletableFuture<Void> processedPartitionFuture = currentCheckpointProgress.getProcessedPartitionFuture(groupId, partitionId); + + if (processedPartitionFuture != null) { + try { + // Time is taken arbitrarily, but long enough to allow time for the future to complete. + processedPartitionFuture.get(10, SECONDS); Review Comment: Wait, what? First of all, please don't hard-code such constants. Second, we constantly suffer from other people using "get" or "join" on futures and keep saying that it's a horrible practice. And now I see this, without a proper explanation of why it needs to be here. So either explain thoroughly or, even better, rewrite this code to avoid an explicit await. ########## modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/PartitionProcessingCounter.java: ########## @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
*/ + +package org.apache.ignite.internal.pagememory.persistence; + +import java.lang.invoke.MethodHandles; +import java.lang.invoke.VarHandle; +import java.util.concurrent.CompletableFuture; + +/** + * Helper class for tracking the completion of partition processing. + * + * <p>At the start of partition processing, you need to call {@link #onStartPartitionProcessing()}, at the end + * {@link #onFinishPartitionProcessing()}. When all partition processing is completed, the {@link #future()} will be completed. + * + * <p>It is recommended to use external synchronization for the correct operation of the {@link #counter partition processing counter} and + * the {@link #future future}. + */ +public class PartitionProcessingCounter { Review Comment: Should this class be package-private? ########## modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/compaction/Compactor.java: ########## @@ -65,13 +66,17 @@ public class Compactor extends IgniteWorker { private final @Nullable ThreadPoolExecutor threadPoolExecutor; - private final AtomicInteger deltaFileCount = new AtomicInteger(); + /** Guarded by {@link #mux}. */ + private boolean addedDeltaFiles; private final FilePageStoreManager filePageStoreManager; /** Thread local with buffers for the compaction threads. */ private final ThreadLocal<ByteBuffer> threadBuf; Review Comment: By the way, why isn't it "static"? Is it possible that several compactor workers operate in a single thread? I don't think so. ########## modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointManager.java: ########## @@ -307,11 +312,23 @@ static int[] pageIndexesForDeltaFilePageStore(CheckpointDirtyPagesView partition } /** - * Adds the number of delta files to compact. + * Callback on adding delta files so we can start compacting them.
+ */ + public void onAddingDeltaFiles() { + compactor.onAddingDeltaFiles(); + } + + /** + * Callback on destruction of the partition of the corresponding group. + * + * <p>Prepares the checkpointer and compactor for partition destruction. * - * @param count Number of delta files. + * @param groupId Group ID. + * @param partitionId Partition ID. + * @throws IgniteInternalCheckedException If there are errors while processing the callback. */ - public void addDeltaFileCountForCompaction(int count) { - compactor.addDeltaFiles(count); + public void onPartitionDestruction(int groupId, int partitionId) throws IgniteInternalCheckedException { Review Comment: This one doesn't return any future. I guess we get them differently? ########## modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/compaction/Compactor.java: ########## @@ -164,77 +170,90 @@ void waitDeltaFiles() { } /** - * Adds the number of delta files to compact. - * - * @param count Number of delta files. + * Callback on adding delta files so we can start compacting them. */ - public void addDeltaFiles(int count) { - assert count >= 0; - - if (count > 0) { - deltaFileCount.addAndGet(count); + public void onAddingDeltaFiles() { + synchronized (mux) { + addedDeltaFiles = true; - synchronized (mux) { - mux.notifyAll(); - } + mux.notifyAll(); } } /** * Merges delta files with partition files. */ void doCompaction() { - // Let's collect one delta file for each partition. - Queue<IgniteBiTuple<FilePageStore, DeltaFilePageStoreIo>> queue = filePageStoreManager.allPageStores().stream() - .flatMap(List::stream) - .map(filePageStore -> { + while (true) { + // Let's collect one delta file for each partition. 
+ ConcurrentLinkedQueue<DeltaFileToCompaction> queue = new ConcurrentLinkedQueue<>(); + + for (GroupPageStores<FilePageStore> groupPageStores : filePageStoreManager.allPageStores()) { + for (PartitionPageStore<FilePageStore> partitionPageStore : groupPageStores.getAll()) { + FilePageStore filePageStore = partitionPageStore.pageStore(); + DeltaFilePageStoreIo deltaFileToCompaction = filePageStore.getDeltaFileToCompaction(); - return deltaFileToCompaction == null ? null : new IgniteBiTuple<>(filePageStore, deltaFileToCompaction); - }) - .filter(Objects::nonNull) - .collect(toCollection(ConcurrentLinkedQueue::new)); + if (!filePageStore.isMarkedToDestroy() && deltaFileToCompaction != null) { + queue.add(new DeltaFileToCompaction( + groupPageStores.groupId(), + partitionPageStore.partitionId(), + filePageStore, + deltaFileToCompaction + )); + } + } + } + + if (queue.isEmpty()) { Review Comment: So, with this queue you're supervising the thread pool, right? You read a batch of delta files, distribute them across the pool, then read the next batch, and so on. If what I described is correct, you're under-utilizing resources: all threads must wait until the last thread finishes its delta file, and only then is the next batch created. Why did you decide to use this approach? Was this intentional? Did you realize that threads are under-utilized in this implementation? ########## modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/compaction/Compactor.java: ########## @@ -146,7 +150,7 @@ protected void body() throws InterruptedException { void waitDeltaFiles() { Review Comment: How many threads execute this method? If more than one, then I see a huge problem. The first thread to wake up will set "addedDeltaFiles" to false. Other threads will see that and continue the loop indefinitely (or, to be precise, until cancellation). Basically, with this change you ruined multi-threading in the compactor; please be careful.
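If several compactor threads can wait here, one possible fix is to replace the shared boolean with a generation counter, so that the first thread to wake up cannot "consume" the signal for the others. This is a minimal sketch, not the PR's code: `DeltaFileNotifier`, `notifyAdded`, `currentGeneration` and `awaitNewDeltaFiles` are hypothetical names, and `notifyAdded` plays the role of `onAddingDeltaFiles()`.

```java
import java.util.concurrent.TimeUnit;

/**
 * Hypothetical helper (not from the PR): a generation counter replaces the boolean flag,
 * so the first waiter to wake up cannot hide the signal from the other waiters.
 */
final class DeltaFileNotifier {
    private final Object mux = new Object();

    /** Incremented on every notification; guarded by {@link #mux}. */
    private long generation;

    /** Called after delta files have been added (the role of onAddingDeltaFiles() in the PR). */
    void notifyAdded() {
        synchronized (mux) {
            generation++;
            mux.notifyAll();
        }
    }

    /** Returns the current generation; each waiter starts from the value it read here. */
    long currentGeneration() {
        synchronized (mux) {
            return generation;
        }
    }

    /**
     * Waits until the generation differs from {@code seenGeneration} or the timeout elapses.
     * Returns the generation seen on exit; the caller passes it to the next call, so every
     * thread tracks its own position and nothing is reset for the others.
     */
    long awaitNewDeltaFiles(long seenGeneration, long timeoutMillis) throws InterruptedException {
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMillis);

        synchronized (mux) {
            while (generation == seenGeneration) {
                long remainingMillis = TimeUnit.NANOSECONDS.toMillis(deadline - System.nanoTime());

                // wait(0) would block forever, so stop once the remaining time rounds down to zero.
                if (remainingMillis <= 0) {
                    break;
                }

                mux.wait(remainingMillis); // The loop re-checks the condition after spurious wake-ups.
            }

            return generation;
        }
    }
}
```

Each compactor thread would keep its own `seen` value, call `seen = notifier.awaitNewDeltaFiles(seen, timeout)`, and then scan for delta files. A notification that arrives while a thread is busy compacting is not lost either, because the generation has already moved past that thread's `seen` value by the time it waits again.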
########## modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/compaction/Compactor.java: ########## @@ -164,77 +170,90 @@ void waitDeltaFiles() { } /** - * Adds the number of delta files to compact. - * - * @param count Number of delta files. + * Callback on adding delta files so we can start compacting them. */ - public void addDeltaFiles(int count) { - assert count >= 0; - - if (count > 0) { - deltaFileCount.addAndGet(count); + public void onAddingDeltaFiles() { + synchronized (mux) { + addedDeltaFiles = true; - synchronized (mux) { - mux.notifyAll(); - } + mux.notifyAll(); } } /** * Merges delta files with partition files. */ void doCompaction() { - // Let's collect one delta file for each partition. - Queue<IgniteBiTuple<FilePageStore, DeltaFilePageStoreIo>> queue = filePageStoreManager.allPageStores().stream() - .flatMap(List::stream) - .map(filePageStore -> { + while (true) { + // Let's collect one delta file for each partition. + ConcurrentLinkedQueue<DeltaFileToCompaction> queue = new ConcurrentLinkedQueue<>(); + + for (GroupPageStores<FilePageStore> groupPageStores : filePageStoreManager.allPageStores()) { + for (PartitionPageStore<FilePageStore> partitionPageStore : groupPageStores.getAll()) { + FilePageStore filePageStore = partitionPageStore.pageStore(); + DeltaFilePageStoreIo deltaFileToCompaction = filePageStore.getDeltaFileToCompaction(); - return deltaFileToCompaction == null ? null : new IgniteBiTuple<>(filePageStore, deltaFileToCompaction); - }) - .filter(Objects::nonNull) - .collect(toCollection(ConcurrentLinkedQueue::new)); + if (!filePageStore.isMarkedToDestroy() && deltaFileToCompaction != null) { + queue.add(new DeltaFileToCompaction( + groupPageStores.groupId(), + partitionPageStore.partitionId(), + filePageStore, + deltaFileToCompaction + )); + } + } + } + + if (queue.isEmpty()) { + break; + } + + updateHeartbeat(); - assert !queue.isEmpty(); + int threads = threadPoolExecutor == null ? 
1 : threadPoolExecutor.getMaximumPoolSize(); Review Comment: How can it be null? Is it possible to set it up this way? ########## modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/compaction/Compactor.java: ########## @@ -357,14 +387,70 @@ void mergeDeltaFileToMainFile( return; } + if (filePageStore.isMarkedToDestroy()) { + return; + } + boolean removed = filePageStore.removeDeltaFile(deltaFilePageStore); assert removed : filePageStore.filePath(); deltaFilePageStore.markMergedToFilePageStore(); deltaFilePageStore.stop(true); + } + + /** + * Callback on destruction of the partition of the corresponding group. + * + * <p>If the partition compaction is in progress, then we will wait until it is completed so that there are no errors when we want to + * destroy the partition file and its delta file, and at this time its compaction occurs. + * + * @param groupId Group ID. + * @param partitionId Partition ID. + */ + public void onPartitionDestruction(int groupId, int partitionId) throws IgniteInternalCheckedException { + CompletableFuture<Void> partitionProcessingFuture = processedPartitionMap.getProcessedPartitionFuture(groupId, partitionId); + + if (partitionProcessingFuture != null) { + try { + // Time is taken arbitrarily, but long enough to allow time for the future to complete. + partitionProcessingFuture.get(10, SECONDS); Review Comment: Again, I don't like this code. I really, really don't like it. For some clients, checkpoints may take a minute or so, so 10 seconds isn't enough. And even if it is enough, having all these "get" calls is a bad practice.
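One way to avoid the explicit await is to let destruction chain on a future instead of blocking in `get(10, SECONDS)`. In the minimal sketch below, the tracker also decrements its counter in a `try`/`finally`, so the future completes even if processing throws. `PartitionProcessingTracker`, `process` and `whenProcessed` are hypothetical names, not the PR's API. The string key is an illustrative stand-in for a proper group/partition ID type.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

/** Hypothetical tracker (not part of the PR): callers chain on a future instead of calling get(10, SECONDS). */
final class PartitionProcessingTracker {
    /** Per-partition state; counter is only mutated inside ConcurrentHashMap#compute* for its key. */
    private static final class Entry {
        int counter;
        final CompletableFuture<Void> future = new CompletableFuture<>();
    }

    /** Hypothetical "groupId:partitionId" key, for illustration only. */
    private final ConcurrentMap<String, Entry> inProgress = new ConcurrentHashMap<>();

    private static String keyOf(int groupId, int partitionId) {
        return groupId + ":" + partitionId;
    }

    /** Runs one processing step for the partition; try/finally guarantees the counter is decremented. */
    void process(int groupId, int partitionId, Runnable step) {
        String key = keyOf(groupId, partitionId);

        inProgress.compute(key, (k, e) -> {
            Entry entry = e == null ? new Entry() : e;
            entry.counter++;
            return entry;
        });

        try {
            step.run();
        } finally {
            Entry[] finished = new Entry[1];

            inProgress.computeIfPresent(key, (k, e) -> {
                if (--e.counter == 0) {
                    finished[0] = e;
                    return null; // Last step for this partition: remove the entry.
                }
                return e;
            });

            // Complete outside compute so dependent actions never run while the map bin is locked.
            if (finished[0] != null) {
                finished[0].future.complete(null);
            }
        }
    }

    /** Returns a future that completes once no step is processing the partition; never blocks. */
    CompletableFuture<Void> whenProcessed(int groupId, int partitionId) {
        Entry entry = inProgress.get(keyOf(groupId, partitionId));

        return entry == null ? CompletableFuture.completedFuture(null) : entry.future;
    }
}
```

On the destruction side, this would become something like `tracker.whenProcessed(groupId, partitionId).thenRunAsync(() -> deletePartitionFiles(groupId, partitionId), executor)`, where `deletePartitionFiles` and `executor` are hypothetical. `onPartitionDestruction` could then return that future instead of parking a thread for an arbitrary 10 seconds. Like the PR, the sketch assumes that no new processing starts once the store is marked for destruction.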
########## modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/compaction/Compactor.java: ########## @@ -357,14 +387,70 @@ void mergeDeltaFileToMainFile( return; } + if (filePageStore.isMarkedToDestroy()) { + return; + } + boolean removed = filePageStore.removeDeltaFile(deltaFilePageStore); assert removed : filePageStore.filePath(); deltaFilePageStore.markMergedToFilePageStore(); deltaFilePageStore.stop(true); + } + + /** + * Callback on destruction of the partition of the corresponding group. + * + * <p>If the partition compaction is in progress, then we will wait until it is completed so that there are no errors when we want to + * destroy the partition file and its delta file, and at this time its compaction occurs. + * + * @param groupId Group ID. + * @param partitionId Partition ID. + */ + public void onPartitionDestruction(int groupId, int partitionId) throws IgniteInternalCheckedException { + CompletableFuture<Void> partitionProcessingFuture = processedPartitionMap.getProcessedPartitionFuture(groupId, partitionId); + + if (partitionProcessingFuture != null) { + try { + // Time is taken arbitrarily, but long enough to allow time for the future to complete. + partitionProcessingFuture.get(10, SECONDS); + } catch (Exception e) { + throw new IgniteInternalCheckedException( + IgniteStringFormatter.format( + "Error waiting for partition processing to complete on compaction: [groupId={}, partitionId={}]", + groupId, + partitionId + ), + e + ); + } + } + } + + /** + * Delta file for compaction. 
*/ + private static class DeltaFileToCompaction { Review Comment: It's either "for compaction" or "to compact"; there's no other option. ########## modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/store/FilePageStoreManager.java: ########## @@ -320,9 +369,9 @@ void stopAllGroupFilePageStores(boolean cleanFiles) { try { stopGroupFilePageStores(partitionPageStores, cleanFiles); - log.info("Cleanup cache stores [total={}, cleanFiles={}]", partitionPageStores.size(), cleanFiles); + LOG.info("Cleanup cache stores [total={}, cleanFiles={}]", partitionPageStores.size(), cleanFiles); } catch (Exception e) { - log.info("Failed to gracefully stop page store managers", e); + LOG.info("Failed to gracefully stop page store managers", e); Review Comment: Are you sure this is an "info"-level event? ########## modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/store/GroupPageStoresMap.java: ########## @@ -47,52 +50,156 @@ public GroupPageStoresMap(LongOperationAsyncExecutor longOperationAsyncExecutor) } /** - * Puts the page stores for the group. + * Puts the page store for the group partition. + * + * @param groupId Group ID. + * @param partitionId Partition ID. + * @param pageStore Page store. + * @return Previous page store. + */ + public @Nullable T put(Integer groupId, Integer partitionId, T pageStore) { + return longOperationAsyncExecutor.afterAsyncCompletion(() -> { + PartitionPageStore<T> previous = groupIdPageStores + .computeIfAbsent(groupId, id -> new GroupPageStores<>(groupId)) + .partitionIdPageStore + .put(partitionId, new PartitionPageStore<>(partitionId, pageStore)); + + return previous == null ? null : previous.pageStore; + } + ); + } + + /** + * Removes the page store for the group partition. + * - * @param grpId Group ID. - * @param pageStores Page stores. - * @return Previous page stores. + * @param groupId Group ID. + * @param partitionId Partition ID. + * @return Removed page store.
*/ - public @Nullable List<T> put(Integer grpId, List<T> pageStores) { - return longOperationAsyncExecutor.afterAsyncCompletion(() -> groupPageStores.put(grpId, pageStores)); + public @Nullable T remove(Integer groupId, Integer partitionId) { + AtomicReference<PartitionPageStore<T>> partitionPageStoreRef = new AtomicReference<>(); + + groupIdPageStores.compute(groupId, (id, groupPageStores) -> { Review Comment: Should we use a map with primitive keys? ########## modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/store/GroupPageStoresMapTest.java: ########## @@ -52,125 +57,265 @@ void setUp() { when(longOperationAsyncExecutor.afterAsyncCompletion(any(Supplier.class))) .then(answer -> ((Supplier<?>) answer.getArgument(0)).get()); - groupPageStores = new GroupPageStoresMap<>(longOperationAsyncExecutor); + groupPageStoresMap = new GroupPageStoresMap<>(longOperationAsyncExecutor); } @Test void testGroupCount() { - assertEquals(0, groupPageStores.groupCount()); + assertEquals(0, groupPageStoresMap.groupCount()); - groupPageStores.put(0, List.of()); + groupPageStoresMap.put(0, 0, mock(FilePageStore.class)); - assertEquals(1, groupPageStores.groupCount()); + assertEquals(1, groupPageStoresMap.groupCount()); - groupPageStores.put(0, List.of()); + groupPageStoresMap.put(0, 0, mock(FilePageStore.class)); - assertEquals(1, groupPageStores.groupCount()); + assertEquals(1, groupPageStoresMap.groupCount()); - groupPageStores.put(1, List.of()); + groupPageStoresMap.put(1, 0, mock(FilePageStore.class)); - assertEquals(2, groupPageStores.groupCount()); + assertEquals(2, groupPageStoresMap.groupCount()); - groupPageStores.clear(); + groupPageStoresMap.clear(); - assertEquals(0, groupPageStores.groupCount()); + assertEquals(0, groupPageStoresMap.groupCount()); } @Test void testPut() { - List<PageStore> holder0 = List.of(); + FilePageStore filePageStore0 = mock(FilePageStore.class); - assertNull(groupPageStores.put(0, holder0)); + 
assertNull(groupPageStoresMap.put(0, 0, filePageStore0)); checkInvokeAfterAsyncCompletion(1); - List<PageStore> holder1 = List.of(); + FilePageStore filePageStore1 = mock(FilePageStore.class); - assertSame(holder0, groupPageStores.put(0, holder1)); + assertSame(filePageStore0, groupPageStoresMap.put(0, 0, filePageStore1)); checkInvokeAfterAsyncCompletion(2); - assertSame(holder1, groupPageStores.get(0)); - assertNull(groupPageStores.get(1)); + assertNull(groupPageStoresMap.get(1)); + + assertThat(getAll(groupPageStoresMap.get(0)), containsInAnyOrder(new TestPartitionPageStore<>(0, filePageStore1))); } @Test void testGet() { - assertNull(groupPageStores.get(0)); - assertNull(groupPageStores.get(1)); + assertNull(groupPageStoresMap.get(0)); + assertNull(groupPageStoresMap.get(1)); - List<PageStore> pageStores0 = List.of(); + FilePageStore filePageStore0 = mock(FilePageStore.class); - groupPageStores.put(0, pageStores0); + groupPageStoresMap.put(0, 0, filePageStore0); - assertSame(pageStores0, groupPageStores.get(0)); - assertNull(groupPageStores.get(1)); + GroupPageStores<PageStore> groupPageStores0 = groupPageStoresMap.get(0); - List<PageStore> pageStores1 = List.of(); + assertThat(getAll(groupPageStores0), containsInAnyOrder(new TestPartitionPageStore<>(0, filePageStore0))); + assertNull(groupPageStoresMap.get(1)); - groupPageStores.put(1, pageStores1); + FilePageStore filePageStore1 = mock(FilePageStore.class); - assertSame(pageStores0, groupPageStores.get(0)); - assertSame(pageStores1, groupPageStores.get(1)); + groupPageStoresMap.put(1, 1, filePageStore1); - groupPageStores.clear(); + GroupPageStores<PageStore> groupPageStores1 = groupPageStoresMap.get(1); - assertNull(groupPageStores.get(0)); - assertNull(groupPageStores.get(1)); + assertThat(getAll(groupPageStores0), containsInAnyOrder(new TestPartitionPageStore<>(0, filePageStore0))); + assertThat(getAll(groupPageStores1), containsInAnyOrder(new TestPartitionPageStore<>(1, filePageStore1))); + + 
assertEquals(TestPartitionPageStore.of(groupPageStores0.get(0)), new TestPartitionPageStore<>(0, filePageStore0)); + assertEquals(TestPartitionPageStore.of(groupPageStores1.get(1)), new TestPartitionPageStore<>(1, filePageStore1)); + + assertNull(groupPageStores0.get(2)); + assertNull(groupPageStores1.get(2)); + + groupPageStoresMap.clear(); + + assertNull(groupPageStoresMap.get(0)); + assertNull(groupPageStoresMap.get(1)); } @Test void testContainsPageStores() { - assertFalse(groupPageStores.containsPageStores(0)); - assertFalse(groupPageStores.containsPageStores(1)); + assertFalse(groupPageStoresMap.contains(0, 0)); + assertFalse(groupPageStoresMap.contains(1, 0)); + + FilePageStore filePageStore0 = mock(FilePageStore.class); - groupPageStores.put(0, List.of()); + groupPageStoresMap.put(0, 0, filePageStore0); - assertTrue(groupPageStores.containsPageStores(0)); - assertFalse(groupPageStores.containsPageStores(1)); + assertTrue(groupPageStoresMap.contains(0, 0)); + assertFalse(groupPageStoresMap.contains(1, 0)); + assertFalse(groupPageStoresMap.contains(0, 1)); - List<PageStore> pageStores1 = List.of(); + FilePageStore filePageStore1 = mock(FilePageStore.class); - groupPageStores.put(1, List.of()); + groupPageStoresMap.put(1, 0, filePageStore1); - assertTrue(groupPageStores.containsPageStores(0)); - assertTrue(groupPageStores.containsPageStores(1)); + assertTrue(groupPageStoresMap.contains(0, 0)); + assertTrue(groupPageStoresMap.contains(1, 0)); - groupPageStores.clear(); + groupPageStoresMap.clear(); - assertFalse(groupPageStores.containsPageStores(0)); - assertFalse(groupPageStores.containsPageStores(1)); + assertFalse(groupPageStoresMap.contains(0, 0)); + assertFalse(groupPageStoresMap.contains(1, 0)); } @Test - void testAllPageStores() { - assertThat(groupPageStores.allPageStores(), empty()); + void testGetAll() { + assertThat(groupPageStoresMap.getAll(), empty()); - List<PageStore> pageStores0 = List.of(); - List<PageStore> pageStores1 = List.of(); + 
FilePageStore filePageStore0 = mock(FilePageStore.class); + FilePageStore filePageStore1 = mock(FilePageStore.class); - groupPageStores.put(0, pageStores0); + groupPageStoresMap.put(0, 0, filePageStore0); - assertThat(groupPageStores.allPageStores(), containsInAnyOrder(pageStores0)); + assertThat( + getAll(groupPageStoresMap), + containsInAnyOrder(new TestGroupPartitionPageStore<>(0, 0, filePageStore0)) + ); - groupPageStores.put(1, pageStores1); + groupPageStoresMap.put(1, 1, filePageStore1); - assertThat(groupPageStores.allPageStores(), containsInAnyOrder(pageStores0, pageStores1)); + assertThat( + getAll(groupPageStoresMap), + containsInAnyOrder( + new TestGroupPartitionPageStore<>(0, 0, filePageStore0), + new TestGroupPartitionPageStore<>(1, 1, filePageStore1) + ) + ); - groupPageStores.clear(); + groupPageStoresMap.clear(); - assertThat(groupPageStores.allPageStores(), empty()); + assertThat(groupPageStoresMap.getAll(), empty()); } @Test void testClear() { - assertDoesNotThrow(groupPageStores::clear); + assertDoesNotThrow(groupPageStoresMap::clear); + + groupPageStoresMap.put(0, 0, mock(FilePageStore.class)); + + assertDoesNotThrow(groupPageStoresMap::clear); + assertEquals(0, groupPageStoresMap.groupCount()); + + assertNull(groupPageStoresMap.get(0)); + assertFalse(groupPageStoresMap.contains(0, 0)); + } + + @Test + void testRemove() { + assertNull(groupPageStoresMap.remove(0, 0)); + + FilePageStore filePageStore = mock(FilePageStore.class); + + groupPageStoresMap.put(0, 0, filePageStore); + + GroupPageStores<PageStore> groupPageStores = groupPageStoresMap.get(0); - groupPageStores.put(0, List.of()); + assertNull(groupPageStoresMap.remove(0, 1)); + assertNull(groupPageStoresMap.remove(1, 0)); - assertDoesNotThrow(groupPageStores::clear); - assertEquals(0, groupPageStores.groupCount()); + assertSame(filePageStore, groupPageStoresMap.remove(0, 0)); + + assertNull(groupPageStoresMap.get(0)); + assertNull(groupPageStores.get(0)); + + 
assertFalse(groupPageStoresMap.contains(0, 0)); + assertEquals(0, groupPageStoresMap.groupCount()); } private void checkInvokeAfterAsyncCompletion(int times) { verify(longOperationAsyncExecutor, times(times)).afterAsyncCompletion(any(Supplier.class)); } + + private static <T extends PageStore> Collection<TestGroupPartitionPageStore<T>> getAll(GroupPageStoresMap<T> groupPageStoresMap) { + return groupPageStoresMap.getAll().stream() + .flatMap(groupPageStores -> groupPageStores.getAll().stream() + .map(partitionPageStore -> new TestGroupPartitionPageStore<>( + groupPageStores.groupId(), + partitionPageStore.partitionId(), + partitionPageStore.pageStore() + )) + ) + .collect(toList()); + } + + private static <T extends PageStore> Collection<TestPartitionPageStore<T>> getAll(GroupPageStores<T> groupPageStores) { + return groupPageStores.getAll().stream() + .map(partitionPageStore -> new TestPartitionPageStore<>(partitionPageStore.partitionId(), partitionPageStore.pageStore())) + .collect(toList()); + } + + /** + * Inner class for tests. + */ + private static class TestGroupPartitionPageStore<T extends PageStore> extends TestPartitionPageStore<T> { + @IgniteToStringInclude + final int groupId; + + private TestGroupPartitionPageStore(int groupId, int partitionId, T filePageStore) { + super(partitionId, filePageStore); + + this.groupId = groupId; + } + + @Override + public boolean equals(Object obj) { + if (obj == this) { + return true; + } + + if (obj instanceof TestGroupPartitionPageStore) { + TestGroupPartitionPageStore that = (TestGroupPartitionPageStore) obj; + + return groupId == that.groupId && partitionId == that.partitionId && filePageStore == that.filePageStore; + } + + return false; + } + + @Override + public String toString() { + return S.toString(TestGroupPartitionPageStore.class, this, "partitionId", partitionId, "filePageStore", filePageStore); + } + } + + /** + * Inner class for tests. 
+ */ + private static class TestPartitionPageStore<T extends PageStore> { + @IgniteToStringInclude + final int partitionId; + + @IgniteToStringInclude + final T filePageStore; + + private TestPartitionPageStore(int partitionId, T filePageStore) { + this.partitionId = partitionId; + this.filePageStore = filePageStore; + } + + @Override + public boolean equals(Object obj) { + if (obj == this) { + return true; + } + + if (obj instanceof TestPartitionPageStore) { + TestPartitionPageStore that = (TestPartitionPageStore) obj; + + return partitionId == that.partitionId && filePageStore == that.filePageStore; Review Comment: Why do you use a reference equality here? ########## modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/store/GroupPageStoresMap.java: ########## @@ -47,52 +50,156 @@ public GroupPageStoresMap(LongOperationAsyncExecutor longOperationAsyncExecutor) } /** - * Puts the page stores for the group. + * Puts the page store for the group partition. + * + * @param groupId Group ID. + * @param partitionId Partition ID. + * @param pageStore Page store. + * @return Previous page store. + */ + public @Nullable T put(Integer groupId, Integer partitionId, T pageStore) { + return longOperationAsyncExecutor.afterAsyncCompletion(() -> { + PartitionPageStore<T> previous = groupIdPageStores + .computeIfAbsent(groupId, id -> new GroupPageStores<>(groupId)) Review Comment: I'd rather pass `id` into the constructor and not enclose the external variable ########## modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/store/GroupPageStoresMap.java: ########## @@ -47,52 +50,156 @@ public GroupPageStoresMap(LongOperationAsyncExecutor longOperationAsyncExecutor) } /** - * Puts the page stores for the group. + * Puts the page store for the group partition. + * + * @param groupId Group ID. + * @param partitionId Partition ID. + * @param pageStore Page store. + * @return Previous page store. 
+ */ + public @Nullable T put(Integer groupId, Integer partitionId, T pageStore) { + return longOperationAsyncExecutor.afterAsyncCompletion(() -> { + PartitionPageStore<T> previous = groupIdPageStores Review Comment: Isn't this padding too big? ########## modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/compaction/Compactor.java: ########## @@ -164,77 +170,90 @@ void waitDeltaFiles() { } /** - * Adds the number of delta files to compact. - * - * @param count Number of delta files. + * Callback on adding delta files so we can start compacting them. */ - public void addDeltaFiles(int count) { - assert count >= 0; - - if (count > 0) { - deltaFileCount.addAndGet(count); + public void onAddingDeltaFiles() { + synchronized (mux) { + addedDeltaFiles = true; - synchronized (mux) { - mux.notifyAll(); - } + mux.notifyAll(); } } /** * Merges delta files with partition files. */ void doCompaction() { - // Let's collect one delta file for each partition. - Queue<IgniteBiTuple<FilePageStore, DeltaFilePageStoreIo>> queue = filePageStoreManager.allPageStores().stream() - .flatMap(List::stream) - .map(filePageStore -> { + while (true) { + // Let's collect one delta file for each partition. + ConcurrentLinkedQueue<DeltaFileToCompaction> queue = new ConcurrentLinkedQueue<>(); + + for (GroupPageStores<FilePageStore> groupPageStores : filePageStoreManager.allPageStores()) { + for (PartitionPageStore<FilePageStore> partitionPageStore : groupPageStores.getAll()) { + FilePageStore filePageStore = partitionPageStore.pageStore(); + DeltaFilePageStoreIo deltaFileToCompaction = filePageStore.getDeltaFileToCompaction(); - return deltaFileToCompaction == null ? 
null : new IgniteBiTuple<>(filePageStore, deltaFileToCompaction); - }) - .filter(Objects::nonNull) - .collect(toCollection(ConcurrentLinkedQueue::new)); + if (!filePageStore.isMarkedToDestroy() && deltaFileToCompaction != null) { + queue.add(new DeltaFileToCompaction( + groupPageStores.groupId(), + partitionPageStore.partitionId(), + filePageStore, + deltaFileToCompaction + )); + } + } + } + + if (queue.isEmpty()) { Review Comment: Maybe I'm worrying about it too much, we don't even know how fast or how frequent it'll work ########## modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/compaction/Compactor.java: ########## @@ -338,6 +360,10 @@ void mergeDeltaFileToMainFile( return; } + if (filePageStore.isMarkedToDestroy()) { Review Comment: I have a feeling that this code is not safe. We had similar code with cache-group stopping during the snapshot (in GG). It never worked properly. ########## modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvTableStorageTest.java: ########## @@ -419,11 +505,69 @@ private static void createTestTable(TableConfiguration tableConfig) { assertThat(createTableFuture, willCompleteSuccessfully()); } - private static <T> List<T> getAll(Cursor<T> cursor) { + private static <T> List<T> getAll(Cursor<T> cursor) throws Exception { try (cursor) { return cursor.stream().collect(Collectors.toList()); - } catch (Exception e) { - throw new RuntimeException(e); } } + + private void checkStorageDestroyed(MvPartitionStorage storage) { + int partId = PARTITION_ID; + + assertThrows(StorageClosedException.class, () -> storage.runConsistently(() -> null)); + + assertThrows(StorageClosedException.class, storage::flush); + + assertThrows(StorageClosedException.class, storage::lastAppliedIndex); + assertThrows(StorageClosedException.class, storage::lastAppliedTerm); Review Comment: These two don't have to throw exceptions, but ok ########## 
modules/storage-api/src/main/java/org/apache/ignite/internal/storage/engine/MvTableStorage.java: ########## @@ -59,6 +60,10 @@ public interface MvTableStorage { /** * Destroys a partition and all associated indices. * + * <p>This method will do nothing if there is no partition by ID, when trying to call methods to read or write (as well as all previous + * open cursors) for {@link MvPartitionStorage}, {@link HashIndexStorage} and {@link SortedIndexStorage}, {@link StorageClosedException} + * will be thrown. + * Review Comment: This description is close to impossible to understand, please restructure your text and expand it. ########## modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/PartitionProcessingCounterMap.java: ########## @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.pagememory.persistence; + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import org.jetbrains.annotations.Nullable; + +/** + * Helper class for thread-safe work with {@link PartitionProcessingCounter} for any partition of any group.
+ */ +public class PartitionProcessingCounterMap { + private final ConcurrentMap<GroupPartitionId, PartitionProcessingCounter> processedPartitions = new ConcurrentHashMap<>(); + + /** + * Callback at the beginning of checkpoint processing of a partition, for example, when writing dirty pages or executing a fsync. + * + * @param groupId Group ID. + * @param partitionId Partition ID. + */ + public void onStartPartitionProcessing(int groupId, int partitionId) { + GroupPartitionId groupPartitionId = new GroupPartitionId(groupId, partitionId); + + processedPartitions.compute(groupPartitionId, (id, partitionProcessingCounter) -> { + if (partitionProcessingCounter == null) { + PartitionProcessingCounter counter = new PartitionProcessingCounter(); + + counter.onStartPartitionProcessing(); + + return counter; + } + + partitionProcessingCounter.onStartPartitionProcessing(); + + return partitionProcessingCounter; + }); + } + + /** + * Callback on completion of partition processing, for example, when writing dirty pages or executing a fsync. + * + * @param groupId Group ID. + * @param partitionId Partition ID. + */ + public void onFinishPartitionProcessing(int groupId, int partitionId) { + GroupPartitionId groupPartitionId = new GroupPartitionId(groupId, partitionId); + + processedPartitions.compute(groupPartitionId, (id, partitionProcessingCounter) -> { + assert partitionProcessingCounter != null : id; + assert !partitionProcessingCounter.future().isDone() : id; + + partitionProcessingCounter.onFinishPartitionProcessing(); + + return partitionProcessingCounter.future().isDone() ? null : partitionProcessingCounter; + }); + } + + /** + * Returns the future if the partition according to the given parameters is currently being processed, for example, dirty pages are + * being written or fsync is being done, {@code null} if the partition is not currently being processed. 
+ * + * <p>Future will be added on {@link #onStartPartitionProcessing(int, int)} call and completed on + * {@link #onFinishPartitionProcessing(int, int)} call (equal to the number of {@link #onFinishPartitionProcessing(int, int)} calls). + * + * @param groupId Group ID. + * @param partitionId Partition ID. + */ + @Nullable + public CompletableFuture<Void> getProcessedPartitionFuture(int groupId, int partitionId) { Review Comment: Is there a chance of having a race condition when this method returns null? Caller thinks that all partitions are processed, but then, suddenly, another thread calls "start" before you know it, and then it's all doomed... ########## modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/store/GroupPageStoresMap.java: ########## @@ -47,52 +50,156 @@ public GroupPageStoresMap(LongOperationAsyncExecutor longOperationAsyncExecutor) } /** - * Puts the page stores for the group. + * Puts the page store for the group partition. + * + * @param groupId Group ID. + * @param partitionId Partition ID. + * @param pageStore Page store. + * @return Previous page store. + */ + public @Nullable T put(Integer groupId, Integer partitionId, T pageStore) { + return longOperationAsyncExecutor.afterAsyncCompletion(() -> { + PartitionPageStore<T> previous = groupIdPageStores + .computeIfAbsent(groupId, id -> new GroupPageStores<>(groupId)) + .partitionIdPageStore + .put(partitionId, new PartitionPageStore<>(partitionId, pageStore)); + + return previous == null ? null : previous.pageStore; + } + ); + } + + /** + * Removes the page store for the group partition. * - * @param grpId Group ID. - * @param pageStores Page stores. - * @return Previous page stores. + * @param groupId Group ID. + * @param partitionId Partition ID. + * @return Removed page store. 
*/ - public @Nullable List<T> put(Integer grpId, List<T> pageStores) { - return longOperationAsyncExecutor.afterAsyncCompletion(() -> groupPageStores.put(grpId, pageStores)); + public @Nullable T remove(Integer groupId, Integer partitionId) { + AtomicReference<PartitionPageStore<T>> partitionPageStoreRef = new AtomicReference<>(); + + groupIdPageStores.compute(groupId, (id, groupPageStores) -> { Review Comment: Why don't we use a composite key (groupId, partitionId)? Is this because we need to be able to get all group stores at the same time fast? ########## modules/storage-api/src/main/java/org/apache/ignite/internal/storage/StorageClosedException.java: ########## @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.storage; + +/** + * Exception that will be thrown when the storage is closed. + */ +public class StorageClosedException extends StorageException { + /** + * Constructor. + * + * @param message Error message. + */ + public StorageClosedException(String message) { + super(message); + } + + /** + * Constructor. + * + * @param message Error message. + * @param cause The cause. 
+ */ + public StorageClosedException(String message, Throwable cause) { + super(message, cause); Review Comment: Where's the error code? :) I guess we should make it soon
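One way to close the window described in the comment on `getProcessedPartitionFuture` (a `null` result, followed by another thread calling "start" before the destroyer acts) is to make "start processing" and "begin destruction" a single atomic decision per partition. This also bears on the `isMarkedToDestroy()` check-then-act concern in `mergeDeltaFileToMainFile`. The sketch below is hypothetical: `GuardedPartitionProcessingMap`, `tryStartProcessing`, `finishProcessing`, `startDestruction` and `finishDestruction` are illustrative names, not the PR's API. The `long` key packing `(groupId << 32) | partitionId` is an assumption that also touches the primitive-key question raised on `GroupPageStoresMap`.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;

/** Hypothetical sketch: per-partition processing tracking where destruction atomically refuses new processing. */
public class GuardedPartitionProcessingMap {
    /** Per-partition state; mutated only inside compute() for its own key. */
    private static final class State {
        int inProgress;
        boolean destroying;
        final CompletableFuture<Void> idle = new CompletableFuture<>();
    }

    private final ConcurrentMap<Long, State> states = new ConcurrentHashMap<>();

    /** Assumed composite key: group ID in the high 32 bits, partition ID in the low 32 bits. */
    private static long key(int groupId, int partitionId) {
        return ((long) groupId << 32) | (partitionId & 0xFFFFFFFFL);
    }

    /** Returns false if the partition is being destroyed; the caller must then skip the partition. */
    public boolean tryStartProcessing(int groupId, int partitionId) {
        AtomicBoolean started = new AtomicBoolean();

        states.compute(key(groupId, partitionId), (k, state) -> {
            State s = state == null ? new State() : state;

            if (!s.destroying) {
                s.inProgress++;
                started.set(true);
            }

            return s;
        });

        return started.get();
    }

    /** Must be paired with a successful tryStartProcessing call. */
    public void finishProcessing(int groupId, int partitionId) {
        AtomicReference<CompletableFuture<Void>> toComplete = new AtomicReference<>();

        states.compute(key(groupId, partitionId), (k, s) -> {
            if (s == null || s.inProgress == 0) {
                throw new IllegalStateException("finishProcessing without a matching start: " + k);
            }

            if (--s.inProgress > 0) {
                return s;
            }

            if (s.destroying) {
                toComplete.set(s.idle); // Last in-flight task is done: release the waiting destroyer.
                return s; // Keep the tombstone so late starts are still refused.
            }

            return null; // Idle and not being destroyed: drop the entry.
        });

        // Complete outside compute() so dependent callbacks never run while the map bin is locked.
        CompletableFuture<Void> future = toComplete.get();
        if (future != null) {
            future.complete(null);
        }
    }

    /** Marks the partition as being destroyed; the future completes once in-flight processing has finished. */
    public CompletableFuture<Void> startDestruction(int groupId, int partitionId) {
        AtomicReference<State> stateRef = new AtomicReference<>();
        AtomicBoolean idleNow = new AtomicBoolean();

        states.compute(key(groupId, partitionId), (k, state) -> {
            State s = state == null ? new State() : state;

            s.destroying = true;
            idleNow.set(s.inProgress == 0);
            stateRef.set(s);

            return s;
        });

        CompletableFuture<Void> idle = stateRef.get().idle;
        if (idleNow.get()) {
            idle.complete(null);
        }

        return idle;
    }

    /** Removes the tombstone once the partition files have actually been deleted. */
    public void finishDestruction(int groupId, int partitionId) {
        states.remove(key(groupId, partitionId));
    }
}
```

The design choice in this sketch is that a partition under destruction keeps a tombstone entry. A late `tryStartProcessing` therefore sees `destroying == true` instead of an empty map, and there is no longer a moment where "no future" can be misread as "safe to destroy". Futures are completed outside `compute` so that dependent stages never run while the map bin is locked.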
