rpuch commented on code in PR #1325:
URL: https://github.com/apache/ignite-3/pull/1325#discussion_r1030240009
##########
modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/Checkpointer.java:
##########
@@ -452,9 +456,21 @@ private void syncUpdatedPageStores(
                 return;
             }
-            fsyncDeltaFilePageStoreOnCheckpointThread(entry.getKey(), entry.getValue());
+            GroupPartitionId partitionId = entry.getKey();
+
+            FilePageStore filePageStore = filePageStoreManager.getStore(partitionId.getGroupId(), partitionId.getPartitionId());
+
+            if (filePageStore == null || filePageStore.isMarkedToDestroy()) {
+                continue;
+            }
+
+            currentCheckpointProgress.onStartPartitionProcessing(partitionId.getGroupId(), partitionId.getPartitionId());
Review Comment:
Looks like all actual I/O made by a `Checkpointer` has to be enclosed in
`onStartPartitionProcessing()`/`onFinishPartitionProcessing()` calls. If this
is true, I suggest describing this requirement somewhere (in the class
javadoc, or in a comment at the beginning of this class).
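For illustration only, a minimal sketch of how this could be stated and enforced (the javadoc wording and the try/finally wrapping are my suggestion, not something already in the PR; `onFinishPartitionProcessing()` is the counterpart named above):

```java
/**
 * ...
 *
 * <p>Invariant: any actual partition I/O performed by the checkpointer must be enclosed in
 * {@code onStartPartitionProcessing()} / {@code onFinishPartitionProcessing()} calls on the
 * current checkpoint progress, so that partition destruction can wait for in-flight I/O
 * before deleting the partition file and its delta files.
 */

// Enclosing pattern, e.g. around the fsync of a partition's delta file page store:
currentCheckpointProgress.onStartPartitionProcessing(partitionId.getGroupId(), partitionId.getPartitionId());

try {
    // ... actual I/O for this partition ...
} finally {
    currentCheckpointProgress.onFinishPartitionProcessing(partitionId.getGroupId(), partitionId.getPartitionId());
}
```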
##########
modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/compaction/Compactor.java:
##########
@@ -348,6 +374,10 @@ void mergeDeltaFileToMainFile(
             return;
         }
+        if (filePageStore.isMarkedToDestroy()) {
Review Comment:
These checks (whether it is cancelled, whether the store is marked for
destruction) are always done together in this method. How about uniting them
in a single method like `shouldNotProceedWith(FilePageStore)` and calling it
in all the places in this method where these checks occur?
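A rough sketch of what I mean (the cancellation check below is a placeholder for whatever the method checks today, so treat the body as an assumption):

```java
/** Returns {@code true} if the merge should not proceed for the given store. */
private boolean shouldNotProceedWith(FilePageStore filePageStore) {
    // "mergeCancelled()" stands in for the existing cancellation check in this method.
    return mergeCancelled() || filePageStore.isMarkedToDestroy();
}
```

Then every such place in the method collapses to a single `if (shouldNotProceedWith(filePageStore)) { return; }`.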
##########
modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/Checkpointer.java:
##########
@@ -775,4 +799,41 @@ private void renameDeltaFileOnCheckpointThread(GroupPartitionId partitionId) thr
     void updateLastProgressAfterReleaseWriteLock() {
         afterReleaseWriteLockCheckpointProgress = currentCheckpointProgress;
     }
+
+    /**
+     * Callback on destruction of the partition of the corresponding group.
+     *
+     * <p>If the checkpoint is in progress, then wait until it finishes processing the partition that we are going to destroy, in order to
+     * prevent the situation when we want to destroy the partition file along with its delta files, and at this time the checkpoint performs
+     * I/O operations on them.
+     *
+     * @param groupId Group ID.
+     * @param partitionId Partition ID.
+     * @throws IgniteInternalCheckedException If there are errors while processing the callback.
+     */
+    void onPartitionDestruction(int groupId, int partitionId) throws IgniteInternalCheckedException {
+        CheckpointProgressImpl currentCheckpointProgress = this.currentCheckpointProgress;
+
+        if (currentCheckpointProgress == null || !currentCheckpointProgress.inProgress()) {
+            return;
+        }
+
+        CompletableFuture<Void> processedPartitionFuture = currentCheckpointProgress.getProcessedPartitionFuture(groupId, partitionId);
Review Comment:
For a fresh reader, it is not clear why we are not afraid of a race between
calling this method and starting a checkpoint. How about adding a comment
explaining why this is the case? (As you explained to me in a private
conversation, the generation has already been switched by this point, so a new
checkpoint would not write any pages of the corresponding group+partition, if
I understood correctly.)
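Roughly something like this, right before the wait (the wording is only my draft of that explanation, based on our conversation, so please correct it where my understanding is off):

```java
// No race with a checkpoint that starts after this call: by this point the partition's
// generation has already been switched, so a newly started checkpoint will not write any
// pages of this group+partition. We only have to wait for a checkpoint that is already
// in progress and may still be doing I/O for this partition.
CompletableFuture<Void> processedPartitionFuture =
        currentCheckpointProgress.getProcessedPartitionFuture(groupId, partitionId);
```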
##########
modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/compaction/Compactor.java:
##########
@@ -164,77 +170,90 @@ void waitDeltaFiles() {
     }
     /**
-     * Adds the number of delta files to compact.
-     *
-     * @param count Number of delta files.
+     * Callback on adding delta files so we can start compacting them.
      */
-    public void addDeltaFiles(int count) {
-        assert count >= 0;
-
-        if (count > 0) {
-            deltaFileCount.addAndGet(count);
+    public void onAddingDeltaFiles() {
+        synchronized (mux) {
+            addedDeltaFiles = true;
-            synchronized (mux) {
-                mux.notifyAll();
-            }
+            mux.notifyAll();
         }
     }
     /**
      * Merges delta files with partition files.
      */
     void doCompaction() {
-        // Let's collect one delta file for each partition.
-        Queue<IgniteBiTuple<FilePageStore, DeltaFilePageStoreIo>> queue = filePageStoreManager.allPageStores().stream()
-                .flatMap(List::stream)
-                .map(filePageStore -> {
+        while (true) {
+            // Let's collect one delta file for each partition.
+            ConcurrentLinkedQueue<DeltaFileToCompaction> queue = new ConcurrentLinkedQueue<>();
+
+            for (GroupPageStores<FilePageStore> groupPageStores : filePageStoreManager.allPageStores()) {
+                for (PartitionPageStore<FilePageStore> partitionPageStore : groupPageStores.getAll()) {
+                    FilePageStore filePageStore = partitionPageStore.pageStore();
+
                     DeltaFilePageStoreIo deltaFileToCompaction = filePageStore.getDeltaFileToCompaction();
-                    return deltaFileToCompaction == null ? null : new IgniteBiTuple<>(filePageStore, deltaFileToCompaction);
-                })
-                .filter(Objects::nonNull)
-                .collect(toCollection(ConcurrentLinkedQueue::new));
+                    if (!filePageStore.isMarkedToDestroy() && deltaFileToCompaction != null) {
+                        queue.add(new DeltaFileToCompaction(
+                                groupPageStores.groupId(),
+                                partitionPageStore.partitionId(),
+                                filePageStore,
+                                deltaFileToCompaction
+                        ));
+                    }
+                }
+            }
+
+            if (queue.isEmpty()) {
+                break;
+            }
+
+            updateHeartbeat();
-        assert !queue.isEmpty();
+            int threads = threadPoolExecutor == null ? 1 : threadPoolExecutor.getMaximumPoolSize();
-        updateHeartbeat();
+            CompletableFuture<?>[] futures = new CompletableFuture[threads];
-        int threads = threadPoolExecutor == null ? 1 : threadPoolExecutor.getMaximumPoolSize();
+            for (int i = 0; i < threads; i++) {
+                CompletableFuture<?> future = futures[i] = new CompletableFuture<>();
-        CompletableFuture<?>[] futures = new CompletableFuture[threads];
+                Runnable merger = () -> {
+                    DeltaFileToCompaction toMerge;
-        for (int i = 0; i < threads; i++) {
-            CompletableFuture<?> future = futures[i] = new CompletableFuture<>();
+                    try {
+                        while ((toMerge = queue.poll()) != null) {
+                            GroupPartitionId partitionId = new GroupPartitionId(toMerge.groupId, toMerge.partitionId);
+
+                            processedPartitionMap.onStartPartitionProcessing(partitionId.getGroupId(), partitionId.getPartitionId());
Review Comment:
Also, it seems that, analogously to `Checkpointer`, all the actual I/O
performed by this class must be enclosed in
`onStartPartitionProcessing()`/`onFinishPartitionProcessing()` invocations;
this invariant is important for the whole thing to work correctly. If this is
true, it would be nice to state this requirement explicitly in the class
javadoc or in a comment.
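For example, the merge loop body could make the invariant explicit with a try/finally (a sketch only; `onFinishPartitionProcessing()` is the counterpart named above, and I left the actual merge call as a comment rather than guessing its exact arguments):

```java
processedPartitionMap.onStartPartitionProcessing(partitionId.getGroupId(), partitionId.getPartitionId());

try {
    // All actual I/O for this partition, i.e. the mergeDeltaFileToMainFile(...) call
    // for the polled DeltaFileToCompaction, goes here.
} finally {
    processedPartitionMap.onFinishPartitionProcessing(partitionId.getGroupId(), partitionId.getPartitionId());
}
```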