sk0x50 commented on a change in pull request #5779: IGNITE-10508 Support the
new checkpoint feature not wait for the previous operation to complete
URL: https://github.com/apache/ignite/pull/5779#discussion_r253438207
##########
File path:
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
##########
@@ -3992,58 +4187,174 @@ public void shutdownNow() {
}
}
- /**
- * Reorders list of checkpoint pages and splits them into needed number of
sublists according to
- * {@link DataStorageConfiguration#getCheckpointThreads()} and
- * {@link DataStorageConfiguration#getCheckpointWriteOrder()}.
- *
- * @param cpPagesTuple Checkpoint pages tuple.
- */
- private GridMultiCollectionWrapper<FullPageId> splitAndSortCpPagesIfNeeded(
- IgniteBiTuple<Collection<GridMultiCollectionWrapper<FullPageId>>,
Integer> cpPagesTuple
- ) {
- List<FullPageId> cpPagesList = new ArrayList<>(cpPagesTuple.get2());
+ private static class CheckpointPages {
+ protected int lowBound;
+
+ protected int hightBound;
+
+ protected FullPageId[] pageIds;
+
+ protected Set<FullPageId> retryPageIds = new GridConcurrentHashSet<>();
+
+ protected int totalCpPages;
+
+ protected int written;
- for (GridMultiCollectionWrapper<FullPageId> col : cpPagesTuple.get1())
{
- for (int i = 0; i < col.collectionsSize(); i++)
- cpPagesList.addAll(col.innerCollection(i));
+ protected int lastAddIdx;
+
+ public int checkpointPages() {
+ return totalCpPages;
+ }
+
+ public boolean isEmpty() {
+ return retryPageIds.isEmpty();
}
- if (persistenceCfg.getCheckpointWriteOrder() ==
CheckpointWriteOrder.SEQUENTIAL) {
- FullPageId[] objects = cpPagesList.toArray(new
FullPageId[cpPagesList.size()]);
+ public Iterator<FullPageId> iterator() {
+ if (retryPageIds.isEmpty())
+ return iteratorPages(pageIds);
+
+ return retryPageIds.iterator();
+ }
+
+ private Iterator<FullPageId> iteratorPages(FullPageId[] pageIds) {
+ int size = hightBound - lowBound;
+
+ return new Iterator<FullPageId>() {
+ private int idx;
- Arrays.parallelSort(objects, new Comparator<FullPageId>() {
- @Override public int compare(FullPageId o1, FullPageId o2) {
- int cmp = Long.compare(o1.groupId(), o2.groupId());
- if (cmp != 0)
- return cmp;
+ @Override public boolean hasNext() {
+ return idx < size;
+ }
+
+ @Override public FullPageId next() {
+ return pageIds[lowBound + (idx++)];
+ }
+
+ @Override public void remove() {
+ written++;
- return
Long.compare(PageIdUtils.effectivePageId(o1.pageId()),
- PageIdUtils.effectivePageId(o2.pageId()));
+ // Set null as marker pages written or outdated, if next
checkpoint merge will required.
+ pageIds[lowBound + (idx - 1)] = null;
}
- });
+ };
+ }
+
+ public void addForRetry(FullPageId fullPageId) {
+ assert fullPageId != null;
+
+ retryPageIds.add(fullPageId);
+ }
+
+ protected void addPage(FullPageId pageId) {
+ assert lastAddIdx < totalCpPages;
+
+ pageIds[lastAddIdx++] = pageId;
+ }
- cpPagesList = Arrays.asList(objects);
+ protected void addPages(Collection<FullPageId> pageIds) {
+ for (FullPageId fullPageId : pageIds)
+ addPage(fullPageId);
}
- int cpThreads = persistenceCfg.getCheckpointThreads();
+ }
+
+ private static class CheckpointBeginPages extends CheckpointPages {
- int pagesSubLists = cpThreads == 1 ? 1 : cpThreads * 4;
- // Splitting pages to (threads * 4) subtasks. If any thread will be
faster, it will help slower threads.
+ private final CheckpointWriteOrder writeOrder;
- Collection[] pagesSubListArr = new Collection[pagesSubLists];
+ private CheckpointPages[] splittedCpPages;
- for (int i = 0; i < pagesSubLists; i++) {
- int totalSize = cpPagesList.size();
+ private Collection<GridMultiCollectionWrapper<FullPageId>>
markChpBeginPages;
- int from = totalSize * i / (pagesSubLists);
+ private int markChpBeginPagesCnt;
- int to = totalSize * (i + 1) / (pagesSubLists);
+ private CheckpointBeginPages(CheckpointWriteOrder order) {
+ writeOrder = order;
+ }
- pagesSubListArr[i] = cpPagesList.subList(from, to);
+ private void addCheckpointPages(
+ IgniteBiTuple<Collection<GridMultiCollectionWrapper<FullPageId>>,
Integer> markChpBeginTup
+ ) {
+ markChpBeginPages = markChpBeginTup.get1();
+ markChpBeginPagesCnt = markChpBeginTup.get2();
}
- return new GridMultiCollectionWrapper<FullPageId>(pagesSubListArr);
+ private void onCheckpointBegin() {
+ int cpPages = checkpointPages();
+
+ // Init array for all checkpoint page Ids.
+ pageIds = new FullPageId[cpPages];
+ totalCpPages = cpPages;
+ lowBound = 0;
+ hightBound = cpPages;
Review comment:
`highBound`. Personally, I would prefer `lowerBound` and `upperBound`
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services