This is an automated email from the ASF dual-hosted git repository.

av pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new baf2c91  IGNITE-15328 Consistency recovery command (Read Repair via 
control.sh) should support cancellation (#9409)
baf2c91 is described below

commit baf2c9143d3b0e40e2440baf6b4f99f4bd3041d0
Author: Anton Vinogradov <a...@apache.org>
AuthorDate: Mon Sep 20 12:09:36 2021 +0300

    IGNITE-15328 Consistency recovery command (Read Repair via control.sh) 
should support cancellation (#9409)
---
 docs/_docs/sql-reference/operational-commands.adoc |  24 ++++-
 .../consistency/ConsistencyCommand.java            |  34 ++++++-
 .../internal/commandline/query/KillCommand.java    |  30 +++++-
 .../internal/commandline/query/KillSubcommand.java |   3 +
 .../util/GridCommandHandlerAbstractTest.java       |   3 +
 .../util/GridCommandHandlerConsistencyTest.java    |  20 +++-
 .../ignite/util/KillCommandsCommandShTest.java     | 107 +++++++++++++++++++++
 .../internal/processors/job/GridJobWorker.java     |   6 +-
 .../consistency/VisorConsistencyCancelTask.java    |  81 ++++++++++++++++
 .../consistency/VisorConsistencyRepairTask.java    |  57 ++++++++---
 .../VisorConsistencyRepairTaskResult.java          | 102 ++++++++++++++++++++
 .../main/resources/META-INF/classnames.properties  |   2 +
 12 files changed, 447 insertions(+), 22 deletions(-)

diff --git a/docs/_docs/sql-reference/operational-commands.adoc 
b/docs/_docs/sql-reference/operational-commands.adoc
index be7223f..d1f4641 100644
--- a/docs/_docs/sql-reference/operational-commands.adoc
+++ b/docs/_docs/sql-reference/operational-commands.adoc
@@ -334,7 +334,7 @@ KILL CONTINUOUS 
'6fa749ee-7cf8-4635-be10-36a1c75267a7_54321' '6fa749ee-7cf8-4635
 
 == KILL SERVICE
 
-The `KILL SERVICE` command allows you to cance a running service.
+The `KILL SERVICE` command allows you to cancel a running service.
 
 [tabs]
 --
@@ -370,3 +370,25 @@ control.bat --kill SERVICE name
 
 * `name` - corresponds to the name you selected for the service upon the 
deployment time.
 You can always find it with the 
link:monitoring-metrics/system-views#services[SERVICES] view.
+
+
+== KILL CONSISTENCY repair/check operations
+
+The `KILL CONSISTENCY` command allows you to cancel all running consistency 
repair/check operations.
+
+[tabs]
+--
+
+tab:Unix[]
+[source,bash]
+----
+./control.sh --kill CONSISTENCY
+----
+
+tab:Windows[]
+[source,bash]
+----
+control.bat --kill CONSISTENCY
+----
+
+--
diff --git 
a/modules/control-utility/src/main/java/org/apache/ignite/internal/commandline/consistency/ConsistencyCommand.java
 
b/modules/control-utility/src/main/java/org/apache/ignite/internal/commandline/consistency/ConsistencyCommand.java
index a1f56c4..10ef23a 100644
--- 
a/modules/control-utility/src/main/java/org/apache/ignite/internal/commandline/consistency/ConsistencyCommand.java
+++ 
b/modules/control-utility/src/main/java/org/apache/ignite/internal/commandline/consistency/ConsistencyCommand.java
@@ -20,6 +20,7 @@ package org.apache.ignite.internal.commandline.consistency;
 import java.util.LinkedHashMap;
 import java.util.Map;
 import java.util.logging.Logger;
+import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.internal.client.GridClient;
 import org.apache.ignite.internal.client.GridClientConfiguration;
 import org.apache.ignite.internal.commandline.AbstractCommand;
@@ -27,6 +28,7 @@ import org.apache.ignite.internal.commandline.Command;
 import org.apache.ignite.internal.commandline.CommandArgIterator;
 import org.apache.ignite.internal.commandline.CommandLogger;
 import 
org.apache.ignite.internal.visor.consistency.VisorConsistencyRepairTaskArg;
+import 
org.apache.ignite.internal.visor.consistency.VisorConsistencyRepairTaskResult;
 
 import static org.apache.ignite.internal.commandline.CommandList.CONSISTENCY;
 import static 
org.apache.ignite.internal.commandline.TaskExecutor.BROADCAST_UUID;
@@ -46,8 +48,12 @@ public class ConsistencyCommand extends 
AbstractCommand<VisorConsistencyRepairTa
 
     /** {@inheritDoc} */
     @Override public Object execute(GridClientConfiguration clientCfg, Logger 
log) throws Exception {
+        boolean failed = false;
+
+        StringBuilder sb = new StringBuilder();
+
         try (GridClient client = Command.startClient(clientCfg)) {
-            Object res = executeTaskByNameOnNode(
+            VisorConsistencyRepairTaskResult res = executeTaskByNameOnNode(
                 client,
                 cmd.taskName(),
                 arg(),
@@ -55,9 +61,22 @@ public class ConsistencyCommand extends 
AbstractCommand<VisorConsistencyRepairTa
                 clientCfg
             );
 
-            log.info(String.valueOf(res));
+            if (res.cancelled()) {
+                sb.append("Operation execution cancelled.\n\n");
+
+                failed = true;
+            }
+
+            if (res.failed()) {
+                sb.append("Operation execution failed.\n\n");
+
+                failed = true;
+            }
 
-            return res;
+            if (failed)
+                sb.append("[EXECUTION FAILED OR CANCELLED, RESULTS MAY BE 
INCOMPLETE OR INCONSISTENT]\n\n");
+
+            sb.append(res.message());
         }
         catch (Throwable e) {
             log.severe("Failed to perform operation.");
@@ -65,6 +84,15 @@ public class ConsistencyCommand extends 
AbstractCommand<VisorConsistencyRepairTa
 
             throw e;
         }
+
+        String output = sb.toString();
+
+        if (failed)
+            throw new IgniteCheckedException(output);
+        else
+            log.info(output);
+
+        return output;
     }
 
     /** {@inheritDoc} */
diff --git 
a/modules/control-utility/src/main/java/org/apache/ignite/internal/commandline/query/KillCommand.java
 
b/modules/control-utility/src/main/java/org/apache/ignite/internal/commandline/query/KillCommand.java
index 7682113..12926aa 100644
--- 
a/modules/control-utility/src/main/java/org/apache/ignite/internal/commandline/query/KillCommand.java
+++ 
b/modules/control-utility/src/main/java/org/apache/ignite/internal/commandline/query/KillCommand.java
@@ -30,6 +30,7 @@ import org.apache.ignite.internal.commandline.CommandLogger;
 import org.apache.ignite.internal.util.typedef.T2;
 import org.apache.ignite.internal.visor.compute.VisorComputeCancelSessionTask;
 import 
org.apache.ignite.internal.visor.compute.VisorComputeCancelSessionTaskArg;
+import org.apache.ignite.internal.visor.consistency.VisorConsistencyCancelTask;
 import org.apache.ignite.internal.visor.query.VisorContinuousQueryCancelTask;
 import 
org.apache.ignite.internal.visor.query.VisorContinuousQueryCancelTaskArg;
 import org.apache.ignite.internal.visor.query.VisorQueryCancelOnInitiatorTask;
@@ -51,6 +52,7 @@ import org.apache.ignite.mxbean.TransactionsMXBean;
 import static java.util.Collections.singletonMap;
 import static 
org.apache.ignite.internal.QueryMXBeanImpl.EXPECTED_GLOBAL_QRY_ID_FORMAT;
 import static org.apache.ignite.internal.commandline.CommandList.KILL;
+import static 
org.apache.ignite.internal.commandline.TaskExecutor.BROADCAST_UUID;
 import static 
org.apache.ignite.internal.commandline.TaskExecutor.executeTaskByNameOnNode;
 import static 
org.apache.ignite.internal.commandline.query.KillSubcommand.COMPUTE;
 import static 
org.apache.ignite.internal.commandline.query.KillSubcommand.CONTINUOUS;
@@ -77,6 +79,9 @@ public class KillCommand extends AbstractCommand<Object> {
     /** Task name. */
     private String taskName;
 
+    /** Node id. */
+    private UUID nodeId;
+
     /** {@inheritDoc} */
     @Override public Object execute(GridClientConfiguration clientCfg, Logger 
log) throws Exception {
         try (GridClient client = Command.startClient(clientCfg)) {
@@ -84,7 +89,7 @@ public class KillCommand extends AbstractCommand<Object> {
                 client,
                 taskName,
                 taskArgs,
-                null,
+                nodeId,
                 clientCfg
             );
         }
@@ -119,6 +124,8 @@ public class KillCommand extends AbstractCommand<Object> {
 
                 taskName = VisorComputeCancelSessionTask.class.getName();
 
+                nodeId = null;
+
                 break;
 
             case SERVICE:
@@ -126,6 +133,8 @@ public class KillCommand extends AbstractCommand<Object> {
 
                 taskName = VisorCancelServiceTask.class.getName();
 
+                nodeId = null;
+
                 break;
 
             case TRANSACTION:
@@ -136,6 +145,8 @@ public class KillCommand extends AbstractCommand<Object> {
 
                 taskName = VisorTxTask.class.getName();
 
+                nodeId = null;
+
                 break;
 
             case SQL:
@@ -148,6 +159,8 @@ public class KillCommand extends AbstractCommand<Object> {
 
                 taskName = VisorQueryCancelOnInitiatorTask.class.getName();
 
+                nodeId = null;
+
                 break;
 
             case SCAN:
@@ -163,6 +176,8 @@ public class KillCommand extends AbstractCommand<Object> {
 
                 taskName = VisorScanQueryCancelTask.class.getName();
 
+                nodeId = null;
+
                 break;
 
             case CONTINUOUS:
@@ -172,6 +187,8 @@ public class KillCommand extends AbstractCommand<Object> {
 
                 taskName = VisorContinuousQueryCancelTask.class.getName();
 
+                nodeId = null;
+
                 break;
 
             case SNAPSHOT:
@@ -179,6 +196,17 @@ public class KillCommand extends AbstractCommand<Object> {
 
                 taskName = VisorSnapshotCancelTask.class.getName();
 
+                nodeId = null;
+
+                break;
+
+            case CONSISTENCY:
+                taskName = VisorConsistencyCancelTask.class.getName();
+
+                taskArgs = null;
+
+                nodeId = BROADCAST_UUID;
+
                 break;
 
             default:
diff --git 
a/modules/control-utility/src/main/java/org/apache/ignite/internal/commandline/query/KillSubcommand.java
 
b/modules/control-utility/src/main/java/org/apache/ignite/internal/commandline/query/KillSubcommand.java
index 311ab96..2c70166 100644
--- 
a/modules/control-utility/src/main/java/org/apache/ignite/internal/commandline/query/KillSubcommand.java
+++ 
b/modules/control-utility/src/main/java/org/apache/ignite/internal/commandline/query/KillSubcommand.java
@@ -54,4 +54,7 @@ public enum KillSubcommand {
 
     /** Kill snapshot operation. */
     SNAPSHOT,
+
+    /** Kill consistency tasks. */
+    CONSISTENCY,
 }
diff --git 
a/modules/control-utility/src/test/java/org/apache/ignite/util/GridCommandHandlerAbstractTest.java
 
b/modules/control-utility/src/test/java/org/apache/ignite/util/GridCommandHandlerAbstractTest.java
index ce84c9a..4358765 100644
--- 
a/modules/control-utility/src/test/java/org/apache/ignite/util/GridCommandHandlerAbstractTest.java
+++ 
b/modules/control-utility/src/test/java/org/apache/ignite/util/GridCommandHandlerAbstractTest.java
@@ -73,6 +73,7 @@ import static 
org.apache.ignite.IgniteSystemProperties.IGNITE_ENABLE_EXPERIMENTA
 import static 
org.apache.ignite.configuration.DataStorageConfiguration.DFLT_CHECKPOINT_FREQ;
 import static 
org.apache.ignite.configuration.EncryptionConfiguration.DFLT_REENCRYPTION_BATCH_SIZE;
 import static 
org.apache.ignite.configuration.EncryptionConfiguration.DFLT_REENCRYPTION_RATE_MBPS;
+import static org.apache.ignite.events.EventType.EVT_CONSISTENCY_VIOLATION;
 import static 
org.apache.ignite.internal.encryption.AbstractEncryptionTest.KEYSTORE_PASSWORD;
 import static 
org.apache.ignite.internal.encryption.AbstractEncryptionTest.KEYSTORE_PATH;
 import static 
org.apache.ignite.internal.processors.cache.verify.VerifyBackupPartitionsDumpTask.IDLE_DUMP_FILE_PREFIX;
@@ -250,6 +251,8 @@ public abstract class GridCommandHandlerAbstractTest 
extends GridCommonAbstractT
 
         cfg.setDaemon(igniteInstanceName.startsWith(DAEMON_NODE_NAME_PREFIX));
 
+        cfg.setIncludeEventTypes(EVT_CONSISTENCY_VIOLATION); // Extend if 
necessary.
+
         if (encryptionEnabled) {
             KeystoreEncryptionSpi encSpi = new KeystoreEncryptionSpi();
 
diff --git 
a/modules/control-utility/src/test/java/org/apache/ignite/util/GridCommandHandlerConsistencyTest.java
 
b/modules/control-utility/src/test/java/org/apache/ignite/util/GridCommandHandlerConsistencyTest.java
index 6643d9d..6002439 100644
--- 
a/modules/control-utility/src/test/java/org/apache/ignite/util/GridCommandHandlerConsistencyTest.java
+++ 
b/modules/control-utility/src/test/java/org/apache/ignite/util/GridCommandHandlerConsistencyTest.java
@@ -42,6 +42,7 @@ import static 
org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
 import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
 import static org.apache.ignite.events.EventType.EVT_CONSISTENCY_VIOLATION;
 import static 
org.apache.ignite.internal.commandline.CommandHandler.EXIT_CODE_OK;
+import static 
org.apache.ignite.internal.commandline.CommandHandler.EXIT_CODE_UNEXPECTED_ERROR;
 import static 
org.apache.ignite.internal.visor.consistency.VisorConsistencyRepairTask.CONSISTENCY_VIOLATIONS_FOUND;
 import static org.apache.ignite.testframework.GridTestUtils.assertContains;
 
@@ -171,11 +172,26 @@ public class GridCommandHandlerConsistencyTest extends 
GridCommandHandlerCluster
     /**
      *
      */
+    @Test
+    public void testRepairNonExistentCache() throws Exception {
+        startGrids(3);
+
+        injectTestSystemOut();
+
+        for (int i = 0; i < PARTITIONS; i++) {
+            assertEquals(EXIT_CODE_UNEXPECTED_ERROR, execute("--consistency", 
"repair", "non-existent", String.valueOf(i)));
+            assertContains(log, testOut.toString(), "Cache not found");
+        }
+    }
+
+    /**
+     *
+     */
     private void readRepairTx(AtomicInteger brokenParts, String cacheName) {
         for (int i = 0; i < PARTITIONS; i++) {
             assertEquals(EXIT_CODE_OK, execute("--consistency", "repair", 
cacheName, String.valueOf(i)));
             assertContains(log, testOut.toString(), 
CONSISTENCY_VIOLATIONS_FOUND);
-            assertContains(log, testOut.toString(), "[found=1, fixed=1]");
+            assertContains(log, testOut.toString(), "[found=1, fixed=1");
 
             assertEquals(EXIT_CODE_OK, execute("--cache", "idle_verify"));
 
@@ -196,7 +212,7 @@ public class GridCommandHandlerConsistencyTest extends 
GridCommandHandlerCluster
         for (int i = 0; i < PARTITIONS; i++) { // This may be a copy of 
previous (tx case), implement atomic repair to make this happen :)
             assertEquals(EXIT_CODE_OK, execute("--consistency", "repair", 
cacheName, String.valueOf(i)));
             assertContains(log, testOut.toString(), 
CONSISTENCY_VIOLATIONS_FOUND);
-            assertContains(log, testOut.toString(), "[found=1, fixed=0]"); // 
Nothing fixed.
+            assertContains(log, testOut.toString(), "[found=1, fixed=0"); // 
Nothing fixed.
 
             assertEquals(EXIT_CODE_OK, execute("--cache", "idle_verify"));
             assertContains(log, testOut.toString(),
diff --git 
a/modules/control-utility/src/test/java/org/apache/ignite/util/KillCommandsCommandShTest.java
 
b/modules/control-utility/src/test/java/org/apache/ignite/util/KillCommandsCommandShTest.java
index 7f6f8d9..6501359 100644
--- 
a/modules/control-utility/src/test/java/org/apache/ignite/util/KillCommandsCommandShTest.java
+++ 
b/modules/control-utility/src/test/java/org/apache/ignite/util/KillCommandsCommandShTest.java
@@ -20,15 +20,29 @@ package org.apache.ignite.util;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.UUID;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.ignite.IgniteCache;
 import org.apache.ignite.cache.CacheAtomicityMode;
 import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.IgniteInterruptedCheckedException;
+import org.apache.ignite.internal.TestRecordingCommunicationSpi;
+import 
org.apache.ignite.internal.processors.cache.distributed.near.GridNearGetRequest;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.visor.consistency.VisorConsistencyRepairTask;
+import org.apache.ignite.lang.IgnitePredicate;
 import org.apache.ignite.lang.IgniteUuid;
+import org.apache.ignite.spi.systemview.view.ComputeJobView;
+import org.apache.ignite.spi.systemview.view.SystemView;
+import org.apache.ignite.testframework.GridTestUtils;
 import org.junit.Test;
 
 import static 
org.apache.ignite.internal.commandline.CommandHandler.EXIT_CODE_OK;
+import static 
org.apache.ignite.internal.commandline.CommandHandler.EXIT_CODE_UNEXPECTED_ERROR;
 import static 
org.apache.ignite.internal.processors.cache.persistence.snapshot.AbstractSnapshotSelfTest.doSnapshotCancellationTest;
+import static 
org.apache.ignite.internal.processors.job.GridJobProcessor.JOBS_VIEW;
+import static org.apache.ignite.testframework.GridTestUtils.assertContains;
 import static org.apache.ignite.util.KillCommandsTests.PAGES_CNT;
 import static org.apache.ignite.util.KillCommandsTests.PAGE_SZ;
 import static org.apache.ignite.util.KillCommandsTests.doTestCancelComputeTask;
@@ -195,4 +209,97 @@ public class KillCommandsCommandShTest extends 
GridCommandHandlerClusterByClassA
 
         assertEquals(EXIT_CODE_OK, res);
     }
+
+    /** */
+    @Test
+    public void testCancelConsistencyMissedTask() {
+        int res = execute("--kill", "consistency");
+
+        assertEquals(EXIT_CODE_OK, res);
+    }
+
+    /** */
+    @Test
+    public void testCancelConsistencyTask() throws InterruptedException {
+        String consistencyCancheName = "consistencyCache";
+
+        CacheConfiguration<Integer, Integer> cfg = new CacheConfiguration<>();
+
+        cfg.setName(consistencyCancheName);
+        cfg.setBackups(SERVER_NODE_CNT - 1);
+
+        IgniteCache<Integer, Integer> cache = client.getOrCreateCache(cfg);
+
+        for (int i = 0; i < 10_000; i++)
+            cache.put(i, i);
+
+        AtomicInteger getCnt = new AtomicInteger();
+
+        CountDownLatch thLatch = new CountDownLatch(1);
+
+        Thread th = new Thread(() -> {
+            IgnitePredicate<ComputeJobView> repairJobFilter =
+                job -> 
job.taskClassName().equals(VisorConsistencyRepairTask.class.getName());
+
+            for (IgniteEx node : srvs) {
+                SystemView<ComputeJobView> jobs = 
node.context().systemView().view(JOBS_VIEW);
+
+                assertTrue(F.iterator0(jobs, true, 
repairJobFilter).hasNext()); // Found.
+            }
+
+            int res = execute("--kill", "consistency");
+
+            assertEquals(EXIT_CODE_OK, res);
+
+            try {
+                assertTrue(GridTestUtils.waitForCondition(() -> {
+                    for (IgniteEx node : srvs) {
+                        SystemView<ComputeJobView> jobs = 
node.context().systemView().view(JOBS_VIEW);
+
+                        if (F.iterator0(jobs, true, 
repairJobFilter).hasNext()) // Found.
+                            return false;
+                    }
+
+                    return true;
+                }, 5000L)); // Missed.
+            }
+            catch (IgniteInterruptedCheckedException e) {
+                fail();
+            }
+
+            thLatch.countDown();
+        });
+
+        for (IgniteEx server : srvs) {
+            TestRecordingCommunicationSpi spi =
+                
((TestRecordingCommunicationSpi)server.configuration().getCommunicationSpi());
+
+            spi.blockMessages((node, message) -> {
+                if (message instanceof GridNearGetRequest) { // Get request 
caused by read repair operation.
+                    if (getCnt.incrementAndGet() == SERVER_NODE_CNT) // Each 
node should send a get request.
+                        th.start();
+
+                    return true; // Blocking to freeze '--consistency repair' 
operation.
+                }
+
+                return false;
+            });
+        }
+
+        injectTestSystemOut();
+
+        assertEquals(EXIT_CODE_UNEXPECTED_ERROR, execute("--consistency", 
"repair", consistencyCancheName, "0"));
+
+        assertContains(log, testOut.toString(), "Operation execution 
cancelled.");
+        assertContains(log, testOut.toString(), "violations were NOT found 
[processed=0]");
+
+        thLatch.await();
+
+        for (IgniteEx server : srvs) { // Restoring messaging for other tests.
+            TestRecordingCommunicationSpi spi =
+                
((TestRecordingCommunicationSpi)server.configuration().getCommunicationSpi());
+
+            spi.stopBlock();
+        }
+    }
 }
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobWorker.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobWorker.java
index f78be69..fa8d670 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobWorker.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobWorker.java
@@ -744,8 +744,6 @@ public class GridJobWorker extends GridWorker implements 
GridTimeoutObject {
      */
     public void cancel(boolean sys) {
         try {
-            super.cancel();
-
             final ComputeJob job0 = job;
 
             if (sys)
@@ -764,6 +762,10 @@ public class GridJobWorker extends GridWorker implements 
GridTimeoutObject {
                 });
             }
 
+            // Interrupting only when all 'cancelled' flags are set.
+            // This allows the 'job' to determine it's a cancellation.
+            super.cancel();
+
             if (!internal && ctx.event().isRecordable(EVT_JOB_CANCELLED))
                 recordEvent(EVT_JOB_CANCELLED, "Job was cancelled: " + job0);
         }
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/visor/consistency/VisorConsistencyCancelTask.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/visor/consistency/VisorConsistencyCancelTask.java
new file mode 100644
index 0000000..1287f8a
--- /dev/null
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/visor/consistency/VisorConsistencyCancelTask.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.visor.consistency;
+
+import java.util.List;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.compute.ComputeJobResult;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.visor.VisorJob;
+import org.apache.ignite.internal.visor.VisorMultiNodeTask;
+import org.apache.ignite.resources.IgniteInstanceResource;
+import org.apache.ignite.spi.systemview.view.ComputeJobView;
+
+import static 
org.apache.ignite.internal.processors.job.GridJobProcessor.JOBS_VIEW;
+
+/**
+ * Cancels given consistency repairs on all cluster nodes.
+ */
+public class VisorConsistencyCancelTask extends VisorMultiNodeTask<Void, Void, 
Void> {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /** {@inheritDoc} */
+    @Override protected VisorConsistencyCancelJob job(Void arg) {
+        return new VisorConsistencyCancelJob(arg, debug);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected Void reduce0(List<ComputeJobResult> results) {
+        // No-op, just awaiting all jobs done.
+        return null;
+    }
+
+    /**
+     * Job that cancels the tasks.
+     */
+    private static class VisorConsistencyCancelJob extends VisorJob<Void, 
Void> {
+        /** */
+        private static final long serialVersionUID = 0L;
+
+        /**
+         * Auto-injected grid instance.
+         */
+        @IgniteInstanceResource
+        private transient IgniteEx ignite;
+
+        /**
+         * Default constructor.
+         */
+        protected VisorConsistencyCancelJob(Void arg, boolean debug) {
+            super(arg, debug);
+        }
+
+        /** {@inheritDoc} */
+        @Override protected Void run(Void arg) throws IgniteException {
+            F.iterator(ignite.context().systemView().view(JOBS_VIEW),
+                ComputeJobView::sessionId,
+                true,
+                job -> 
job.taskClassName().equals(VisorConsistencyRepairTask.class.getName())
+            ).forEach(sesId -> ignite.context().job().cancelJob(sesId, null, 
false));
+
+            return null;
+        }
+    }
+}
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/visor/consistency/VisorConsistencyRepairTask.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/visor/consistency/VisorConsistencyRepairTask.java
index d8070f5..a43a6b0 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/visor/consistency/VisorConsistencyRepairTask.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/visor/consistency/VisorConsistencyRepairTask.java
@@ -31,11 +31,13 @@ import org.apache.ignite.compute.ComputeJobResult;
 import org.apache.ignite.events.CacheConsistencyViolationEvent;
 import org.apache.ignite.internal.processors.cache.CacheGroupContext;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import org.apache.ignite.internal.processors.cache.IgniteInternalCache;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtLocalPartition;
 import 
org.apache.ignite.internal.processors.cache.distributed.near.consistency.IgniteConsistencyViolationException;
 import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
 import org.apache.ignite.internal.util.GridConcurrentHashSet;
 import org.apache.ignite.internal.util.lang.GridCursor;
+import org.apache.ignite.internal.util.typedef.X;
 import org.apache.ignite.internal.visor.VisorJob;
 import org.apache.ignite.internal.visor.VisorMultiNodeTask;
 import org.apache.ignite.lang.IgnitePredicate;
@@ -46,12 +48,13 @@ import static 
org.apache.ignite.events.EventType.EVT_CONSISTENCY_VIOLATION;
 /**
  *
  */
-public class VisorConsistencyRepairTask extends 
VisorMultiNodeTask<VisorConsistencyRepairTaskArg, String, String> {
+public class VisorConsistencyRepairTask extends
+    VisorMultiNodeTask<VisorConsistencyRepairTaskArg, 
VisorConsistencyRepairTaskResult, String> {
     /** Serial version uid. */
     private static final long serialVersionUID = 0L;
 
     /** Nothing found. */
-    private static final String NOTHING_FOUND = "Consistency violations were 
not found.";
+    private static final String NOTHING_FOUND = "Consistency violations were 
NOT found";
 
     /** Found. */
     public static final String CONSISTENCY_VIOLATIONS_FOUND = "Consistency 
violations were FOUND";
@@ -65,18 +68,34 @@ public class VisorConsistencyRepairTask extends 
VisorMultiNodeTask<VisorConsiste
     }
 
     /** {@inheritDoc} */
-    @Override protected String reduce0(List<ComputeJobResult> results) throws 
IgniteException {
+    @Override protected VisorConsistencyRepairTaskResult 
reduce0(List<ComputeJobResult> results) throws IgniteException {
+        VisorConsistencyRepairTaskResult taskRes = new 
VisorConsistencyRepairTaskResult();
         StringBuilder sb = new StringBuilder();
 
         for (ComputeJobResult res : results) {
+            if (res.isCancelled())
+                taskRes.cancelled(true);
+
+            Exception e = res.getException();
+
+            if (e != null) {
+                taskRes.failed(true);
+
+                sb.append("Node: ").append(res.getNode()).append("\n")
+                    .append("  Exception: ").append(e).append("\n")
+                    .append(X.getFullStackTrace(e)).append("\n");
+            }
+
             String data = res.getData();
 
             if (data != null)
                 sb.append("Node: ").append(res.getNode()).append("\n")
-                    .append("  Result: ").append(data).append("\n");
+                    .append("  Result: ").append(data).append("\n\n");
         }
 
-        return sb.toString();
+        taskRes.message(sb.toString());
+
+        return taskRes;
     }
 
     /**
@@ -107,7 +126,15 @@ public class VisorConsistencyRepairTask extends 
VisorMultiNodeTask<VisorConsiste
             int p = arg.part();
             int batchSize = 1024;
 
-            GridCacheContext<Object, Object> cctx = 
ignite.context().cache().cache(cacheName).context();
+            IgniteInternalCache<Object, Object> internalCache = 
ignite.context().cache().cache(cacheName);
+
+            if (internalCache == null)
+                if (ignite.context().cache().cacheDescriptor(cacheName) != 
null)
+                    return null; // Node filtered by node filter.
+                else
+                    throw new IgniteException("Cache not found [name=" + 
cacheName + "]");
+
+            GridCacheContext<Object, Object> cctx = internalCache.context();
 
             if (!cctx.gridEvents().isRecordable(EVT_CONSISTENCY_VIOLATION))
                 throw new UnsupportedOperationException("Consistency violation 
events recording is disabled on cluster.");
@@ -119,6 +146,8 @@ public class VisorConsistencyRepairTask extends 
VisorMultiNodeTask<VisorConsiste
             if (part == null)
                 return null; // Partition does not belong to the node.
 
+            long cnt = 0;
+
             part.reserve();
 
             try {
@@ -144,13 +173,15 @@ public class VisorConsistencyRepairTask extends 
VisorMultiNodeTask<VisorConsiste
 
                         try {
                             cache.getAll(keys); // Repair.
+
+                            cnt += keys.size();
                         }
                         catch (CacheException e) {
-                            if (!(e.getCause() instanceof 
IgniteConsistencyViolationException))
+                            if (!(e.getCause() instanceof 
IgniteConsistencyViolationException) && !isCancelled())
                                 throw new IgniteException("Read repair attempt 
failed.", e);
                         }
                     }
-                    while (!keys.isEmpty());
+                    while (!keys.isEmpty() && !isCancelled());
                 }
                 finally {
                     ignite.events().stopLocalListen(lsnr);
@@ -164,15 +195,15 @@ public class VisorConsistencyRepairTask extends 
VisorMultiNodeTask<VisorConsiste
             }
 
             if (!evts.isEmpty())
-                return processEvents(cctx, p);
+                return processEvents(cctx, p, cnt);
             else
-                return NOTHING_FOUND;
+                return NOTHING_FOUND + " [processed=" + cnt + "]";
         }
 
         /**
          *
          */
-        private String processEvents(GridCacheContext<Object, Object> cctx, 
int part) {
+        private String processEvents(GridCacheContext<Object, Object> cctx, 
int part, long cnt) {
             int found = 0;
             int fixed = 0;
 
@@ -211,10 +242,10 @@ public class VisorConsistencyRepairTask extends 
VisorMultiNodeTask<VisorConsiste
             if (!res.isEmpty()) {
                 log.warning(CONSISTENCY_VIOLATIONS_RECORDED + "\n" + res);
 
-                return CONSISTENCY_VIOLATIONS_FOUND + " [found=" + found + ", 
fixed=" + fixed + "]";
+                return CONSISTENCY_VIOLATIONS_FOUND + " [found=" + found + ", 
fixed=" + fixed + ", processed=" + cnt + "]";
             }
             else
-                return NOTHING_FOUND;
+                return NOTHING_FOUND + " [processed=" + cnt + "]";
         }
 
         /**
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/visor/consistency/VisorConsistencyRepairTaskResult.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/visor/consistency/VisorConsistencyRepairTaskResult.java
new file mode 100644
index 0000000..51e6d64
--- /dev/null
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/visor/consistency/VisorConsistencyRepairTaskResult.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.visor.consistency;
+
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import org.apache.ignite.internal.dto.IgniteDataTransferObject;
+import org.apache.ignite.internal.util.typedef.internal.U;
+
+/**
+ * Data transfer object that carries a result message together with failed and cancelled flags.
+ */
+public class VisorConsistencyRepairTaskResult extends IgniteDataTransferObject 
{
+    /** Serial version UID. */
+    private static final long serialVersionUID = 0L;
+
+    /** Result message. */
+    private String msg;
+
+    /** Failed flag. */
+    private boolean failed;
+
+    /** Cancelled flag. */
+    private boolean cancelled;
+
+    /**
+     * {@inheritDoc} Writes the message, then the failed flag, then the cancelled flag.
+     */
+    @Override protected void writeExternalData(ObjectOutput out) throws 
IOException {
+        U.writeString(out, msg);
+        out.writeBoolean(failed);
+        out.writeBoolean(cancelled);
+    }
+
+    /**
+     * {@inheritDoc} Reads the fields in the same order they are written by writeExternalData.
+     */
+    @Override protected void readExternalData(byte protoVer,
+        ObjectInput in) throws IOException, ClassNotFoundException {
+        msg = U.readString(in);
+        failed = in.readBoolean();
+        cancelled = in.readBoolean();
+    }
+
+    /**
+     * @return Result message.
+     */
+    public String message() {
+        return msg;
+    }
+
+    /**
+     * @param res New result message.
+     */
+    public void message(String res) {
+        this.msg = res;
+    }
+
+    /**
+     * @return Failed flag.
+     */
+    public boolean failed() {
+        return failed;
+    }
+
+    /**
+     * @param failed New failed flag value.
+     */
+    public void failed(boolean failed) {
+        this.failed = failed;
+    }
+
+    /**
+     * @return Cancelled flag.
+     */
+    public boolean cancelled() {
+        return cancelled;
+    }
+
+    /**
+     * @param cancelled New cancelled flag value.
+     */
+    public void cancelled(boolean cancelled) {
+        this.cancelled = cancelled;
+    }
+}
diff --git a/modules/core/src/main/resources/META-INF/classnames.properties 
b/modules/core/src/main/resources/META-INF/classnames.properties
index b787705..419c039 100644
--- a/modules/core/src/main/resources/META-INF/classnames.properties
+++ b/modules/core/src/main/resources/META-INF/classnames.properties
@@ -2010,6 +2010,8 @@ 
org.apache.ignite.internal.visor.baseline.VisorBaselineTaskArg
 org.apache.ignite.internal.visor.baseline.VisorBaselineTaskResult
 org.apache.ignite.internal.visor.baseline.VisorBaselineViewTask
 
org.apache.ignite.internal.visor.baseline.VisorBaselineViewTask$VisorBaselineViewJob
+org.apache.ignite.internal.visor.consistency.VisorConsistencyRepairTaskArg
+org.apache.ignite.internal.visor.consistency.VisorConsistencyRepairTaskResult
 org.apache.ignite.internal.visor.misc.VisorIdAndTagViewTaskResult
 org.apache.ignite.internal.visor.misc.VisorClusterChangeTagTaskResult
 org.apache.ignite.internal.visor.shutdown.VisorShutdownPolicyTask

Reply via email to