sk0x50 commented on a change in pull request #5779: IGNITE-10508 Support the
new checkpoint feature not wait for the previous operation to complete
URL: https://github.com/apache/ignite/pull/5779#discussion_r253418723
##########
File path:
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
##########
@@ -3218,139 +3363,57 @@ public IgniteInternalFuture
wakeupForSnapshotCreation(SnapshotOperation snapshot
*
*/
private void doCheckpoint() {
- Checkpoint chp = null;
+ resetLastCheckpointTimeStamp();
- try {
- CheckpointMetricsTracker tracker = new
CheckpointMetricsTracker();
+ Checkpoint chp = new Checkpoint(lastCpTs, memoryRecoveryRecordPtr);
- try {
- chp = markCheckpointBegin(tracker);
- }
- catch (IgniteCheckedException e) {
- if (curCpProgress != null)
- curCpProgress.cpFinishFut.onDone(e);
+ memoryRecoveryRecordPtr = null;
- // In case of checkpoint initialization error node should
be invalidated and stopped.
- cctx.kernalContext().failure().process(new
FailureContext(FailureType.CRITICAL_ERROR, e));
+ try {
+ markCheckpointBegin(chp);
- throw new IgniteException(e); // Re-throw as unchecked
exception to force stopping checkpoint thread.
- }
+ currCheckpointPagesCnt = chp.pages();
updateHeartbeat();
- currCheckpointPagesCnt = chp.pagesSize;
-
writtenPagesCntr = new AtomicInteger();
syncedPagesCntr = new AtomicInteger();
evictedPagesCntr = new AtomicInteger();
boolean success = false;
- int destroyedPartitionsCnt;
-
try {
- if (chp.hasDelta()) {
- // Identity stores set.
- ConcurrentLinkedHashMap<PageStore, LongAdder>
updStores = new ConcurrentLinkedHashMap<>();
-
- CountDownFuture doneWriteFut = new CountDownFuture(
- asyncRunner == null ? 1 :
chp.cpPages.collectionsSize());
-
- tracker.onPagesWriteStart();
-
- final int totalPagesToWriteCnt = chp.cpPages.size();
-
- if (asyncRunner != null) {
- for (int i = 0; i < chp.cpPages.collectionsSize();
i++) {
- Runnable write = new WriteCheckpointPages(
- tracker,
- chp.cpPages.innerCollection(i),
- updStores,
- doneWriteFut,
- totalPagesToWriteCnt,
- new Runnable() {
- @Override public void run() {
- updateHeartbeat();
- }
- },
- asyncRunner
- );
-
- try {
- asyncRunner.execute(write);
- }
- catch (RejectedExecutionException ignore) {
- // Run the task synchronously.
- updateHeartbeat();
-
- write.run();
- }
- }
- }
- else {
- // Single-threaded checkpoint.
- updateHeartbeat();
-
- Runnable write = new WriteCheckpointPages(
- tracker,
- chp.cpPages,
- updStores,
- doneWriteFut,
- totalPagesToWriteCnt,
- new Runnable() {
- @Override public void run() {
- updateHeartbeat();
- }
- },
- null);
-
- write.run();
- }
-
- updateHeartbeat();
-
- // Wait and check for errors.
- doneWriteFut.get();
-
- // Must re-check shutdown flag here because threads
may have skipped some pages.
- // If so, we should not put finish checkpoint mark.
- if (shutdownNow) {
- chp.progress.cpFinishFut.onDone(new
NodeStoppingException("Node is stopping."));
-
+ // Write dirty pages and fsync stores.
+ if (chp.hasPages()) {
+ if (checkCancel.apply(chp))
return;
- }
- tracker.onFsyncStart();
+ // Write pages to stores.
+ writePages(chp);
- if (!skipSync) {
- for (Map.Entry<PageStore, LongAdder> updStoreEntry
: updStores.entrySet()) {
- if (shutdownNow) {
- chp.progress.cpFinishFut.onDone(new
NodeStoppingException("Node is stopping."));
-
- return;
- }
+ if (checkCancel.apply(chp))
+ return;
- blockingSectionBegin();
+ // Fsync page stores.
+ fsyncStores(chp);
- try {
- updStoreEntry.getKey().sync();
- }
- finally {
- blockingSectionEnd();
- }
-
-
syncedPagesCntr.addAndGet(updStoreEntry.getValue().intValue());
- }
- }
+ if (checkCancel.apply(chp))
+ return;
}
else {
- tracker.onPagesWriteStart();
- tracker.onFsyncStart();
+ chp.metrics.onPagesWriteStart();
+
+ chp.metrics.onFsyncStart();
}
+ // Notify snapshot manager that all pages was written and
fsync completed.
Review comment:
`Notify snapshot manager that all pages were written and fsync completed.`
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services