tkalkirill commented on code in PR #920:
URL: https://github.com/apache/ignite-3/pull/920#discussion_r921235136


##########
modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointWorkflow.java:
##########
@@ -279,56 +288,69 @@ public List<CheckpointListener> 
collectCheckpointListeners(Collection<? extends
     private DataRegionsDirtyPages beginCheckpoint(
             Collection<? extends DataRegion<PersistentPageMemory>> dataRegions,
             CompletableFuture<?> allowToReplace
-    ) {
-        Collection<IgniteBiTuple<PersistentPageMemory, 
Collection<FullPageId>>> pages = new ArrayList<>(dataRegions.size());
+    ) throws IgniteInternalCheckedException {
+        Collection<DataRegionDirtyPages<CollectionDirtyPages>> 
dataRegionsDirtyPages = new ArrayList<>(dataRegions.size());
 
         for (DataRegion<PersistentPageMemory> dataRegion : dataRegions) {
-            Collection<FullPageId> dirtyPages = 
dataRegion.pageMemory().beginCheckpoint(allowToReplace);
+            CollectionDirtyPages dirtyPages = 
dataRegion.pageMemory().beginCheckpoint(allowToReplace);
 
-            pages.add(new IgniteBiTuple<>(dataRegion.pageMemory(), 
dirtyPages));
+            dataRegionsDirtyPages.add(new 
DataRegionDirtyPages<>(dataRegion.pageMemory(), dirtyPages));
         }
 
-        return new DataRegionsDirtyPages(pages);
+        for (DataRegionDirtyPages<CollectionDirtyPages> dataRegionDirtyPages : 
dataRegionsDirtyPages) {
+            for (GroupPartitionId dirtyPartition : 
dataRegionDirtyPages.dirtyPages.partitionIds()) {
+                
partitionFilePageStoreManager.getStore(dirtyPartition.getGroupId(), 
dirtyPartition.getPartitionId()).updateMetaPageCount();
+            }
+        }
+
+        return new DataRegionsDirtyPages(dataRegionsDirtyPages);
     }
 
     CheckpointDirtyPages createAndSortCheckpointDirtyPages(
             DataRegionsDirtyPages dataRegionsDirtyPages
     ) throws IgniteInternalCheckedException {
-        List<IgniteBiTuple<PersistentPageMemory, FullPageId[]>> 
checkpointPages = new ArrayList<>();
+        List<DataRegionDirtyPages<ArrayDirtyPages>> checkpointDirtyPages = new 
ArrayList<>();
 
         int realPagesArrSize = 0;
 
-        for (IgniteBiTuple<PersistentPageMemory, Collection<FullPageId>> 
regionDirtyPages : dataRegionsDirtyPages.dirtyPages) {
-            FullPageId[] checkpointRegionDirtyPages = new 
FullPageId[regionDirtyPages.getValue().size()];
+        for (DataRegionDirtyPages<CollectionDirtyPages> dataRegionDirtyPages : 
dataRegionsDirtyPages.dirtyPages) {
+            FullPageId[] pageIds = new 
FullPageId[dataRegionDirtyPages.dirtyPages.pageIds().size()];
 
             int pagePos = 0;
 
-            for (FullPageId dirtyPage : regionDirtyPages.getValue()) {
+            for (FullPageId dirtyPage : 
dataRegionDirtyPages.dirtyPages.pageIds()) {
                 assert realPagesArrSize++ != 
dataRegionsDirtyPages.dirtyPageCount :
                         "Incorrect estimated dirty pages number: " + 
dataRegionsDirtyPages.dirtyPageCount;
 
-                checkpointRegionDirtyPages[pagePos++] = dirtyPage;
+                pageIds[pagePos++] = dirtyPage;
             }
 
             // Some pages may have been already replaced.
             if (pagePos == 0) {
                 continue;
-            } else if (pagePos != checkpointRegionDirtyPages.length) {
-                checkpointPages.add(new 
IgniteBiTuple<>(regionDirtyPages.getKey(), 
Arrays.copyOf(checkpointRegionDirtyPages, pagePos)));
-            } else {
-                checkpointPages.add(new 
IgniteBiTuple<>(regionDirtyPages.getKey(), checkpointRegionDirtyPages));
+            } else if (pagePos != pageIds.length) {
+                pageIds = Arrays.copyOf(pageIds, pagePos);
             }
+
+            assert !dataRegionDirtyPages.dirtyPages.partitionIds().isEmpty();
+
+            GroupPartitionId[] partitionIds = 
dataRegionDirtyPages.dirtyPages.partitionIds().toArray(new GroupPartitionId[0]);
+
+            checkpointDirtyPages.add(new DataRegionDirtyPages<>(
+                    dataRegionDirtyPages.pageMemory,
+                    new ArrayDirtyPages(pageIds, partitionIds)
+            ));
         }
 
-        List<ForkJoinTask<?>> parallelSortTasks = checkpointPages.stream()
-                .map(IgniteBiTuple::getValue)
-                .filter(pages -> pages.length >= PARALLEL_SORT_THRESHOLD)
-                .map(pages -> parallelSortThreadPool.submit(() -> 
Arrays.parallelSort(pages, DIRTY_PAGE_COMPARATOR)))
+        List<ForkJoinTask<?>> parallelSortTasks = checkpointDirtyPages.stream()

Review Comment:
   I added a few comments, I hope they are enough.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to