This is an automated email from the ASF dual-hosted git repository. av pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push: new baf2c91 IGNITE-15328 Consistency recovery command (Read Repair via control.ch) should support cancellation (#9409) baf2c91 is described below commit baf2c9143d3b0e40e2440baf6b4f99f4bd3041d0 Author: Anton Vinogradov <a...@apache.org> AuthorDate: Mon Sep 20 12:09:36 2021 +0300 IGNITE-15328 Consistency recovery command (Read Repair via control.ch) should support cancellation (#9409) --- docs/_docs/sql-reference/operational-commands.adoc | 24 ++++- .../consistency/ConsistencyCommand.java | 34 ++++++- .../internal/commandline/query/KillCommand.java | 30 +++++- .../internal/commandline/query/KillSubcommand.java | 3 + .../util/GridCommandHandlerAbstractTest.java | 3 + .../util/GridCommandHandlerConsistencyTest.java | 20 +++- .../ignite/util/KillCommandsCommandShTest.java | 107 +++++++++++++++++++++ .../internal/processors/job/GridJobWorker.java | 6 +- .../consistency/VisorConsistencyCancelTask.java | 81 ++++++++++++++++ .../consistency/VisorConsistencyRepairTask.java | 57 ++++++++--- .../VisorConsistencyRepairTaskResult.java | 102 ++++++++++++++++++++ .../main/resources/META-INF/classnames.properties | 2 + 12 files changed, 447 insertions(+), 22 deletions(-) diff --git a/docs/_docs/sql-reference/operational-commands.adoc b/docs/_docs/sql-reference/operational-commands.adoc index be7223f..d1f4641 100644 --- a/docs/_docs/sql-reference/operational-commands.adoc +++ b/docs/_docs/sql-reference/operational-commands.adoc @@ -334,7 +334,7 @@ KILL CONTINUOUS '6fa749ee-7cf8-4635-be10-36a1c75267a7_54321' '6fa749ee-7cf8-4635 == KILL SERVICE -The `KILL SERVICE` command allows you to cance a running service. +The `KILL SERVICE` command allows you to cancel a running service. [tabs] -- @@ -370,3 +370,25 @@ control.bat --kill SERVICE name * `name` - corresponds to the name you selected for the service upon the deployment time. You can always find it with the link:monitoring-metrics/system-views#services[SERVICES] view. + + +== KILL CONSISTENCY repair/check operations + +The `KILL CONSISTENCY` command allows you to cancel all running consistency repair/check operations. + +[tabs] +-- + +tab:Unix[] +[source,bash] +---- +./control.sh --kill CONSISTENCY +---- + +tab:Windows[] +[source,bash] +---- +control.bat --kill CONSISTENCY +---- + +-- diff --git a/modules/control-utility/src/main/java/org/apache/ignite/internal/commandline/consistency/ConsistencyCommand.java b/modules/control-utility/src/main/java/org/apache/ignite/internal/commandline/consistency/ConsistencyCommand.java index a1f56c4..10ef23a 100644 --- a/modules/control-utility/src/main/java/org/apache/ignite/internal/commandline/consistency/ConsistencyCommand.java +++ b/modules/control-utility/src/main/java/org/apache/ignite/internal/commandline/consistency/ConsistencyCommand.java @@ -20,6 +20,7 @@ package org.apache.ignite.internal.commandline.consistency; import java.util.LinkedHashMap; import java.util.Map; import java.util.logging.Logger; +import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.internal.client.GridClient; import org.apache.ignite.internal.client.GridClientConfiguration; import org.apache.ignite.internal.commandline.AbstractCommand; @@ -27,6 +28,7 @@ import org.apache.ignite.internal.commandline.Command; import org.apache.ignite.internal.commandline.CommandArgIterator; import org.apache.ignite.internal.commandline.CommandLogger; import org.apache.ignite.internal.visor.consistency.VisorConsistencyRepairTaskArg; +import org.apache.ignite.internal.visor.consistency.VisorConsistencyRepairTaskResult; import static org.apache.ignite.internal.commandline.CommandList.CONSISTENCY; import static org.apache.ignite.internal.commandline.TaskExecutor.BROADCAST_UUID; @@ -46,8 +48,12 @@ public class ConsistencyCommand extends AbstractCommand<VisorConsistencyRepairTa /** {@inheritDoc} */ @Override public Object execute(GridClientConfiguration clientCfg, Logger log) throws Exception { + boolean failed = false; + + StringBuilder sb = new StringBuilder(); + try (GridClient client = Command.startClient(clientCfg)) { - Object res = executeTaskByNameOnNode( + VisorConsistencyRepairTaskResult res = executeTaskByNameOnNode( client, cmd.taskName(), arg(), @@ -55,9 +61,22 @@ public class ConsistencyCommand extends AbstractCommand<VisorConsistencyRepairTa clientCfg ); - log.info(String.valueOf(res)); + if (res.cancelled()) { + sb.append("Operation execution cancelled.\n\n"); + + failed = true; + } + + if (res.failed()) { + sb.append("Operation execution failed.\n\n"); + + failed = true; + } - return res; + if (failed) + sb.append("[EXECUTION FAILED OR CANCELLED, RESULTS MAY BE INCOMPLETE OR INCONSISTENT]\n\n"); + + sb.append(res.message()); } catch (Throwable e) { log.severe("Failed to perform operation."); @@ -65,6 +84,15 @@ public class ConsistencyCommand extends AbstractCommand<VisorConsistencyRepairTa throw e; } + + String output = sb.toString(); + + if (failed) + throw new IgniteCheckedException(output); + else + log.info(output); + + return output; } /** {@inheritDoc} */ diff --git a/modules/control-utility/src/main/java/org/apache/ignite/internal/commandline/query/KillCommand.java b/modules/control-utility/src/main/java/org/apache/ignite/internal/commandline/query/KillCommand.java index 7682113..12926aa 100644 --- a/modules/control-utility/src/main/java/org/apache/ignite/internal/commandline/query/KillCommand.java +++ b/modules/control-utility/src/main/java/org/apache/ignite/internal/commandline/query/KillCommand.java @@ -30,6 +30,7 @@ import org.apache.ignite.internal.commandline.CommandLogger; import org.apache.ignite.internal.util.typedef.T2; import org.apache.ignite.internal.visor.compute.VisorComputeCancelSessionTask; import org.apache.ignite.internal.visor.compute.VisorComputeCancelSessionTaskArg; +import org.apache.ignite.internal.visor.consistency.VisorConsistencyCancelTask; import org.apache.ignite.internal.visor.query.VisorContinuousQueryCancelTask; import org.apache.ignite.internal.visor.query.VisorContinuousQueryCancelTaskArg; import org.apache.ignite.internal.visor.query.VisorQueryCancelOnInitiatorTask; @@ -51,6 +52,7 @@ import org.apache.ignite.mxbean.TransactionsMXBean; import static java.util.Collections.singletonMap; import static org.apache.ignite.internal.QueryMXBeanImpl.EXPECTED_GLOBAL_QRY_ID_FORMAT; import static org.apache.ignite.internal.commandline.CommandList.KILL; +import static org.apache.ignite.internal.commandline.TaskExecutor.BROADCAST_UUID; import static org.apache.ignite.internal.commandline.TaskExecutor.executeTaskByNameOnNode; import static org.apache.ignite.internal.commandline.query.KillSubcommand.COMPUTE; import static org.apache.ignite.internal.commandline.query.KillSubcommand.CONTINUOUS; @@ -77,6 +79,9 @@ public class KillCommand extends AbstractCommand<Object> { /** Task name. */ private String taskName; + /** Node id. */ + private UUID nodeId; + /** {@inheritDoc} */ @Override public Object execute(GridClientConfiguration clientCfg, Logger log) throws Exception { try (GridClient client = Command.startClient(clientCfg)) { @@ -84,7 +89,7 @@ public class KillCommand extends AbstractCommand<Object> { client, taskName, taskArgs, - null, + nodeId, clientCfg ); } @@ -119,6 +124,8 @@ public class KillCommand extends AbstractCommand<Object> { taskName = VisorComputeCancelSessionTask.class.getName(); + nodeId = null; + break; case SERVICE: @@ -126,6 +133,8 @@ public class KillCommand extends AbstractCommand<Object> { taskName = VisorCancelServiceTask.class.getName(); + nodeId = null; + break; case TRANSACTION: @@ -136,6 +145,8 @@ public class KillCommand extends AbstractCommand<Object> { taskName = VisorTxTask.class.getName(); + nodeId = null; + break; case SQL: @@ -148,6 +159,8 @@ public class KillCommand extends AbstractCommand<Object> { taskName = VisorQueryCancelOnInitiatorTask.class.getName(); + nodeId = null; + break; case SCAN: @@ -163,6 +176,8 @@ public class KillCommand extends AbstractCommand<Object> { taskName = VisorScanQueryCancelTask.class.getName(); + nodeId = null; + break; case CONTINUOUS: @@ -172,6 +187,8 @@ public class KillCommand extends AbstractCommand<Object> { taskName = VisorContinuousQueryCancelTask.class.getName(); + nodeId = null; + break; case SNAPSHOT: @@ -179,6 +196,17 @@ public class KillCommand extends AbstractCommand<Object> { taskName = VisorSnapshotCancelTask.class.getName(); + nodeId = null; + + break; + + case CONSISTENCY: + taskName = VisorConsistencyCancelTask.class.getName(); + + taskArgs = null; + + nodeId = BROADCAST_UUID; + break; default: diff --git a/modules/control-utility/src/main/java/org/apache/ignite/internal/commandline/query/KillSubcommand.java b/modules/control-utility/src/main/java/org/apache/ignite/internal/commandline/query/KillSubcommand.java index 311ab96..2c70166 100644 --- a/modules/control-utility/src/main/java/org/apache/ignite/internal/commandline/query/KillSubcommand.java +++ b/modules/control-utility/src/main/java/org/apache/ignite/internal/commandline/query/KillSubcommand.java @@ -54,4 +54,7 @@ public enum KillSubcommand { /** Kill snapshot operation. */ SNAPSHOT, + + /** Kill consistency tasks. */ + CONSISTENCY, } diff --git a/modules/control-utility/src/test/java/org/apache/ignite/util/GridCommandHandlerAbstractTest.java b/modules/control-utility/src/test/java/org/apache/ignite/util/GridCommandHandlerAbstractTest.java index ce84c9a..4358765 100644 --- a/modules/control-utility/src/test/java/org/apache/ignite/util/GridCommandHandlerAbstractTest.java +++ b/modules/control-utility/src/test/java/org/apache/ignite/util/GridCommandHandlerAbstractTest.java @@ -73,6 +73,7 @@ import static org.apache.ignite.IgniteSystemProperties.IGNITE_ENABLE_EXPERIMENTA import static org.apache.ignite.configuration.DataStorageConfiguration.DFLT_CHECKPOINT_FREQ; import static org.apache.ignite.configuration.EncryptionConfiguration.DFLT_REENCRYPTION_BATCH_SIZE; import static org.apache.ignite.configuration.EncryptionConfiguration.DFLT_REENCRYPTION_RATE_MBPS; +import static org.apache.ignite.events.EventType.EVT_CONSISTENCY_VIOLATION; import static org.apache.ignite.internal.encryption.AbstractEncryptionTest.KEYSTORE_PASSWORD; import static org.apache.ignite.internal.encryption.AbstractEncryptionTest.KEYSTORE_PATH; import static org.apache.ignite.internal.processors.cache.verify.VerifyBackupPartitionsDumpTask.IDLE_DUMP_FILE_PREFIX; @@ -250,6 +251,8 @@ public abstract class GridCommandHandlerAbstractTest extends GridCommonAbstractT cfg.setDaemon(igniteInstanceName.startsWith(DAEMON_NODE_NAME_PREFIX)); + cfg.setIncludeEventTypes(EVT_CONSISTENCY_VIOLATION); // Extend if necessary. + if (encryptionEnabled) { KeystoreEncryptionSpi encSpi = new KeystoreEncryptionSpi(); diff --git a/modules/control-utility/src/test/java/org/apache/ignite/util/GridCommandHandlerConsistencyTest.java b/modules/control-utility/src/test/java/org/apache/ignite/util/GridCommandHandlerConsistencyTest.java index 6643d9d..6002439 100644 --- a/modules/control-utility/src/test/java/org/apache/ignite/util/GridCommandHandlerConsistencyTest.java +++ b/modules/control-utility/src/test/java/org/apache/ignite/util/GridCommandHandlerConsistencyTest.java @@ -42,6 +42,7 @@ import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC; import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL; import static org.apache.ignite.events.EventType.EVT_CONSISTENCY_VIOLATION; import static org.apache.ignite.internal.commandline.CommandHandler.EXIT_CODE_OK; +import static org.apache.ignite.internal.commandline.CommandHandler.EXIT_CODE_UNEXPECTED_ERROR; import static org.apache.ignite.internal.visor.consistency.VisorConsistencyRepairTask.CONSISTENCY_VIOLATIONS_FOUND; import static org.apache.ignite.testframework.GridTestUtils.assertContains; @@ -171,11 +172,26 @@ public class GridCommandHandlerConsistencyTest extends GridCommandHandlerCluster /** * */ + @Test + public void testRepairNonExistentCache() throws Exception { + startGrids(3); + + injectTestSystemOut(); + + for (int i = 0; i < PARTITIONS; i++) { + assertEquals(EXIT_CODE_UNEXPECTED_ERROR, execute("--consistency", "repair", "non-existent", String.valueOf(i))); + assertContains(log, testOut.toString(), "Cache not found"); + } + } + + /** + * + */ private void readRepairTx(AtomicInteger brokenParts, String cacheName) { for (int i = 0; i < PARTITIONS; i++) { assertEquals(EXIT_CODE_OK, execute("--consistency", "repair", cacheName, String.valueOf(i))); assertContains(log, testOut.toString(), CONSISTENCY_VIOLATIONS_FOUND); - assertContains(log, testOut.toString(), "[found=1, fixed=1]"); + assertContains(log, testOut.toString(), "[found=1, fixed=1"); assertEquals(EXIT_CODE_OK, execute("--cache", "idle_verify")); @@ -196,7 +212,7 @@ public class GridCommandHandlerConsistencyTest extends GridCommandHandlerCluster for (int i = 0; i < PARTITIONS; i++) { // This may be a copy of previous (tx case), implement atomic repair to make this happen :) assertEquals(EXIT_CODE_OK, execute("--consistency", "repair", cacheName, String.valueOf(i))); assertContains(log, testOut.toString(), CONSISTENCY_VIOLATIONS_FOUND); - assertContains(log, testOut.toString(), "[found=1, fixed=0]"); // Nothing fixed. + assertContains(log, testOut.toString(), "[found=1, fixed=0"); // Nothing fixed. assertEquals(EXIT_CODE_OK, execute("--cache", "idle_verify")); assertContains(log, testOut.toString(), diff --git a/modules/control-utility/src/test/java/org/apache/ignite/util/KillCommandsCommandShTest.java b/modules/control-utility/src/test/java/org/apache/ignite/util/KillCommandsCommandShTest.java index 7f6f8d9..6501359 100644 --- a/modules/control-utility/src/test/java/org/apache/ignite/util/KillCommandsCommandShTest.java +++ b/modules/control-utility/src/test/java/org/apache/ignite/util/KillCommandsCommandShTest.java @@ -20,15 +20,29 @@ package org.apache.ignite.util; import java.util.ArrayList; import java.util.List; import java.util.UUID; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicInteger; import org.apache.ignite.IgniteCache; import org.apache.ignite.cache.CacheAtomicityMode; import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.internal.IgniteEx; +import org.apache.ignite.internal.IgniteInterruptedCheckedException; +import org.apache.ignite.internal.TestRecordingCommunicationSpi; +import org.apache.ignite.internal.processors.cache.distributed.near.GridNearGetRequest; +import org.apache.ignite.internal.util.typedef.F; +import org.apache.ignite.internal.visor.consistency.VisorConsistencyRepairTask; +import org.apache.ignite.lang.IgnitePredicate; import org.apache.ignite.lang.IgniteUuid; +import org.apache.ignite.spi.systemview.view.ComputeJobView; +import org.apache.ignite.spi.systemview.view.SystemView; +import org.apache.ignite.testframework.GridTestUtils; import org.junit.Test; import static org.apache.ignite.internal.commandline.CommandHandler.EXIT_CODE_OK; +import static org.apache.ignite.internal.commandline.CommandHandler.EXIT_CODE_UNEXPECTED_ERROR; import static org.apache.ignite.internal.processors.cache.persistence.snapshot.AbstractSnapshotSelfTest.doSnapshotCancellationTest; +import static org.apache.ignite.internal.processors.job.GridJobProcessor.JOBS_VIEW; +import static org.apache.ignite.testframework.GridTestUtils.assertContains; import static org.apache.ignite.util.KillCommandsTests.PAGES_CNT; import static org.apache.ignite.util.KillCommandsTests.PAGE_SZ; import static org.apache.ignite.util.KillCommandsTests.doTestCancelComputeTask; @@ -195,4 +209,97 @@ public class KillCommandsCommandShTest extends GridCommandHandlerClusterByClassA assertEquals(EXIT_CODE_OK, res); } + + /** */ + @Test + public void testCancelConsistencyMissedTask() { + int res = execute("--kill", "consistency"); + + assertEquals(EXIT_CODE_OK, res); + } + + /** */ + @Test + public void testCancelConsistencyTask() throws InterruptedException { + String consistencyCancheName = "consistencyCache"; + + CacheConfiguration<Integer, Integer> cfg = new CacheConfiguration<>(); + + cfg.setName(consistencyCancheName); + cfg.setBackups(SERVER_NODE_CNT - 1); + + IgniteCache<Integer, Integer> cache = client.getOrCreateCache(cfg); + + for (int i = 0; i < 10_000; i++) + cache.put(i, i); + + AtomicInteger getCnt = new AtomicInteger(); + + CountDownLatch thLatch = new CountDownLatch(1); + + Thread th = new Thread(() -> { + IgnitePredicate<ComputeJobView> repairJobFilter = + job -> job.taskClassName().equals(VisorConsistencyRepairTask.class.getName()); + + for (IgniteEx node : srvs) { + SystemView<ComputeJobView> jobs = node.context().systemView().view(JOBS_VIEW); + + assertTrue(F.iterator0(jobs, true, repairJobFilter).hasNext()); // Found. + } + + int res = execute("--kill", "consistency"); + + assertEquals(EXIT_CODE_OK, res); + + try { + assertTrue(GridTestUtils.waitForCondition(() -> { + for (IgniteEx node : srvs) { + SystemView<ComputeJobView> jobs = node.context().systemView().view(JOBS_VIEW); + + if (F.iterator0(jobs, true, repairJobFilter).hasNext()) // Found. + return false; + } + + return true; + }, 5000L)); // Missed. + } + catch (IgniteInterruptedCheckedException e) { + fail(); + } + + thLatch.countDown(); + }); + + for (IgniteEx server : srvs) { + TestRecordingCommunicationSpi spi = + ((TestRecordingCommunicationSpi)server.configuration().getCommunicationSpi()); + + spi.blockMessages((node, message) -> { + if (message instanceof GridNearGetRequest) { // Get request caused by read repair operation. + if (getCnt.incrementAndGet() == SERVER_NODE_CNT) // Each node should send a get request. + th.start(); + + return true; // Blocking to freeze '--consistency repair' operation. + } + + return false; + }); + } + + injectTestSystemOut(); + + assertEquals(EXIT_CODE_UNEXPECTED_ERROR, execute("--consistency", "repair", consistencyCancheName, "0")); + + assertContains(log, testOut.toString(), "Operation execution cancelled."); + assertContains(log, testOut.toString(), "violations were NOT found [processed=0]"); + + thLatch.await(); + + for (IgniteEx server : srvs) { // Restoring messaging for other tests. + TestRecordingCommunicationSpi spi = + ((TestRecordingCommunicationSpi)server.configuration().getCommunicationSpi()); + + spi.stopBlock(); + } + } } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobWorker.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobWorker.java index f78be69..fa8d670 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobWorker.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobWorker.java @@ -744,8 +744,6 @@ public class GridJobWorker extends GridWorker implements GridTimeoutObject { */ public void cancel(boolean sys) { try { - super.cancel(); - final ComputeJob job0 = job; if (sys) @@ -764,6 +762,10 @@ public class GridJobWorker extends GridWorker implements GridTimeoutObject { }); } + // Interrupting only when all 'cancelled' flags are set. + // This allows the 'job' to determine it's a cancellation. + super.cancel(); + if (!internal && ctx.event().isRecordable(EVT_JOB_CANCELLED)) recordEvent(EVT_JOB_CANCELLED, "Job was cancelled: " + job0); } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/visor/consistency/VisorConsistencyCancelTask.java b/modules/core/src/main/java/org/apache/ignite/internal/visor/consistency/VisorConsistencyCancelTask.java new file mode 100644 index 0000000..1287f8a --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/visor/consistency/VisorConsistencyCancelTask.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.visor.consistency; + +import java.util.List; +import org.apache.ignite.IgniteException; +import org.apache.ignite.compute.ComputeJobResult; +import org.apache.ignite.internal.IgniteEx; +import org.apache.ignite.internal.util.typedef.F; +import org.apache.ignite.internal.visor.VisorJob; +import org.apache.ignite.internal.visor.VisorMultiNodeTask; +import org.apache.ignite.resources.IgniteInstanceResource; +import org.apache.ignite.spi.systemview.view.ComputeJobView; + +import static org.apache.ignite.internal.processors.job.GridJobProcessor.JOBS_VIEW; + +/** + * Cancels given consistency repairs on all cluster nodes. + */ +public class VisorConsistencyCancelTask extends VisorMultiNodeTask<Void, Void, Void> { + /** */ + private static final long serialVersionUID = 0L; + + /** {@inheritDoc} */ + @Override protected VisorConsistencyCancelJob job(Void arg) { + return new VisorConsistencyCancelJob(arg, debug); + } + + /** {@inheritDoc} */ + @Override protected Void reduce0(List<ComputeJobResult> results) { + // No-op, just awaiting all jobs done. + return null; + } + + /** + * Job that cancels the tasks. + */ + private static class VisorConsistencyCancelJob extends VisorJob<Void, Void> { + /** */ + private static final long serialVersionUID = 0L; + + /** + * Auto-injected grid instance. + */ + @IgniteInstanceResource + private transient IgniteEx ignite; + + /** + * Default constructor. + */ + protected VisorConsistencyCancelJob(Void arg, boolean debug) { + super(arg, debug); + } + + /** {@inheritDoc} */ + @Override protected Void run(Void arg) throws IgniteException { + F.iterator(ignite.context().systemView().view(JOBS_VIEW), + ComputeJobView::sessionId, + true, + job -> job.taskClassName().equals(VisorConsistencyRepairTask.class.getName()) + ).forEach(sesId -> ignite.context().job().cancelJob(sesId, null, false)); + + return null; + } + } +} diff --git a/modules/core/src/main/java/org/apache/ignite/internal/visor/consistency/VisorConsistencyRepairTask.java b/modules/core/src/main/java/org/apache/ignite/internal/visor/consistency/VisorConsistencyRepairTask.java index d8070f5..a43a6b0 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/visor/consistency/VisorConsistencyRepairTask.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/visor/consistency/VisorConsistencyRepairTask.java @@ -31,11 +31,13 @@ import org.apache.ignite.compute.ComputeJobResult; import org.apache.ignite.events.CacheConsistencyViolationEvent; import org.apache.ignite.internal.processors.cache.CacheGroupContext; import org.apache.ignite.internal.processors.cache.GridCacheContext; +import org.apache.ignite.internal.processors.cache.IgniteInternalCache; import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtLocalPartition; import org.apache.ignite.internal.processors.cache.distributed.near.consistency.IgniteConsistencyViolationException; import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow; import org.apache.ignite.internal.util.GridConcurrentHashSet; import org.apache.ignite.internal.util.lang.GridCursor; +import org.apache.ignite.internal.util.typedef.X; import org.apache.ignite.internal.visor.VisorJob; import org.apache.ignite.internal.visor.VisorMultiNodeTask; import org.apache.ignite.lang.IgnitePredicate; @@ -46,12 +48,13 @@ import static org.apache.ignite.events.EventType.EVT_CONSISTENCY_VIOLATION; /** * */ -public class VisorConsistencyRepairTask extends VisorMultiNodeTask<VisorConsistencyRepairTaskArg, String, String> { +public class VisorConsistencyRepairTask extends + VisorMultiNodeTask<VisorConsistencyRepairTaskArg, VisorConsistencyRepairTaskResult, String> { /** Serial version uid. */ private static final long serialVersionUID = 0L; /** Nothing found. */ - private static final String NOTHING_FOUND = "Consistency violations were not found."; + private static final String NOTHING_FOUND = "Consistency violations were NOT found"; /** Found. */ public static final String CONSISTENCY_VIOLATIONS_FOUND = "Consistency violations were FOUND"; @@ -65,18 +68,34 @@ public class VisorConsistencyRepairTask extends VisorMultiNodeTask<VisorConsiste } /** {@inheritDoc} */ - @Override protected String reduce0(List<ComputeJobResult> results) throws IgniteException { + @Override protected VisorConsistencyRepairTaskResult reduce0(List<ComputeJobResult> results) throws IgniteException { + VisorConsistencyRepairTaskResult taskRes = new VisorConsistencyRepairTaskResult(); StringBuilder sb = new StringBuilder(); for (ComputeJobResult res : results) { + if (res.isCancelled()) + taskRes.cancelled(true); + + Exception e = res.getException(); + + if (e != null) { + taskRes.failed(true); + + sb.append("Node: ").append(res.getNode()).append("\n") + .append(" Exception: ").append(e).append("\n") + .append(X.getFullStackTrace(e)).append("\n"); + } + String data = res.getData(); if (data != null) sb.append("Node: ").append(res.getNode()).append("\n") - .append(" Result: ").append(data).append("\n"); + .append(" Result: ").append(data).append("\n\n"); } - return sb.toString(); + taskRes.message(sb.toString()); + + return taskRes; } /** @@ -107,7 +126,15 @@ public class VisorConsistencyRepairTask extends VisorMultiNodeTask<VisorConsiste int p = arg.part(); int batchSize = 1024; - GridCacheContext<Object, Object> cctx = ignite.context().cache().cache(cacheName).context(); + IgniteInternalCache<Object, Object> internalCache = ignite.context().cache().cache(cacheName); + + if (internalCache == null) + if (ignite.context().cache().cacheDescriptor(cacheName) != null) + return null; // Node filtered by node filter. + else + throw new IgniteException("Cache not found [name=" + cacheName + "]"); + + GridCacheContext<Object, Object> cctx = internalCache.context(); if (!cctx.gridEvents().isRecordable(EVT_CONSISTENCY_VIOLATION)) throw new UnsupportedOperationException("Consistency violation events recording is disabled on cluster."); @@ -119,6 +146,8 @@ public class VisorConsistencyRepairTask extends VisorMultiNodeTask<VisorConsiste if (part == null) return null; // Partition does not belong to the node. + long cnt = 0; + part.reserve(); try { @@ -144,13 +173,15 @@ public class VisorConsistencyRepairTask extends VisorMultiNodeTask<VisorConsiste try { cache.getAll(keys); // Repair. + + cnt += keys.size(); } catch (CacheException e) { - if (!(e.getCause() instanceof IgniteConsistencyViolationException)) + if (!(e.getCause() instanceof IgniteConsistencyViolationException) && !isCancelled()) throw new IgniteException("Read repair attempt failed.", e); } } - while (!keys.isEmpty()); + while (!keys.isEmpty() && !isCancelled()); } finally { ignite.events().stopLocalListen(lsnr); @@ -164,15 +195,15 @@ public class VisorConsistencyRepairTask extends VisorMultiNodeTask<VisorConsiste } if (!evts.isEmpty()) - return processEvents(cctx, p); + return processEvents(cctx, p, cnt); else - return NOTHING_FOUND; + return NOTHING_FOUND + " [processed=" + cnt + "]"; } /** * */ - private String processEvents(GridCacheContext<Object, Object> cctx, int part) { + private String processEvents(GridCacheContext<Object, Object> cctx, int part, long cnt) { int found = 0; int fixed = 0; @@ -211,10 +242,10 @@ public class VisorConsistencyRepairTask extends VisorMultiNodeTask<VisorConsiste if (!res.isEmpty()) { log.warning(CONSISTENCY_VIOLATIONS_RECORDED + "\n" + res); - return CONSISTENCY_VIOLATIONS_FOUND + " [found=" + found + ", fixed=" + fixed + "]"; + return CONSISTENCY_VIOLATIONS_FOUND + " [found=" + found + ", fixed=" + fixed + ", processed=" + cnt + "]"; } else - return NOTHING_FOUND; + return NOTHING_FOUND + " [processed=" + cnt + "]"; } /** diff --git a/modules/core/src/main/java/org/apache/ignite/internal/visor/consistency/VisorConsistencyRepairTaskResult.java b/modules/core/src/main/java/org/apache/ignite/internal/visor/consistency/VisorConsistencyRepairTaskResult.java new file mode 100644 index 0000000..51e6d64 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/visor/consistency/VisorConsistencyRepairTaskResult.java @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.visor.consistency; + +import java.io.IOException; +import java.io.ObjectInput; +import java.io.ObjectOutput; +import org.apache.ignite.internal.dto.IgniteDataTransferObject; +import org.apache.ignite.internal.util.typedef.internal.U; + +/** + * + */ +public class VisorConsistencyRepairTaskResult extends IgniteDataTransferObject { + /** */ + private static final long serialVersionUID = 0L; + + /** Result. */ + private String msg; + + /** Failed. */ + private boolean failed; + + /** Cancelled. */ + private boolean cancelled; + + /** + * {@inheritDoc} + */ + @Override protected void writeExternalData(ObjectOutput out) throws IOException { + U.writeString(out, msg); + out.writeBoolean(failed); + out.writeBoolean(cancelled); + } + + /** + * {@inheritDoc} + */ + @Override protected void readExternalData(byte protoVer, + ObjectInput in) throws IOException, ClassNotFoundException { + msg = U.readString(in); + failed = in.readBoolean(); + cancelled = in.readBoolean(); + } + + /** + * @return Result. + */ + public String message() { + return msg; + } + + /** + * @param res New result. + */ + public void message(String res) { + this.msg = res; + } + + /** + * @return Failed. + */ + public boolean failed() { + return failed; + } + + /** + * @param failed Failed. + */ + public void failed(boolean failed) { + this.failed = failed; + } + + /** + * @return Cancelled. + */ + public boolean cancelled() { + return cancelled; + } + + /** + * @param cancelled New cancelled. + */ + public void cancelled(boolean cancelled) { + this.cancelled = cancelled; + } +} diff --git a/modules/core/src/main/resources/META-INF/classnames.properties b/modules/core/src/main/resources/META-INF/classnames.properties index b787705..419c039 100644 --- a/modules/core/src/main/resources/META-INF/classnames.properties +++ b/modules/core/src/main/resources/META-INF/classnames.properties @@ -2010,6 +2010,8 @@ org.apache.ignite.internal.visor.baseline.VisorBaselineTaskArg org.apache.ignite.internal.visor.baseline.VisorBaselineTaskResult org.apache.ignite.internal.visor.baseline.VisorBaselineViewTask org.apache.ignite.internal.visor.baseline.VisorBaselineViewTask$VisorBaselineViewJob +org.apache.ignite.internal.visor.consistency.VisorConsistencyRepairTaskArg +org.apache.ignite.internal.visor.consistency.VisorConsistencyRepairTaskResult org.apache.ignite.internal.visor.misc.VisorIdAndTagViewTaskResult org.apache.ignite.internal.visor.misc.VisorClusterChangeTagTaskResult org.apache.ignite.internal.visor.shutdown.VisorShutdownPolicyTask