Vladsz83 commented on code in PR #11897:
URL: https://github.com/apache/ignite/pull/11897#discussion_r2150138972
##########
modules/control-utility/src/test/java/org/apache/ignite/util/GridCommandHandlerTest.java:
##########
@@ -750,7 +751,7 @@ public void testState() throws Exception {
final String newTag = "new_tag";
Ignite ignite = startGrids(2);
-
+
Review Comment:
This whitespace-only (tab) change might be reverted.
##########
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotHandlerRestoreTask.java:
##########
@@ -26,92 +26,112 @@
import java.util.UUID;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
-import org.apache.ignite.compute.ComputeJobResult;
-import org.apache.ignite.internal.util.typedef.F;
-import org.jetbrains.annotations.Nullable;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.internal.IgniteEx;
+import
org.apache.ignite.internal.processors.cache.persistence.filename.SnapshotFileTree;
/**
* Snapshot restore operation handling task.
*/
-public class SnapshotHandlerRestoreTask extends
AbstractSnapshotVerificationTask {
- /** Serial version uid. */
- private static final long serialVersionUID = 0L;
-
- /** {@inheritDoc} */
- @Override protected SnapshotHandlerRestoreJob createJob(
- String name,
- String folderName,
- String consId,
- SnapshotPartitionsVerifyTaskArg args
+public class SnapshotHandlerRestoreTask {
+ /** */
+ private final IgniteEx ignite;
+
+ /** */
+ private final IgniteLogger log;
+
+ /** */
+ private final SnapshotHandlerRestoreJob job;
+
+ /** */
+ SnapshotHandlerRestoreTask(
+ IgniteEx ignite,
+ IgniteLogger log,
+ SnapshotFileTree sft,
+ Collection<String> grps,
+ boolean check
) {
- return new SnapshotHandlerRestoreJob(name, args.snapshotPath(),
folderName, consId, args.cacheGroupNames(), args.check());
+ job = new SnapshotHandlerRestoreJob(ignite, sft, grps, check);
+ this.ignite = ignite;
+ this.log = log;
}
- /** {@inheritDoc} */
- @SuppressWarnings("rawtypes")
- @Nullable @Override public SnapshotPartitionsVerifyTaskResult
reduce(List<ComputeJobResult> results) {
+ /** */
+ public Map<String, SnapshotHandlerResult<Object>> execute() {
+ return job.execute0();
+ }
+
+ /** */
+ public void reduce(
+ String snpName,
+ Map<ClusterNode, Map<Object, Map<String, SnapshotHandlerResult<?>>>>
results
+ ) {
Map<String, List<SnapshotHandlerResult<?>>> clusterResults = new
HashMap<>();
Collection<UUID> execNodes = new ArrayList<>(results.size());
- for (ComputeJobResult res : results) {
- if (res.getException() != null)
- throw res.getException();
+ // Checking node -> Map by snapshot part's consistend id.
+ for (Map.Entry<ClusterNode, Map<Object, Map<String,
SnapshotHandlerResult<?>>>> nodeRes : results.entrySet()) {
+ // Consistent id -> Map by handler name.
+ for (Map.Entry<Object, Map<String, SnapshotHandlerResult<?>>> res
: nodeRes.getValue().entrySet()) {
+ // Depending on the job mapping, we can get several different
results from one node.
+ execNodes.add(nodeRes.getKey().id());
- // Depending on the job mapping, we can get several different
results from one node.
- execNodes.add(res.getNode().id());
+ Map<String, SnapshotHandlerResult<?>> nodeDataMap =
res.getValue();
- Map<String, SnapshotHandlerResult> nodeDataMap = res.getData();
+ assert nodeDataMap != null : "At least the default snapshot
restore handler should have been executed ";
- assert nodeDataMap != null : "At least the default snapshot
restore handler should have been executed ";
+ for (Map.Entry<String, SnapshotHandlerResult<?>> entry :
nodeDataMap.entrySet()) {
+ String hndName = entry.getKey();
- for (Map.Entry<String, SnapshotHandlerResult> entry :
nodeDataMap.entrySet()) {
- String hndName = entry.getKey();
-
- clusterResults.computeIfAbsent(hndName, v -> new
ArrayList<>()).add(entry.getValue());
+ clusterResults.computeIfAbsent(hndName, v -> new
ArrayList<>()).add(entry.getValue());
+ }
}
}
- String snapshotName = F.first(F.first(metas.values())).snapshotName();
-
try {
ignite.context().cache().context().snapshotMgr().handlers().completeAll(
- SnapshotHandlerType.RESTORE, snapshotName, clusterResults,
execNodes, wrns -> {});
+ SnapshotHandlerType.RESTORE, snpName, clusterResults,
execNodes, wrns -> {});
}
catch (Exception e) {
- log.warning("The snapshot operation will be aborted due to a
handler error [snapshot=" + snapshotName + "].", e);
+ log.warning("The snapshot operation will be aborted due to a
handler error [snapshot=" + snpName + "].", e);
throw new IgniteException(e);
}
-
- return new SnapshotPartitionsVerifyTaskResult(metas, null);
}
/** Invokes all {@link SnapshotHandlerType#RESTORE} handlers locally. */
- private static class SnapshotHandlerRestoreJob extends
AbstractSnapshotVerificationJob {
- /** Serial version uid. */
- private static final long serialVersionUID = 0L;
+ private static class SnapshotHandlerRestoreJob {
+ /** */
+ private final IgniteEx ignite;
+
+ /** */
+ private final SnapshotFileTree sft;
+
+ /** */
+ private final Collection<String> rqGrps;
+
+ /** */
+ private final boolean check;
/**
- * @param snpName Snapshot name.
- * @param snpPath Snapshot directory path.
- * @param folderName Folder name for snapshot.
- * @param consId Consistent id of the related node.
* @param grps Cache group names.
* @param check If {@code true} check snapshot before restore.
*/
public SnapshotHandlerRestoreJob(
- String snpName,
Review Comment:
The constructor might be private
##########
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotHandlerRestoreTask.java:
##########
@@ -26,92 +26,112 @@
import java.util.UUID;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
-import org.apache.ignite.compute.ComputeJobResult;
-import org.apache.ignite.internal.util.typedef.F;
-import org.jetbrains.annotations.Nullable;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.internal.IgniteEx;
+import
org.apache.ignite.internal.processors.cache.persistence.filename.SnapshotFileTree;
/**
* Snapshot restore operation handling task.
*/
-public class SnapshotHandlerRestoreTask extends
AbstractSnapshotVerificationTask {
- /** Serial version uid. */
- private static final long serialVersionUID = 0L;
-
- /** {@inheritDoc} */
- @Override protected SnapshotHandlerRestoreJob createJob(
- String name,
- String folderName,
- String consId,
- SnapshotPartitionsVerifyTaskArg args
+public class SnapshotHandlerRestoreTask {
Review Comment:
This is not a task any more. It doesn't use ComputeJobResults. It is not even
serializable. Looks weird. Same for the job. Perhaps we need to add at least a
TODO with a future refactoring suggestion. Same for
`SnapshotMetadataVerificationTask` and `IncrementalSnapshotVerificationTask`
##########
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotPartitionsVerifyTaskResult.java:
##########
@@ -57,7 +57,7 @@ public SnapshotPartitionsVerifyTaskResult() {
* @param metas Map of snapshot metadata information found on each cluster
node.
* @param idleRes Result of cluster nodes partitions comparison.
*/
- public SnapshotPartitionsVerifyTaskResult(
+ SnapshotPartitionsVerifyTaskResult(
Review Comment:
I think we can rename/refactor to `SnapshotPartitionsVerifyResult`
##########
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IncrementalSnapshotVerificationTask.java:
##########
@@ -34,114 +33,108 @@
import java.util.stream.Collectors;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
+import org.apache.ignite.IgniteLogger;
import org.apache.ignite.cache.CacheAtomicityMode;
-import org.apache.ignite.compute.ComputeJobResult;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.management.cache.IdleVerifyResult;
import org.apache.ignite.internal.management.cache.PartitionKey;
import org.apache.ignite.internal.processors.cache.GridCacheOperation;
import org.apache.ignite.internal.processors.cache.GridLocalConfigManager;
import org.apache.ignite.internal.processors.cache.StoredCacheData;
+import
org.apache.ignite.internal.processors.cache.persistence.filename.SnapshotFileTree;
import
org.apache.ignite.internal.processors.cache.verify.IdleVerifyUtility.VerifyPartitionContext;
import org.apache.ignite.internal.processors.cache.verify.PartitionHashRecord;
import
org.apache.ignite.internal.processors.cache.verify.TransactionsHashRecord;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import org.apache.ignite.internal.processors.cluster.BaselineTopology;
-import org.apache.ignite.internal.processors.task.GridInternal;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.transactions.TransactionState;
-import org.jetbrains.annotations.Nullable;
import static
org.apache.ignite.internal.managers.discovery.ConsistentIdMapper.ALL_NODES;
/** */
-@GridInternal
-public class IncrementalSnapshotVerificationTask extends
AbstractSnapshotVerificationTask {
- /** Serial version uid. */
- private static final long serialVersionUID = 0L;
-
- /** {@inheritDoc} */
- @Override public SnapshotPartitionsVerifyTaskResult
reduce(List<ComputeJobResult> results) throws IgniteException {
- IdleVerifyResult.Builder bldr = IdleVerifyResult.builder();
-
- for (ComputeJobResult nodeRes: results) {
- if (nodeRes.getException() != null) {
- bldr.addException(nodeRes.getNode(), nodeRes.getException());
+public class IncrementalSnapshotVerificationTask {
+ /** */
+ private final VerifyIncrementalSnapshotJob job;
- continue;
- }
+ /** */
+ private final IgniteLogger log;
- IncrementalSnapshotVerificationTaskResult res = nodeRes.getData();
+ /** */
+ public IncrementalSnapshotVerificationTask(IgniteEx ignite, IgniteLogger
log, SnapshotFileTree sft, int incrementalIndex) {
Review Comment:
incrementalIndex -> incrementalIdx
##########
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotChecker.java:
##########
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ignite.internal.processors.cache.persistence.snapshot;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.internal.GridKernalContext;
+import org.apache.ignite.internal.management.cache.IdleVerifyResult;
+import org.apache.ignite.internal.management.cache.PartitionKey;
+import
org.apache.ignite.internal.processors.cache.persistence.filename.SnapshotFileTree;
+import org.apache.ignite.internal.processors.cache.verify.PartitionHashRecord;
+import org.jetbrains.annotations.Nullable;
+
+/** */
+public class SnapshotChecker {
+ /** */
+ private final IgniteLogger log;
+
+ /** */
+ private final GridKernalContext kctx;
+
+ /** */
+ private final ExecutorService executor;
+
+ /** */
+ public SnapshotChecker(GridKernalContext kctx) {
+ this.kctx = kctx;
+
+ executor = kctx.pools().getSnapshotExecutorService();
+
+ log = kctx.log(getClass());
+ }
+
+ /** Launches local metas checking. */
+ public CompletableFuture<List<SnapshotMetadata>> checkLocalMetas(
+ SnapshotFileTree sft,
+ int incIdx,
+ @Nullable Collection<Integer> grpIds
+ ) {
+ return CompletableFuture.supplyAsync(() ->
+ new SnapshotMetadataVerificationTask(kctx.grid(), log, sft,
incIdx, grpIds).execute(), executor);
+ }
+
+ /** */
+ public CompletableFuture<IncrementalSnapshotVerificationTaskResult>
checkIncrementalSnapshot(
+ SnapshotFileTree sft,
+ int incIdx
+ ) {
+ assert incIdx > 0;
+
+ return CompletableFuture.supplyAsync(
+ () -> new IncrementalSnapshotVerificationTask(kctx.grid(), log,
sft, incIdx).execute(),
Review Comment:
Should be put on the previous line, or the closing ')' should be put on the next line.
##########
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotPartitionsVerifyHandler.java:
##########
@@ -288,14 +276,15 @@ private Map<PartitionKey, PartitionHashRecord>
checkSnapshotFiles(
false,
size,
skipHash() ? F.emptyIterator()
- : snpMgr.partitionRowIterator(snpCtx, grpName,
partId, pageStore),
+ : snpMgr.partitionRowIterator(
Review Comment:
Suggestion:
```
skipHash()
? F.emptyIterator()
: snpMgr.partitionRowIterator(cctx.kernalContext(), grpName, partId, pageStore),
```
##########
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IncrementalSnapshotVerificationTask.java:
##########
@@ -34,114 +33,108 @@
import java.util.stream.Collectors;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
+import org.apache.ignite.IgniteLogger;
import org.apache.ignite.cache.CacheAtomicityMode;
-import org.apache.ignite.compute.ComputeJobResult;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.management.cache.IdleVerifyResult;
import org.apache.ignite.internal.management.cache.PartitionKey;
import org.apache.ignite.internal.processors.cache.GridCacheOperation;
import org.apache.ignite.internal.processors.cache.GridLocalConfigManager;
import org.apache.ignite.internal.processors.cache.StoredCacheData;
+import
org.apache.ignite.internal.processors.cache.persistence.filename.SnapshotFileTree;
import
org.apache.ignite.internal.processors.cache.verify.IdleVerifyUtility.VerifyPartitionContext;
import org.apache.ignite.internal.processors.cache.verify.PartitionHashRecord;
import
org.apache.ignite.internal.processors.cache.verify.TransactionsHashRecord;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import org.apache.ignite.internal.processors.cluster.BaselineTopology;
-import org.apache.ignite.internal.processors.task.GridInternal;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.transactions.TransactionState;
-import org.jetbrains.annotations.Nullable;
import static
org.apache.ignite.internal.managers.discovery.ConsistentIdMapper.ALL_NODES;
/** */
-@GridInternal
-public class IncrementalSnapshotVerificationTask extends
AbstractSnapshotVerificationTask {
- /** Serial version uid. */
- private static final long serialVersionUID = 0L;
-
- /** {@inheritDoc} */
- @Override public SnapshotPartitionsVerifyTaskResult
reduce(List<ComputeJobResult> results) throws IgniteException {
- IdleVerifyResult.Builder bldr = IdleVerifyResult.builder();
-
- for (ComputeJobResult nodeRes: results) {
- if (nodeRes.getException() != null) {
- bldr.addException(nodeRes.getNode(), nodeRes.getException());
+public class IncrementalSnapshotVerificationTask {
+ /** */
+ private final VerifyIncrementalSnapshotJob job;
- continue;
- }
+ /** */
+ private final IgniteLogger log;
- IncrementalSnapshotVerificationTaskResult res = nodeRes.getData();
+ /** */
+ public IncrementalSnapshotVerificationTask(IgniteEx ignite, IgniteLogger
log, SnapshotFileTree sft, int incrementalIndex) {
+ job = new VerifyIncrementalSnapshotJob(ignite, log, sft,
incrementalIndex);
+ this.log = log;
+ }
- if (!F.isEmpty(res.exceptions())) {
- bldr.addException(nodeRes.getNode(),
F.first(res.exceptions()));
+ /** */
+ public IdleVerifyResult reduce(Map<ClusterNode,
IncrementalSnapshotVerificationTaskResult> results) throws IgniteException {
+ IdleVerifyResult.Builder bldr = IdleVerifyResult.builder();
- continue;
- }
+ for (Map.Entry<ClusterNode, IncrementalSnapshotVerificationTaskResult>
nodeRes: results.entrySet()) {
+ IncrementalSnapshotVerificationTaskResult res = nodeRes.getValue();
if (!F.isEmpty(res.partiallyCommittedTxs()))
- bldr.addPartiallyCommited(nodeRes.getNode(),
res.partiallyCommittedTxs());
+ bldr.addPartiallyCommited(nodeRes.getKey(),
res.partiallyCommittedTxs());
bldr.addPartitionHashes(res.partHashRes());
if (log.isDebugEnabled())
- log.debug("Handle VerifyIncrementalSnapshotJob result [node="
+ nodeRes.getNode() + ", taskRes=" + res + ']');
+ log.debug("Handle VerifyIncrementalSnapshotJob result [node="
+ nodeRes.getKey() + ", taskRes=" + res + ']');
- bldr.addIncrementalHashRecords(nodeRes.getNode(), res.txHashRes());
+ bldr.addIncrementalHashRecords(nodeRes.getKey(), res.txHashRes());
}
- return new SnapshotPartitionsVerifyTaskResult(metas, bldr.build());
+ return bldr.build();
}
- /** {@inheritDoc} */
- @Override protected VerifyIncrementalSnapshotJob createJob(
- String name,
- String folderName,
- String consId,
- SnapshotPartitionsVerifyTaskArg args
- ) {
- return new VerifyIncrementalSnapshotJob(name, args.snapshotPath(),
args.incrementIndex(), folderName, consId);
+ /** */
+ public IncrementalSnapshotVerificationTaskResult execute() {
+ return job.execute0();
}
/** */
- private static class VerifyIncrementalSnapshotJob extends
AbstractSnapshotVerificationJob {
- /** Serial version uid. */
- private static final long serialVersionUID = 0L;
+ private static class VerifyIncrementalSnapshotJob {
+ /** */
+ private final IgniteEx ignite;
- /** Incremental snapshot index. */
+ /** */
+ private final IgniteLogger log;
+
+ /** */
+ private final SnapshotFileTree sft;
+
+ /** */
private final int incIdx;
/** */
private LongAdder procEntriesCnt;
/**
- * @param snpName Snapshot name.
- * @param snpPath Snapshot directory path.
- * @param folderName Folder name for snapshot.
+ * @param sft Snapshot file tree
* @param incIdx Incremental snapshot index.
- * @param folderName Folder name for snapshot.
- * @param consId Consistent id of the related node.
*/
public VerifyIncrementalSnapshotJob(
- String snpName,
Review Comment:
The constructor can be private.
##########
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotMetadataVerificationTask.java:
##########
@@ -32,100 +31,91 @@
import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.cluster.ClusterNode;
-import org.apache.ignite.compute.ComputeJob;
-import org.apache.ignite.compute.ComputeJobAdapter;
-import org.apache.ignite.compute.ComputeJobResult;
-import org.apache.ignite.compute.ComputeJobResultPolicy;
-import org.apache.ignite.compute.ComputeTaskAdapter;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
import
org.apache.ignite.internal.processors.cache.persistence.filename.SnapshotFileTree;
import
org.apache.ignite.internal.processors.cache.persistence.filename.SnapshotFileTree.IncrementalSnapshotFileTree;
import
org.apache.ignite.internal.processors.cache.persistence.wal.FileDescriptor;
import
org.apache.ignite.internal.processors.cache.persistence.wal.reader.IgniteWalIteratorFactory;
-import org.apache.ignite.internal.processors.task.GridInternal;
import org.apache.ignite.internal.util.typedef.F;
-import org.apache.ignite.internal.util.typedef.internal.U;
-import org.apache.ignite.resources.IgniteInstanceResource;
-import org.apache.ignite.resources.LoggerResource;
-import org.jetbrains.annotations.NotNull;
import static
org.apache.ignite.internal.processors.cache.persistence.wal.FileWriteAheadLogManager.WAL_SEGMENT_FILE_COMPACTED_FILTER;
/** Snapshot task to verify snapshot metadata on the baseline nodes for given
snapshot name. */
-@GridInternal
-public class SnapshotMetadataVerificationTask
- extends ComputeTaskAdapter<SnapshotMetadataVerificationTaskArg,
SnapshotMetadataVerificationTaskResult> {
- /** Serial version uid. */
- private static final long serialVersionUID = 0L;
-
+public class SnapshotMetadataVerificationTask {
/** */
- private SnapshotMetadataVerificationTaskArg arg;
+ private final MetadataVerificationJob job;
/** */
- @IgniteInstanceResource
- private transient IgniteEx ignite;
-
- /** {@inheritDoc} */
- @Override public @NotNull Map<? extends ComputeJob, ClusterNode> map(
- List<ClusterNode> subgrid,
- SnapshotMetadataVerificationTaskArg arg
- ) throws IgniteException {
- this.arg = arg;
-
- Map<ComputeJob, ClusterNode> map = U.newHashMap(subgrid.size());
-
- for (ClusterNode node : subgrid)
- map.put(new MetadataVerificationJob(arg), node);
+ public SnapshotMetadataVerificationTask(
+ IgniteEx ignite,
+ IgniteLogger log,
+ SnapshotFileTree sft,
+ int incrementIndex,
+ Collection<Integer> grpIds
+ ) {
+ job = new MetadataVerificationJob(ignite, log, sft, incrementIndex,
grpIds);
+ }
- return map;
+ /** */
+ public List<SnapshotMetadata> execute() {
+ return job.execute();
}
/** Job that verifies snapshot on an Ignite node. */
- private static class MetadataVerificationJob extends ComputeJobAdapter {
+ private static class MetadataVerificationJob {
+ /** */
+ private final IgniteEx ignite;
+
/** */
- private static final long serialVersionUID = 0L;
+ private final IgniteLogger log;
/** */
- @IgniteInstanceResource
- private transient IgniteEx ignite;
+ private final SnapshotFileTree sft;
/** */
- @LoggerResource
- private transient IgniteLogger log;
+ private final int incrementIndex;
/** */
- private final SnapshotMetadataVerificationTaskArg arg;
+ private final Collection<Integer> grpIds;
/** */
- public MetadataVerificationJob(SnapshotMetadataVerificationTaskArg
arg) {
- this.arg = arg;
+ public MetadataVerificationJob(
Review Comment:
private
##########
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotMetadataVerificationTask.java:
##########
@@ -32,100 +31,91 @@
import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.cluster.ClusterNode;
-import org.apache.ignite.compute.ComputeJob;
-import org.apache.ignite.compute.ComputeJobAdapter;
-import org.apache.ignite.compute.ComputeJobResult;
-import org.apache.ignite.compute.ComputeJobResultPolicy;
-import org.apache.ignite.compute.ComputeTaskAdapter;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
import
org.apache.ignite.internal.processors.cache.persistence.filename.SnapshotFileTree;
import
org.apache.ignite.internal.processors.cache.persistence.filename.SnapshotFileTree.IncrementalSnapshotFileTree;
import
org.apache.ignite.internal.processors.cache.persistence.wal.FileDescriptor;
import
org.apache.ignite.internal.processors.cache.persistence.wal.reader.IgniteWalIteratorFactory;
-import org.apache.ignite.internal.processors.task.GridInternal;
import org.apache.ignite.internal.util.typedef.F;
-import org.apache.ignite.internal.util.typedef.internal.U;
-import org.apache.ignite.resources.IgniteInstanceResource;
-import org.apache.ignite.resources.LoggerResource;
-import org.jetbrains.annotations.NotNull;
import static
org.apache.ignite.internal.processors.cache.persistence.wal.FileWriteAheadLogManager.WAL_SEGMENT_FILE_COMPACTED_FILTER;
/** Snapshot task to verify snapshot metadata on the baseline nodes for given
snapshot name. */
-@GridInternal
-public class SnapshotMetadataVerificationTask
- extends ComputeTaskAdapter<SnapshotMetadataVerificationTaskArg,
SnapshotMetadataVerificationTaskResult> {
- /** Serial version uid. */
- private static final long serialVersionUID = 0L;
-
+public class SnapshotMetadataVerificationTask {
/** */
- private SnapshotMetadataVerificationTaskArg arg;
+ private final MetadataVerificationJob job;
/** */
- @IgniteInstanceResource
- private transient IgniteEx ignite;
-
- /** {@inheritDoc} */
- @Override public @NotNull Map<? extends ComputeJob, ClusterNode> map(
- List<ClusterNode> subgrid,
- SnapshotMetadataVerificationTaskArg arg
- ) throws IgniteException {
- this.arg = arg;
-
- Map<ComputeJob, ClusterNode> map = U.newHashMap(subgrid.size());
-
- for (ClusterNode node : subgrid)
- map.put(new MetadataVerificationJob(arg), node);
+ public SnapshotMetadataVerificationTask(
+ IgniteEx ignite,
+ IgniteLogger log,
+ SnapshotFileTree sft,
+ int incrementIndex,
+ Collection<Integer> grpIds
+ ) {
+ job = new MetadataVerificationJob(ignite, log, sft, incrementIndex,
grpIds);
+ }
- return map;
+ /** */
+ public List<SnapshotMetadata> execute() {
+ return job.execute();
}
/** Job that verifies snapshot on an Ignite node. */
- private static class MetadataVerificationJob extends ComputeJobAdapter {
+ private static class MetadataVerificationJob {
+ /** */
+ private final IgniteEx ignite;
+
/** */
- private static final long serialVersionUID = 0L;
+ private final IgniteLogger log;
/** */
- @IgniteInstanceResource
- private transient IgniteEx ignite;
+ private final SnapshotFileTree sft;
/** */
- @LoggerResource
- private transient IgniteLogger log;
+ private final int incrementIndex;
/** */
- private final SnapshotMetadataVerificationTaskArg arg;
+ private final Collection<Integer> grpIds;
/** */
- public MetadataVerificationJob(SnapshotMetadataVerificationTaskArg
arg) {
- this.arg = arg;
+ public MetadataVerificationJob(
+ IgniteEx ignite,
+ IgniteLogger log,
+ SnapshotFileTree sft,
+ int incrementIndex,
Review Comment:
incrementIndex -> incrementIdx
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]