xtern commented on code in PR #10189:
URL: https://github.com/apache/ignite/pull/10189#discussion_r944281616
##########
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotResponseRemoteFutureTask.java:
##########
@@ -72,70 +74,65 @@ public SnapshotResponseRemoteFutureTask(
return false;
try {
- List<GroupPartitionId> handled = new ArrayList<>();
+ List<SnapshotMetadata> metas =
cctx.snapshotMgr().readSnapshotMetadatas(snpName, snpPath);
- for (Map.Entry<Integer, Set<Integer>> e : parts.entrySet()) {
- ofNullable(e.getValue()).orElse(Collections.emptySet())
- .forEach(p -> handled.add(new GroupPartitionId(e.getKey(),
p)));
- }
+ Function<GroupPartitionId, SnapshotMetadata> findMeta = pair -> {
+ for (SnapshotMetadata meta : metas) {
+ Map<Integer, Set<Integer>> parts0 = meta.partitions();
- snpSndr.init(handled.size());
+ if (F.isEmpty(parts0))
+ continue;
- File snpDir = cctx.snapshotMgr().snapshotLocalDir(snpName,
snpPath);
+ Set<Integer> locParts = parts0.get(pair.getGroupId());
- List<CompletableFuture<Void>> futs = new ArrayList<>();
- List<SnapshotMetadata> metas =
cctx.snapshotMgr().readSnapshotMetadatas(snpName, snpPath);
+ if (locParts != null &&
locParts.contains(pair.getPartitionId()))
+ return meta;
+ }
- for (SnapshotMetadata meta : metas) {
- Map<Integer, Set<Integer>> parts0 = meta.partitions();
+ return null;
+ };
- if (F.isEmpty(parts0))
- continue;
+ Map<GroupPartitionId, SnapshotMetadata> partsToSend = new
HashMap<>();
- handled.removeIf(gp -> {
- if (ofNullable(parts0.get(gp.getGroupId()))
- .orElse(Collections.emptySet())
- .contains(gp.getPartitionId())
- ) {
- futs.add(CompletableFuture.runAsync(() -> {
- if (err.get() != null)
- return;
+ for (Map.Entry<Integer, Set<Integer>> e : parts.entrySet())
+ e.getValue().forEach(p -> partsToSend.computeIfAbsent(new
GroupPartitionId(e.getKey(), p), findMeta));
- File cacheDir = cacheDirectory(new File(snpDir,
databaseRelativePath(meta.folderName())),
- gp.getGroupId());
+ if (partsToSend.containsValue(null)) {
+ Collection<GroupPartitionId> missed =
F.viewReadOnly(partsToSend.entrySet(), Map.Entry::getKey,
+ e -> e.getValue() == null);
- if (cacheDir == null) {
- throw new IgniteException("Cache directory not
found [snpName=" + snpName + ", meta=" + meta +
- ", pair=" + gp + ']');
- }
+ err.compareAndSet(null, new IgniteException("Snapshot
partitions missed on local node " +
+ "[snpName=" + snpName + ", missed=" + missed + ']'));
Review Comment:
Additionally, we can move this check higher and remove the redundant data
re-scan.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]