tkalkirill commented on code in PR #4379:
URL: https://github.com/apache/ignite-3/pull/4379#discussion_r1756721426
##########
modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointPagesWriter.java:
##########
@@ -176,90 +166,85 @@ public void run() {
}
}
-    /**
-     * Writes dirty pages.
-     *
-     * @param writePageIds Queue of dirty page IDs to write.
-     * @return pagesToRetry Queue dirty page IDs which should be retried.
-     */
-    private IgniteConcurrentMultiPairQueue<PersistentPageMemory, FullPageId> writePages(
-            IgniteConcurrentMultiPairQueue<PersistentPageMemory, FullPageId> writePageIds
-    ) throws IgniteInternalCheckedException {
-        Map<PersistentPageMemory, List<FullPageId>> pageIdsToRetry = new HashMap<>();
-
-        Map<PersistentPageMemory, PageStoreWriter> pageStoreWriters = new HashMap<>();
+    private void writeDirtyPages(PersistentPageMemory pageMemory, GroupPartitionId partitionId) throws IgniteInternalCheckedException {
+        CheckpointDirtyPagesView checkpointDirtyPagesView = checkpointDirtyPagesView(pageMemory, partitionId);
-        // Page store writers for checkpoint buffer pages are located in a separate map, because they would not add elements to a
-        // "pageIdsToRetry" map. Instead, they would ignore unsuccessful write lock acquisitions. It's implemented this way in order to
-        // avoid duplicates in "pageIdsToRetry", there must only be a single source of pages to achieve that.
-        Map<PersistentPageMemory, PageStoreWriter> cpBufferPageStoreWriters = new HashMap<>();
+        checkpointProgress.onStartPartitionProcessing(partitionId);
-        ByteBuffer tmpWriteBuf = threadBuf.get();
+        try {
+            ByteBuffer tmpWriteBuf = threadBuf.get();
-        Result<PersistentPageMemory, FullPageId> queueResult = new Result<>();
+            if (shouldWriteMetaPage(partitionId)) {
+                writePartitionMeta(pageMemory, partitionId, tmpWriteBuf.rewind());
+            }
-        GroupPartitionId partitionId = null;
+            var pageIdsToRetry = new ArrayList<FullPageId>();
-        try {
-            // TODO https://issues.apache.org/jira/browse/IGNITE-23115 Try to write file per thread.
-            while (!shutdownNow.getAsBoolean() && writePageIds.next(queueResult)) {
+            for (int i = 0; i < checkpointDirtyPagesView.size() && !shutdownNow.getAsBoolean(); i++) {
                 updateHeartbeat.run();
-                FullPageId fullId = queueResult.getValue();
+                FullPageId pageId = checkpointDirtyPagesView.get(i);
-                PersistentPageMemory pageMemory = queueResult.getKey();
+                if (pageId.pageIdx() == 0) {
+                    // Skip meta-pages, they are written by "writePartitionMeta".
+                    continue;
+                }
-                if (hasPartitionChanged(partitionId, fullId)) {
-                    GroupPartitionId newPartitionId = toPartitionId(fullId);
+                writeDirtyPage(pageMemory, pageId, tmpWriteBuf, pageIdsToRetry);
+            }
-                    // Starting for the new partition.
-                    checkpointProgress.onStartPartitionProcessing(newPartitionId);
+            writeRetryDirtyPages(pageMemory, pageIdsToRetry, tmpWriteBuf);
+        } finally {
+            checkpointProgress.onFinishPartitionProcessing(partitionId);
+        }
+    }
-                    if (partitionId != null) {
-                        // Finishing for the previous partition.
-                        // TODO https://issues.apache.org/jira/browse/IGNITE-23105 Reimplement partition destruction awaiting.
-                        checkpointProgress.onFinishPartitionProcessing(partitionId);
-                    }
+    private void writeDirtyPage(
+            PersistentPageMemory pageMemory,
+            FullPageId pageId,
+            ByteBuffer tmpWriteBuf,
+            List<FullPageId> pageIdsToRetry
+    ) throws IgniteInternalCheckedException {
+        PageStoreWriter pageStoreWriter = createPageStoreWriter(pageMemory, pageIdsToRetry);
-                    partitionId = newPartitionId;
+        // Should also be done for partitions that will be destroyed to remove their pages from the data region.
+        pageMemory.checkpointWritePage(pageId, tmpWriteBuf.rewind(), pageStoreWriter, tracker);
-                    if (shouldWriteMetaPage(partitionId)) {
-                        writePartitionMeta(pageMemory, partitionId, tmpWriteBuf.rewind());
-                    }
-                }
+        drainCheckpointBuffers(tmpWriteBuf);
Review Comment:
> onStartPartitionProcessing / onFinishPartitionProcessing still don't work,
right? I believe you have some idea of how to fix it. Is this why you changed
pageIdsToRetry usages?
The methods now work correctly because the writing for a partition happens in a single thread.
And no, the pageIdsToRetry change was not made because of that, but under the assumption that
the retries for a single partition will also be processed quickly.
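To illustrate the idea in this reply, here is a hypothetical, heavily simplified sketch (none of these names are the real Ignite API): one partition's pages are written and retried in the same thread, so the start/finish callbacks bracket the whole partition via try/finally, and the retry list never mixes pages from different partitions.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/**
 * Hypothetical sketch of the per-partition write loop discussed above.
 * Retries are collected and drained per partition, in the writer thread,
 * so onStart/onFinish callbacks always bracket a whole partition.
 */
public class PartitionWriteSketch {
    static int started;
    static int finished;

    // Simulates pages whose write lock cannot be taken on the first attempt.
    static final Set<Integer> busyOnFirstAttempt = new HashSet<>(List.of(3, 5));

    /** Writes all pages of one partition; returns the IDs that needed a retry. */
    static List<Integer> writePartition(List<Integer> pageIds) {
        started++; // stands in for checkpointProgress.onStartPartitionProcessing(partitionId)
        try {
            List<Integer> retry = new ArrayList<>();

            for (int pageId : pageIds) {
                if (pageId == 0) {
                    continue; // skip the meta page, written separately up front
                }
                if (!tryWritePage(pageId)) {
                    retry.add(pageId); // write lock not acquired, retry below
                }
            }

            // Drain the retries for this partition before moving to the next one.
            for (int pageId : retry) {
                while (!tryWritePage(pageId)) {
                    // keep trying; the real code would also update the heartbeat here
                }
            }

            return retry;
        } finally {
            finished++; // stands in for checkpointProgress.onFinishPartitionProcessing(partitionId)
        }
    }

    /** Simulated write-lock acquisition: fails once for "busy" pages. */
    static boolean tryWritePage(int pageId) {
        return !busyOnFirstAttempt.remove(pageId);
    }

    public static void main(String[] args) {
        List<Integer> retried = writePartition(List.of(0, 1, 2, 3, 4, 5));
        // prints: retried=[3, 5], started=1, finished=1
        System.out.println("retried=" + retried + ", started=" + started + ", finished=" + finished);
    }
}
```

Because each finish callback fires in the same thread and only after that partition's retries are drained, awaiting `onFinishPartitionProcessing` is sufficient to know the partition's dirty pages are on disk.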
> I'm not the biggest fan of changing a code without even explaining your
idea to anyone, and doing it in a PR that has nothing to do with the problem.
Was it just a convenient place to hide your intentions? :)
I'm not trying to solve all of the problems here; I'm only changing the approach compared to how it was before.
> I see how it could work in future, but at the same time I wonder what your
proposal is, please explain it at least
I don't follow your claim, please explain it in more detail.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]