xtern commented on a change in pull request #9047: URL: https://github.com/apache/ignite/pull/9047#discussion_r621296629
########## File path: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotFutureTask.java ########## @@ -525,76 +537,105 @@ else if (affNode && missed.isEmpty() && cctx.kernalContext().query().moduleEnabl for (CacheConfigurationSender ccfgSndr : ccfgSndrs) futs.add(CompletableFuture.runAsync(wrapExceptionIfStarted(ccfgSndr::sendCacheConfig), snpSndr.executor())); - for (Map.Entry<Integer, Set<Integer>> e : processed.entrySet()) { - int grpId = e.getKey(); + try { + for (Map.Entry<Integer, Set<Integer>> e : processed.entrySet()) { + int grpId = e.getKey(); + String cacheDirName = cacheDirName(grpId); - CacheGroupContext gctx = cctx.cache().cacheGroup(grpId); + // Process partitions for a particular cache group. + for (int partId : e.getValue()) { + GroupPartitionId pair = new GroupPartitionId(grpId, partId); - if (gctx == null) { - acceptException(new IgniteCheckedException("Cache group context has not found " + - "due to the cache group is stopped: " + grpId)); + Long partLen = partFileLengths.get(pair); - break; + CompletableFuture<Void> fut0 = CompletableFuture.runAsync( + wrapExceptionIfStarted(() -> { + snpSndr.sendPart( + getPartitionFile(pageStore.workDir(), cacheDirName, partId), + cacheDirName, + pair, + partLen); + + // Stop partition writer. + partDeltaWriters.get(pair).markPartitionProcessed(); + }), + snpSndr.executor()) + // Wait for the completion of both futures - checkpoint end, copy partition. + .runAfterBothAsync(cpEndFut, + wrapExceptionIfStarted(() -> { + File delta = partDeltaWriters.get(pair).deltaFile; + + try { + // Atomically creates a new, empty delta file if and only if + // a file with this name does not yet exist. 
+ delta.createNewFile(); + } + catch (IOException ex) { + throw new IgniteCheckedException(ex); + } + + snpSndr.sendDelta(delta, cacheDirName, pair); + + boolean deleted = delta.delete(); + + assert deleted; + }), + snpSndr.executor()); + + futs.add(fut0); + } } - // Process partitions for a particular cache group. - for (int partId : e.getValue()) { - GroupPartitionId pair = new GroupPartitionId(grpId, partId); - - CacheConfiguration<?, ?> ccfg = gctx.config(); + int futsSize = futs.size(); - assert ccfg != null : "Cache configuration cannot be empty on snapshot creation: " + pair; + CompletableFuture.allOf(futs.toArray(new CompletableFuture[futsSize])) + .whenComplete((res, t) -> { + assert t == null : "Exception must never be thrown since a wrapper is used " + + "for each snapshot task: " + t; - String cacheDirName = cacheDirName(ccfg); - Long partLen = partFileLengths.get(pair); - - CompletableFuture<Void> fut0 = CompletableFuture.runAsync( - wrapExceptionIfStarted(() -> { - snpSndr.sendPart( - getPartitionFile(pageStore.workDir(), cacheDirName, partId), - cacheDirName, - pair, - partLen); - - // Stop partition writer. - partDeltaWriters.get(pair).markPartitionProcessed(); - }), - snpSndr.executor()) - // Wait for the completion of both futures - checkpoint end, copy partition. - .runAfterBothAsync(cpEndFut, - wrapExceptionIfStarted(() -> { - File delta = partDeltaWriters.get(pair).deltaFile; - - try { - // Atomically creates a new, empty delta file if and only if - // a file with this name does not yet exist. - delta.createNewFile(); - } - catch (IOException ex) { - throw new IgniteCheckedException(ex); - } + closeAsync(); + }); + } + catch (IgniteCheckedException e) { + acceptException(e); + } + } - snpSndr.sendDelta(delta, cacheDirName, pair); + /** + * @param grpId Cache group id. + * @param parts Set of partitions to be processed. + * @param dirSupp Directory to init. + * @throws IgniteCheckedException If fails. 
+ */ + private void addPartitionWriters(int grpId, Set<Integer> parts, IgniteThrowableSupplier<String> dirSupp) throws IgniteCheckedException { Review comment: Why do you need the dirSupp supplier? Why not pass dirName directly? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org