ibessonov commented on code in PR #4379:
URL: https://github.com/apache/ignite-3/pull/4379#discussion_r1756602348


##########
modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointDirtyPages.java:
##########
@@ -127,7 +127,7 @@ public IgniteConcurrentMultiPairQueue<PersistentPageMemory, 
FullPageId> toDirtyP
     public @Nullable CheckpointDirtyPagesView nextPartitionView(@Nullable 
CheckpointDirtyPagesView currentView) {

Review Comment:
   Is it used outside of tests? If not, I recommend removing it to avoid 
further confusion.



##########
modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointPagesWriter.java:
##########
@@ -159,15 +152,12 @@ public class CheckpointPagesWriter implements Runnable {
     @Override
     public void run() {
         try {
-            IgniteConcurrentMultiPairQueue<PersistentPageMemory, FullPageId> 
pageIdsToRetry = writePages(writePageIds);
+            var queueResult = new Result<PersistentPageMemory, 
GroupPartitionId>();
 
-            while (!pageIdsToRetry.isEmpty()) {
-                if (LOG.isInfoEnabled()) {
-                    LOG.info("Checkpoint pages were not written yet due to "
-                            + "unsuccessful page write lock acquisition and 
will be retried [pageCount={}]", pageIdsToRetry.size());
-                }
+            while (!shutdownNow.getAsBoolean() && 
dirtyPartitionQueue.next(queueResult)) {
+                updateHeartbeat.run();

Review Comment:
   Is it really necessary to update heartbeat here, assuming that we will 
update it for every page anyway?



##########
modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointPagesWriter.java:
##########
@@ -176,90 +166,85 @@ public void run() {
         }
     }
 
-    /**
-     * Writes dirty pages.
-     *
-     * @param writePageIds Queue of dirty page IDs to write.
-     * @return pagesToRetry Queue dirty page IDs which should be retried.
-     */
-    private IgniteConcurrentMultiPairQueue<PersistentPageMemory, FullPageId> 
writePages(
-            IgniteConcurrentMultiPairQueue<PersistentPageMemory, FullPageId> 
writePageIds
-    ) throws IgniteInternalCheckedException {
-        Map<PersistentPageMemory, List<FullPageId>> pageIdsToRetry = new 
HashMap<>();
-
-        Map<PersistentPageMemory, PageStoreWriter> pageStoreWriters = new 
HashMap<>();
+    private void writeDirtyPages(PersistentPageMemory pageMemory, 
GroupPartitionId partitionId) throws IgniteInternalCheckedException {
+        CheckpointDirtyPagesView checkpointDirtyPagesView = 
checkpointDirtyPagesView(pageMemory, partitionId);
 
-        // Page store writers for checkpoint buffer pages are located in a 
separate map, because they would not add elements to a
-        // "pageIdsToRetry" map. Instead, they would ignore unsuccessful write 
lock acquisitions. It's implemented this way in order to
-        // avoid duplicates in "pageIdsToRetry", there must only be a single 
source of pages to achieve that.
-        Map<PersistentPageMemory, PageStoreWriter> cpBufferPageStoreWriters = 
new HashMap<>();
+        checkpointProgress.onStartPartitionProcessing(partitionId);
 
-        ByteBuffer tmpWriteBuf = threadBuf.get();
+        try {
+            ByteBuffer tmpWriteBuf = threadBuf.get();
 
-        Result<PersistentPageMemory, FullPageId> queueResult = new Result<>();
+            if (shouldWriteMetaPage(partitionId)) {
+                writePartitionMeta(pageMemory, partitionId, 
tmpWriteBuf.rewind());
+            }
 
-        GroupPartitionId partitionId = null;
+            var pageIdsToRetry = new ArrayList<FullPageId>();
 
-        try {
-            // TODO https://issues.apache.org/jira/browse/IGNITE-23115 Try to 
write file per thread.
-            while (!shutdownNow.getAsBoolean() && 
writePageIds.next(queueResult)) {
+            for (int i = 0; i < checkpointDirtyPagesView.size() && 
!shutdownNow.getAsBoolean(); i++) {
                 updateHeartbeat.run();
 
-                FullPageId fullId = queueResult.getValue();
+                FullPageId pageId = checkpointDirtyPagesView.get(i);
 
-                PersistentPageMemory pageMemory = queueResult.getKey();
+                if (pageId.pageIdx() == 0) {
+                    // Skip meta-pages, they are written by 
"writePartitionMeta".
+                    continue;
+                }
 
-                if (hasPartitionChanged(partitionId, fullId)) {
-                    GroupPartitionId newPartitionId = toPartitionId(fullId);
+                writeDirtyPage(pageMemory, pageId, tmpWriteBuf, 
pageIdsToRetry);

Review Comment:
   In the old implementation, we had a log message for every retry round. Is it 
still in the code?
   If it is, there's a risk of polluting the log file.
   
   If there's no such log message, then please add the number of retried page 
writes to the checkpoint metrics that we write to the log upon completion.
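   A minimal sketch of what is meant by counting retries instead of logging each 
round. `RetryStats`, `onPagesRetried` and `summary` are hypothetical names used 
only for illustration; they are not part of the existing checkpoint metrics code, 
and the real message format would follow whatever the checkpoint already logs.

```java
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical holder for retry statistics; not an existing Ignite class. */
final class RetryStats {
    /** Total number of page writes that had to be retried, across all writer threads. */
    private final AtomicLong retriedPageWrites = new AtomicLong();

    /** Called once per retry round instead of writing a log line for that round. */
    void onPagesRetried(int count) {
        retriedPageWrites.addAndGet(count);
    }

    /** Builds the single line that would be logged when the checkpoint completes. */
    String summary() {
        return "Checkpoint finished [retriedPageWrites=" + retriedPageWrites.get() + ']';
    }
}
```

   With something like this, the number of log lines no longer depends on the 
number of partitions or retry rounds.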



##########
modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointDirtyPagesTest.java:
##########
@@ -51,8 +51,8 @@
 public class CheckpointDirtyPagesTest extends BaseIgniteAbstractTest {
     @Test
     void testDirtyPagesCount() {
-        DataRegionDirtyPages<FullPageId[]> dirtyPages0 = 
createDirtyPages(of(0, 0, 0), of(0, 0, 1));
-        DataRegionDirtyPages<FullPageId[]> dirtyPages1 = 
createDirtyPages(of(1, 0, 0), of(1, 0, 1), of(1, 0, 2));
+        DirtyPagesAndPartitions dirtyPages1 = 
createDirtyPagesAndPartitions(of(1, 0, 0), of(1, 0, 1), of(1, 0, 2));
+        DirtyPagesAndPartitions dirtyPages0 = 
createDirtyPagesAndPartitions(of(0, 0, 0), of(0, 0, 1));

Review Comment:
   Why did you reorder it? Maybe ordering deserves its own test; right now it's 
implicit and non-intuitive.



##########
modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointPagesWriter.java:
##########
@@ -176,90 +166,85 @@ public void run() {
         }
     }
 
-    /**
-     * Writes dirty pages.
-     *
-     * @param writePageIds Queue of dirty page IDs to write.
-     * @return pagesToRetry Queue dirty page IDs which should be retried.
-     */
-    private IgniteConcurrentMultiPairQueue<PersistentPageMemory, FullPageId> 
writePages(
-            IgniteConcurrentMultiPairQueue<PersistentPageMemory, FullPageId> 
writePageIds
-    ) throws IgniteInternalCheckedException {
-        Map<PersistentPageMemory, List<FullPageId>> pageIdsToRetry = new 
HashMap<>();
-
-        Map<PersistentPageMemory, PageStoreWriter> pageStoreWriters = new 
HashMap<>();
+    private void writeDirtyPages(PersistentPageMemory pageMemory, 
GroupPartitionId partitionId) throws IgniteInternalCheckedException {
+        CheckpointDirtyPagesView checkpointDirtyPagesView = 
checkpointDirtyPagesView(pageMemory, partitionId);
 
-        // Page store writers for checkpoint buffer pages are located in a 
separate map, because they would not add elements to a
-        // "pageIdsToRetry" map. Instead, they would ignore unsuccessful write 
lock acquisitions. It's implemented this way in order to
-        // avoid duplicates in "pageIdsToRetry", there must only be a single 
source of pages to achieve that.
-        Map<PersistentPageMemory, PageStoreWriter> cpBufferPageStoreWriters = 
new HashMap<>();
+        checkpointProgress.onStartPartitionProcessing(partitionId);
 
-        ByteBuffer tmpWriteBuf = threadBuf.get();
+        try {
+            ByteBuffer tmpWriteBuf = threadBuf.get();
 
-        Result<PersistentPageMemory, FullPageId> queueResult = new Result<>();
+            if (shouldWriteMetaPage(partitionId)) {
+                writePartitionMeta(pageMemory, partitionId, 
tmpWriteBuf.rewind());
+            }
 
-        GroupPartitionId partitionId = null;
+            var pageIdsToRetry = new ArrayList<FullPageId>();
 
-        try {
-            // TODO https://issues.apache.org/jira/browse/IGNITE-23115 Try to 
write file per thread.
-            while (!shutdownNow.getAsBoolean() && 
writePageIds.next(queueResult)) {
+            for (int i = 0; i < checkpointDirtyPagesView.size() && 
!shutdownNow.getAsBoolean(); i++) {
                 updateHeartbeat.run();
 
-                FullPageId fullId = queueResult.getValue();
+                FullPageId pageId = checkpointDirtyPagesView.get(i);
 
-                PersistentPageMemory pageMemory = queueResult.getKey();
+                if (pageId.pageIdx() == 0) {
+                    // Skip meta-pages, they are written by 
"writePartitionMeta".
+                    continue;
+                }
 
-                if (hasPartitionChanged(partitionId, fullId)) {
-                    GroupPartitionId newPartitionId = toPartitionId(fullId);
+                writeDirtyPage(pageMemory, pageId, tmpWriteBuf, 
pageIdsToRetry);

Review Comment:
   Anyway, why did you do this in the first place? It wasn't a part of the 
task, and it's not clear whether your new solution is better.
   Can you please explain yourself here?



##########
modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointPagesWriter.java:
##########
@@ -176,90 +166,85 @@ public void run() {
         }
     }
 
-    /**
-     * Writes dirty pages.
-     *
-     * @param writePageIds Queue of dirty page IDs to write.
-     * @return pagesToRetry Queue dirty page IDs which should be retried.
-     */
-    private IgniteConcurrentMultiPairQueue<PersistentPageMemory, FullPageId> 
writePages(
-            IgniteConcurrentMultiPairQueue<PersistentPageMemory, FullPageId> 
writePageIds
-    ) throws IgniteInternalCheckedException {
-        Map<PersistentPageMemory, List<FullPageId>> pageIdsToRetry = new 
HashMap<>();
-
-        Map<PersistentPageMemory, PageStoreWriter> pageStoreWriters = new 
HashMap<>();
+    private void writeDirtyPages(PersistentPageMemory pageMemory, 
GroupPartitionId partitionId) throws IgniteInternalCheckedException {
+        CheckpointDirtyPagesView checkpointDirtyPagesView = 
checkpointDirtyPagesView(pageMemory, partitionId);
 
-        // Page store writers for checkpoint buffer pages are located in a 
separate map, because they would not add elements to a
-        // "pageIdsToRetry" map. Instead, they would ignore unsuccessful write 
lock acquisitions. It's implemented this way in order to
-        // avoid duplicates in "pageIdsToRetry", there must only be a single 
source of pages to achieve that.
-        Map<PersistentPageMemory, PageStoreWriter> cpBufferPageStoreWriters = 
new HashMap<>();
+        checkpointProgress.onStartPartitionProcessing(partitionId);
 
-        ByteBuffer tmpWriteBuf = threadBuf.get();
+        try {
+            ByteBuffer tmpWriteBuf = threadBuf.get();
 
-        Result<PersistentPageMemory, FullPageId> queueResult = new Result<>();
+            if (shouldWriteMetaPage(partitionId)) {
+                writePartitionMeta(pageMemory, partitionId, 
tmpWriteBuf.rewind());
+            }
 
-        GroupPartitionId partitionId = null;
+            var pageIdsToRetry = new ArrayList<FullPageId>();
 
-        try {
-            // TODO https://issues.apache.org/jira/browse/IGNITE-23115 Try to 
write file per thread.
-            while (!shutdownNow.getAsBoolean() && 
writePageIds.next(queueResult)) {
+            for (int i = 0; i < checkpointDirtyPagesView.size() && 
!shutdownNow.getAsBoolean(); i++) {
                 updateHeartbeat.run();
 
-                FullPageId fullId = queueResult.getValue();
+                FullPageId pageId = checkpointDirtyPagesView.get(i);
 
-                PersistentPageMemory pageMemory = queueResult.getKey();
+                if (pageId.pageIdx() == 0) {
+                    // Skip meta-pages, they are written by 
"writePartitionMeta".
+                    continue;
+                }
 
-                if (hasPartitionChanged(partitionId, fullId)) {
-                    GroupPartitionId newPartitionId = toPartitionId(fullId);
+                writeDirtyPage(pageMemory, pageId, tmpWriteBuf, 
pageIdsToRetry);
+            }
 
-                    // Starting for the new partition.
-                    
checkpointProgress.onStartPartitionProcessing(newPartitionId);
+            writeRetryDirtyPages(pageMemory, pageIdsToRetry, tmpWriteBuf);
+        } finally {
+            checkpointProgress.onFinishPartitionProcessing(partitionId);
+        }
+    }
 
-                    if (partitionId != null) {
-                        // Finishing for the previous partition.
-                        // TODO 
https://issues.apache.org/jira/browse/IGNITE-23105 Reimplement partition 
destruction awaiting.
-                        
checkpointProgress.onFinishPartitionProcessing(partitionId);
-                    }
+    private void writeDirtyPage(
+            PersistentPageMemory pageMemory,
+            FullPageId pageId,
+            ByteBuffer tmpWriteBuf,
+            List<FullPageId> pageIdsToRetry
+    ) throws IgniteInternalCheckedException {
+        PageStoreWriter pageStoreWriter = createPageStoreWriter(pageMemory, 
pageIdsToRetry);
 
-                    partitionId = newPartitionId;
+        // Should also be done for partitions that will be destroyed to remove 
their pages from the data region.
+        pageMemory.checkpointWritePage(pageId, tmpWriteBuf.rewind(), 
pageStoreWriter, tracker);
 
-                    if (shouldWriteMetaPage(partitionId)) {
-                        writePartitionMeta(pageMemory, partitionId, 
tmpWriteBuf.rewind());
-                    }
-                }
+        drainCheckpointBuffers(tmpWriteBuf);

Review Comment:
   `onStartPartitionProcessing` / `onFinishPartitionProcessing` still don't 
work, right? I believe you have some idea of how to fix it. Is this why you 
changed `pageIdsToRetry` usages?
   
   I'm not the biggest fan of changing code without explaining your idea 
to anyone, and doing it in a PR that has nothing to do with the problem. Was it 
just a convenient place to hide your intentions? :)
   
   I see how it could work in the future, but at the same time I wonder what 
your proposal is. Please explain it at least.



##########
modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointWorkflow.java:
##########
@@ -409,13 +411,15 @@ private DataRegionsDirtyPages beginCheckpoint(
     CheckpointDirtyPages createAndSortCheckpointDirtyPages(
             DataRegionsDirtyPages dataRegionsDirtyPages
     ) throws IgniteInternalCheckedException {
-        List<DataRegionDirtyPages<FullPageId[]>> checkpointDirtyPages = new 
ArrayList<>();
+        var checkpointDirtyPages = new ArrayList<DirtyPagesAndPartitions>();
 
         int realPagesArrSize = 0;
 
-        // Collect arrays of dirty pages for sorting.
+        // Collects dirty pages into an arrays (then we will sort them) and 
dirty partitions.

Review Comment:
   The grammar here is very weird. Do we collect pages into an array and into 
dirty partitions? Probably not, so please update this comment.



##########
modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointPagesWriter.java:
##########
@@ -288,16 +273,13 @@ private void drainCheckpointBuffers(
                         break;
                     }
 
-                    GroupPartitionId partitionId = toPartitionId(cpPageId);
+                    GroupPartitionId partitionId = 
GroupPartitionId.convert(cpPageId);
 
                     if (shouldWriteMetaPage(partitionId)) {
                         writePartitionMeta(pageMemory, partitionId, 
tmpWriteBuf.rewind());
                     }
 
-                    pageStoreWriter = pageStoreWriters.computeIfAbsent(
-                            pageMemory,
-                            pm -> createPageStoreWriter(pm, null)
-                    );
+                    pageStoreWriter = createPageStoreWriter(pageMemory, null);

Review Comment:
   Why did you remove the map? I'd rather rewrite `computeIfAbsent` into 
something with fewer allocations; that would be better.
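   A rough sketch of the kind of rewrite meant here, assuming the concern is the 
capturing lambda passed to `computeIfAbsent`, which may be allocated on every 
call even when the key is already present. `WriterCache` and its methods are 
hypothetical names for illustration only, not code from this PR.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

/** Hypothetical per-key cache; the factory is stored once instead of being passed on every lookup. */
final class WriterCache<K, V> {
    private final Map<K, V> cache = new HashMap<>();

    /** Created a single time by the caller, so lookups do not need a new lambda. */
    private final Function<K, V> factory;

    /** Counts factory invocations; present only to make the behavior observable. */
    private int created;

    WriterCache(Function<K, V> factory) {
        this.factory = factory;
    }

    /** Plain get-then-put: the hit path performs only a map lookup. */
    V get(K key) {
        V value = cache.get(key);

        if (value == null) {
            // Miss path: create the value once and remember it.
            value = factory.apply(key);
            created++;
            cache.put(key, value);
        }

        return value;
    }

    int createdCount() {
        return created;
    }
}
```

   This keeps one writer per key, like the map did, while the common hit path 
does not depend on a lambda being created at the call site.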



##########
modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointPagesWriter.java:
##########
@@ -176,90 +166,85 @@ public void run() {
         }
     }
 
-    /**
-     * Writes dirty pages.
-     *
-     * @param writePageIds Queue of dirty page IDs to write.
-     * @return pagesToRetry Queue dirty page IDs which should be retried.
-     */
-    private IgniteConcurrentMultiPairQueue<PersistentPageMemory, FullPageId> 
writePages(
-            IgniteConcurrentMultiPairQueue<PersistentPageMemory, FullPageId> 
writePageIds
-    ) throws IgniteInternalCheckedException {
-        Map<PersistentPageMemory, List<FullPageId>> pageIdsToRetry = new 
HashMap<>();
-
-        Map<PersistentPageMemory, PageStoreWriter> pageStoreWriters = new 
HashMap<>();
+    private void writeDirtyPages(PersistentPageMemory pageMemory, 
GroupPartitionId partitionId) throws IgniteInternalCheckedException {
+        CheckpointDirtyPagesView checkpointDirtyPagesView = 
checkpointDirtyPagesView(pageMemory, partitionId);
 
-        // Page store writers for checkpoint buffer pages are located in a 
separate map, because they would not add elements to a
-        // "pageIdsToRetry" map. Instead, they would ignore unsuccessful write 
lock acquisitions. It's implemented this way in order to
-        // avoid duplicates in "pageIdsToRetry", there must only be a single 
source of pages to achieve that.
-        Map<PersistentPageMemory, PageStoreWriter> cpBufferPageStoreWriters = 
new HashMap<>();
+        checkpointProgress.onStartPartitionProcessing(partitionId);
 
-        ByteBuffer tmpWriteBuf = threadBuf.get();
+        try {
+            ByteBuffer tmpWriteBuf = threadBuf.get();
 
-        Result<PersistentPageMemory, FullPageId> queueResult = new Result<>();
+            if (shouldWriteMetaPage(partitionId)) {
+                writePartitionMeta(pageMemory, partitionId, 
tmpWriteBuf.rewind());
+            }
 
-        GroupPartitionId partitionId = null;
+            var pageIdsToRetry = new ArrayList<FullPageId>();
 
-        try {
-            // TODO https://issues.apache.org/jira/browse/IGNITE-23115 Try to 
write file per thread.
-            while (!shutdownNow.getAsBoolean() && 
writePageIds.next(queueResult)) {
+            for (int i = 0; i < checkpointDirtyPagesView.size() && 
!shutdownNow.getAsBoolean(); i++) {
                 updateHeartbeat.run();
 
-                FullPageId fullId = queueResult.getValue();
+                FullPageId pageId = checkpointDirtyPagesView.get(i);
 
-                PersistentPageMemory pageMemory = queueResult.getKey();
+                if (pageId.pageIdx() == 0) {
+                    // Skip meta-pages, they are written by 
"writePartitionMeta".
+                    continue;
+                }
 
-                if (hasPartitionChanged(partitionId, fullId)) {
-                    GroupPartitionId newPartitionId = toPartitionId(fullId);
+                writeDirtyPage(pageMemory, pageId, tmpWriteBuf, 
pageIdsToRetry);
+            }
 
-                    // Starting for the new partition.
-                    
checkpointProgress.onStartPartitionProcessing(newPartitionId);
+            writeRetryDirtyPages(pageMemory, pageIdsToRetry, tmpWriteBuf);
+        } finally {
+            checkpointProgress.onFinishPartitionProcessing(partitionId);
+        }
+    }
 
-                    if (partitionId != null) {
-                        // Finishing for the previous partition.
-                        // TODO 
https://issues.apache.org/jira/browse/IGNITE-23105 Reimplement partition 
destruction awaiting.
-                        
checkpointProgress.onFinishPartitionProcessing(partitionId);
-                    }
+    private void writeDirtyPage(
+            PersistentPageMemory pageMemory,
+            FullPageId pageId,
+            ByteBuffer tmpWriteBuf,
+            List<FullPageId> pageIdsToRetry
+    ) throws IgniteInternalCheckedException {
+        PageStoreWriter pageStoreWriter = createPageStoreWriter(pageMemory, 
pageIdsToRetry);
 
-                    partitionId = newPartitionId;
+        // Should also be done for partitions that will be destroyed to remove 
their pages from the data region.
+        pageMemory.checkpointWritePage(pageId, tmpWriteBuf.rewind(), 
pageStoreWriter, tracker);
 
-                    if (shouldWriteMetaPage(partitionId)) {
-                        writePartitionMeta(pageMemory, partitionId, 
tmpWriteBuf.rewind());
-                    }
-                }
+        drainCheckpointBuffers(tmpWriteBuf);
+    }
 
-                PageStoreWriter pageStoreWriter = 
pageStoreWriters.computeIfAbsent(
-                        pageMemory,
-                        pm -> createPageStoreWriter(pm, pageIdsToRetry)
-                );
+    private void writeRetryDirtyPages(
+            PersistentPageMemory pageMemory,
+            List<FullPageId> pageIdsToRetry,
+            ByteBuffer tmpWriteBuf
+    ) throws IgniteInternalCheckedException {
+        while (!pageIdsToRetry.isEmpty()) {
+            if (LOG.isInfoEnabled()) {
+                LOG.info("Checkpoint pages were not written yet due to "

Review Comment:
   Ok, we do have these log messages, and they will be multiplied by the total 
number of partitions.
   
   I don't like it, so I'm asking once again: why did you change this part of 
the implementation when it wasn't a part of the task?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
