This is an automated email from the ASF dual-hosted git repository. bdeggleston pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/trunk by this push: new 1728da3 Consolidate node liveness check for forced repair 1728da3 is described below commit 1728da30e4e7858d30178ef74350af3e690adf0c Author: yifan-c <yc25c...@gmail.com> AuthorDate: Tue Sep 8 18:23:30 2020 -0700 Consolidate node liveness check for forced repair Patch by Yifan Cai; Reviewed by Blake Eggleston for CASSANDRA-16113 --- CHANGES.txt | 1 + .../org/apache/cassandra/repair/CommonRange.java | 21 ++-- .../apache/cassandra/repair/RepairRunnable.java | 121 +++++++++++---------- .../org/apache/cassandra/repair/RepairSession.java | 38 +------ .../cassandra/service/ActiveRepairService.java | 3 +- .../distributed/test/DistributedRepairUtils.java | 33 ++++-- .../cassandra/distributed/test/RepairTest.java | 92 +++++++++++----- ...nnableTest.java => NeighborsAndRangesTest.java} | 25 +++-- .../org/apache/cassandra/repair/RepairJobTest.java | 7 +- .../apache/cassandra/repair/RepairSessionTest.java | 2 +- 10 files changed, 185 insertions(+), 158 deletions(-) diff --git a/CHANGES.txt b/CHANGES.txt index 289d4e8..412b336 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 4.0-beta3 + * Consolidate node liveness check for forced repair (CASSANDRA-16113) * Use unsigned short in ValueAccessor.sliceWithShortLength (CASSANDRA-16147) * Abort repairs when getting a truncation request (CASSANDRA-15854) * Remove bad assert when getting active compactions for an sstable (CASSANDRA-15457) diff --git a/src/java/org/apache/cassandra/repair/CommonRange.java b/src/java/org/apache/cassandra/repair/CommonRange.java index dab77c5..5eb8061 100644 --- a/src/java/org/apache/cassandra/repair/CommonRange.java +++ b/src/java/org/apache/cassandra/repair/CommonRange.java @@ -21,6 +21,7 @@ package org.apache.cassandra.repair; import java.util.ArrayList; import java.util.Collection; +import java.util.Objects; import java.util.Set; import com.google.common.base.Preconditions; @@ -38,9 +39,15 @@ public class CommonRange 
public final ImmutableSet<InetAddressAndPort> endpoints; public final ImmutableSet<InetAddressAndPort> transEndpoints; public final Collection<Range<Token>> ranges; + public final boolean hasSkippedReplicas; public CommonRange(Set<InetAddressAndPort> endpoints, Set<InetAddressAndPort> transEndpoints, Collection<Range<Token>> ranges) { + this(endpoints, transEndpoints, ranges, false); + } + + public CommonRange(Set<InetAddressAndPort> endpoints, Set<InetAddressAndPort> transEndpoints, Collection<Range<Token>> ranges, boolean hasSkippedReplicas) + { Preconditions.checkArgument(endpoints != null && !endpoints.isEmpty(), "Endpoints can not be empty"); Preconditions.checkArgument(transEndpoints != null, "Transient endpoints can not be null"); Preconditions.checkArgument(endpoints.containsAll(transEndpoints), "transEndpoints must be a subset of endpoints"); @@ -49,6 +56,7 @@ public class CommonRange this.endpoints = ImmutableSet.copyOf(endpoints); this.transEndpoints = ImmutableSet.copyOf(transEndpoints); this.ranges = new ArrayList<>(ranges); + this.hasSkippedReplicas = hasSkippedReplicas; } public boolean matchesEndpoints(Set<InetAddressAndPort> endpoints, Set<InetAddressAndPort> transEndpoints) @@ -64,17 +72,15 @@ public class CommonRange CommonRange that = (CommonRange) o; - if (!endpoints.equals(that.endpoints)) return false; - if (!transEndpoints.equals(that.transEndpoints)) return false; - return ranges.equals(that.ranges); + return Objects.equals(endpoints, that.endpoints) + && Objects.equals(transEndpoints, that.transEndpoints) + && Objects.equals(ranges, that.ranges) + && hasSkippedReplicas == that.hasSkippedReplicas; } public int hashCode() { - int result = endpoints.hashCode(); - result = 31 * result + transEndpoints.hashCode(); - result = 31 * result + ranges.hashCode(); - return result; + return Objects.hash(endpoints, transEndpoints, ranges, hasSkippedReplicas); } public String toString() @@ -83,6 +89,7 @@ public class CommonRange "endpoints=" + endpoints 
+ ", transEndpoints=" + transEndpoints + ", ranges=" + ranges + + ", hasSkippedReplicas=" + hasSkippedReplicas + '}'; } } diff --git a/src/java/org/apache/cassandra/repair/RepairRunnable.java b/src/java/org/apache/cassandra/repair/RepairRunnable.java index f6aa6d1..5d8e945 100644 --- a/src/java/org/apache/cassandra/repair/RepairRunnable.java +++ b/src/java/org/apache/cassandra/repair/RepairRunnable.java @@ -20,6 +20,7 @@ package org.apache.cassandra.repair; import java.io.IOException; import java.nio.ByteBuffer; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.HashSet; @@ -32,7 +33,6 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; -import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; @@ -260,7 +260,7 @@ public class RepairRunnable implements Runnable, ProgressEventNotifier maybeStoreParentRepairStart(cfnames); - prepare(columnFamilies, neighborsAndRanges.allNeighbors, neighborsAndRanges.force); + prepare(columnFamilies, neighborsAndRanges.participants, neighborsAndRanges.shouldExcludeDeadParticipants); repair(cfnames, neighborsAndRanges); } @@ -345,15 +345,15 @@ public class RepairRunnable implements Runnable, ProgressEventNotifier progressCounter.incrementAndGet(); - boolean force = options.isForcedRepair(); + boolean shouldExcludeDeadParticipants = options.isForcedRepair(); - if (force && options.isIncremental()) + if (shouldExcludeDeadParticipants) { Set<InetAddressAndPort> actualNeighbors = Sets.newHashSet(Iterables.filter(allNeighbors, FailureDetector.instance::isAlive)); - force = !allNeighbors.equals(actualNeighbors); + shouldExcludeDeadParticipants = !allNeighbors.equals(actualNeighbors); allNeighbors = actualNeighbors; } - return new 
NeighborsAndRanges(force, allNeighbors, commonRanges); + return new NeighborsAndRanges(shouldExcludeDeadParticipants, allNeighbors, commonRanges); } private void maybeStoreParentRepairStart(String[] cfnames) @@ -393,16 +393,15 @@ public class RepairRunnable implements Runnable, ProgressEventNotifier { if (options.isPreview()) { - previewRepair(parentSession, creationTimeMillis, neighborsAndRanges.commonRanges, cfnames); + previewRepair(parentSession, creationTimeMillis, neighborsAndRanges.filterCommonRanges(keyspace, cfnames), cfnames); } else if (options.isIncremental()) { - incrementalRepair(parentSession, creationTimeMillis, neighborsAndRanges.force, traceState, - neighborsAndRanges.allNeighbors, neighborsAndRanges.commonRanges, cfnames); + incrementalRepair(parentSession, creationTimeMillis, traceState, neighborsAndRanges, cfnames); } else { - normalRepair(parentSession, creationTimeMillis, traceState, neighborsAndRanges.commonRanges, cfnames); + normalRepair(parentSession, creationTimeMillis, traceState, neighborsAndRanges.filterCommonRanges(keyspace, cfnames), cfnames); } } @@ -428,10 +427,10 @@ public class RepairRunnable implements Runnable, ProgressEventNotifier @SuppressWarnings("unchecked") public ListenableFuture apply(List<RepairSessionResult> results) { + logger.debug("Repair result: {}", results); // filter out null(=failed) results and get successful ranges for (RepairSessionResult sessionResult : results) { - logger.debug("Repair result: {}", results); if (sessionResult != null) { // don't record successful repair if we had to skip ranges @@ -451,54 +450,21 @@ public class RepairRunnable implements Runnable, ProgressEventNotifier Futures.addCallback(repairResult, new RepairCompleteCallback(parentSession, successfulRanges, startTime, traceState, hasFailure, executor), MoreExecutors.directExecutor()); } - /** - * removes dead nodes from common ranges, and exludes ranges left without any participants - */ - @VisibleForTesting - static 
List<CommonRange> filterCommonRanges(List<CommonRange> commonRanges, Set<InetAddressAndPort> liveEndpoints, boolean force) - { - if (!force) - { - return commonRanges; - } - else - { - List<CommonRange> filtered = new ArrayList<>(commonRanges.size()); - - for (CommonRange commonRange : commonRanges) - { - Set<InetAddressAndPort> endpoints = ImmutableSet.copyOf(Iterables.filter(commonRange.endpoints, liveEndpoints::contains)); - Set<InetAddressAndPort> transEndpoints = ImmutableSet.copyOf(Iterables.filter(commonRange.transEndpoints, liveEndpoints::contains)); - Preconditions.checkState(endpoints.containsAll(transEndpoints), "transEndpoints must be a subset of endpoints"); - - // this node is implicitly a participant in this repair, so a single endpoint is ok here - if (!endpoints.isEmpty()) - { - filtered.add(new CommonRange(endpoints, transEndpoints, commonRange.ranges)); - } - } - Preconditions.checkState(!filtered.isEmpty(), "Not enough live endpoints for a repair"); - return filtered; - } - } - private void incrementalRepair(UUID parentSession, long startTime, - boolean forceRepair, TraceState traceState, - Set<InetAddressAndPort> allNeighbors, - List<CommonRange> commonRanges, + NeighborsAndRanges neighborsAndRanges, String... cfnames) { // the local node also needs to be included in the set of participants, since coordinator sessions aren't persisted Set<InetAddressAndPort> allParticipants = ImmutableSet.<InetAddressAndPort>builder() - .addAll(allNeighbors) + .addAll(neighborsAndRanges.participants) .add(FBUtilities.getBroadcastAddressAndPort()) .build(); + // Not necessary to include self for filtering. The common ranges only contain neighbor node endpoints.
+ List<CommonRange> allRanges = neighborsAndRanges.filterCommonRanges(keyspace, cfnames); - List<CommonRange> allRanges = filterCommonRanges(commonRanges, allParticipants, forceRepair); - - CoordinatorSession coordinatorSession = ActiveRepairService.instance.consistent.coordinated.registerSession(parentSession, allParticipants, forceRepair); + CoordinatorSession coordinatorSession = ActiveRepairService.instance.consistent.coordinated.registerSession(parentSession, allParticipants, neighborsAndRanges.shouldExcludeDeadParticipants); ListeningExecutorService executor = createExecutor(); AtomicBoolean hasFailure = new AtomicBoolean(false); ListenableFuture repairResult = coordinatorSession.execute(() -> submitRepairSessions(parentSession, true, executor, allRanges, cfnames), @@ -640,9 +606,6 @@ public class RepairRunnable implements Runnable, ProgressEventNotifier { List<ListenableFuture<RepairSessionResult>> futures = new ArrayList<>(options.getRanges().size()); - // we do endpoint filtering at the start of an incremental repair, - // so repair sessions shouldn't also be checking liveness - boolean force = options.isForcedRepair() && !isIncremental; for (CommonRange commonRange : commonRanges) { logger.info("Starting RepairSession for {}", commonRange); @@ -652,7 +615,6 @@ public class RepairRunnable implements Runnable, ProgressEventNotifier options.getParallelism(), isIncremental, options.isPullRepair(), - force, options.getPreviewKind(), options.optimiseStreams(), executor, @@ -851,17 +813,58 @@ public class RepairRunnable implements Runnable, ProgressEventNotifier } } - private static final class NeighborsAndRanges + static final class NeighborsAndRanges { - private final boolean force; - private final Set<InetAddressAndPort> allNeighbors; + private final boolean shouldExcludeDeadParticipants; + private final Set<InetAddressAndPort> participants; private final List<CommonRange> commonRanges; - private NeighborsAndRanges(boolean force, Set<InetAddressAndPort> 
allNeighbors, List<CommonRange> commonRanges) + NeighborsAndRanges(boolean shouldExcludeDeadParticipants, Set<InetAddressAndPort> participants, List<CommonRange> commonRanges) { - this.force = force; - this.allNeighbors = allNeighbors; + this.shouldExcludeDeadParticipants = shouldExcludeDeadParticipants; + this.participants = participants; this.commonRanges = commonRanges; } + + /** + * When in the force mode, removes dead nodes from common ranges (not contained within `participants`), + * and excludes ranges left without any participants + * When not in the force mode, no-op. + */ + List<CommonRange> filterCommonRanges(String keyspace, String[] tableNames) + { + if (!shouldExcludeDeadParticipants) + { + return commonRanges; + } + else + { + logger.debug("force flag set, removing dead endpoints if possible"); + + List<CommonRange> filtered = new ArrayList<>(commonRanges.size()); + + for (CommonRange commonRange : commonRanges) + { + Set<InetAddressAndPort> endpoints = ImmutableSet.copyOf(Iterables.filter(commonRange.endpoints, participants::contains)); + Set<InetAddressAndPort> transEndpoints = ImmutableSet.copyOf(Iterables.filter(commonRange.transEndpoints, participants::contains)); + Preconditions.checkState(endpoints.containsAll(transEndpoints), "transEndpoints must be a subset of endpoints"); + + // this node is implicitly a participant in this repair, so a single endpoint is ok here + if (!endpoints.isEmpty()) + { + Set<InetAddressAndPort> skippedReplicas = Sets.difference(commonRange.endpoints, endpoints); + skippedReplicas.forEach(endpoint -> logger.info("Removing a dead node {} from repair for ranges {} due to -force", endpoint, commonRange.ranges)); + filtered.add(new CommonRange(endpoints, transEndpoints, commonRange.ranges, !skippedReplicas.isEmpty())); + } + else + { + logger.warn("Skipping forced repair for ranges {} of tables {} in keyspace {}, as no neighbor nodes are live.", + commonRange.ranges, Arrays.asList(tableNames), keyspace); + } + } +
Preconditions.checkState(!filtered.isEmpty(), "Not enough live endpoints for a repair"); + return filtered; + } + } } } diff --git a/src/java/org/apache/cassandra/repair/RepairSession.java b/src/java/org/apache/cassandra/repair/RepairSession.java index 2468857..e13c90c 100644 --- a/src/java/org/apache/cassandra/repair/RepairSession.java +++ b/src/java/org/apache/cassandra/repair/RepairSession.java @@ -97,9 +97,6 @@ public class RepairSession extends AbstractFuture<RepairSessionResult> implement public final RepairParallelism parallelismDegree; public final boolean pullRepair; - // indicates some replicas were not included in the repair. Only relevant for --force option - public final boolean skippedReplicas; - /** Range to repair */ public final CommonRange commonRange; public final boolean isIncremental; @@ -126,7 +123,6 @@ public class RepairSession extends AbstractFuture<RepairSessionResult> implement * @param keyspace name of keyspace * @param parallelismDegree specifies the degree of parallelism when calculating the merkle trees * @param pullRepair true if the repair should be one way (from remote host to this host and only applicable between two hosts--see RepairOption) - * @param force true if the repair should ignore dead endpoints (instead of failing) * @param cfnames names of columnfamilies */ public RepairSession(UUID parentRepairSession, @@ -136,7 +132,6 @@ public class RepairSession extends AbstractFuture<RepairSessionResult> implement RepairParallelism parallelismDegree, boolean isIncremental, boolean pullRepair, - boolean force, PreviewKind previewKind, boolean optimiseStreams, String... 
cfnames) @@ -148,37 +143,10 @@ public class RepairSession extends AbstractFuture<RepairSessionResult> implement this.parallelismDegree = parallelismDegree; this.keyspace = keyspace; this.cfnames = cfnames; - - //If force then filter out dead endpoints - boolean forceSkippedReplicas = false; - if (force) - { - logger.debug("force flag set, removing dead endpoints"); - final Set<InetAddressAndPort> removeCandidates = new HashSet<>(); - for (final InetAddressAndPort endpoint : commonRange.endpoints) - { - if (!FailureDetector.instance.isAlive(endpoint)) - { - logger.info("Removing a dead node from Repair due to -force {}", endpoint); - removeCandidates.add(endpoint); - } - } - if (!removeCandidates.isEmpty()) - { - // we shouldn't be recording a successful repair if - // any replicas are excluded from the repair - forceSkippedReplicas = true; - Set<InetAddressAndPort> filteredEndpoints = new HashSet<>(commonRange.endpoints); - filteredEndpoints.removeAll(removeCandidates); - commonRange = new CommonRange(filteredEndpoints, commonRange.transEndpoints, commonRange.ranges); - } - } - this.commonRange = commonRange; this.isIncremental = isIncremental; this.previewKind = previewKind; this.pullRepair = pullRepair; - this.skippedReplicas = forceSkippedReplicas; this.optimiseStreams = optimiseStreams; this.taskExecutor = MoreExecutors.listeningDecorator(createExecutor()); } @@ -297,7 +265,7 @@ public class RepairSession extends AbstractFuture<RepairSessionResult> implement { logger.info("{} {}", previewKind.logPrefix(getId()), message = String.format("No neighbors to repair with on range %s: session completed", commonRange)); Tracing.traceRepair(message); - set(new RepairSessionResult(id, keyspace, commonRange.ranges, Lists.<RepairResult>newArrayList(), skippedReplicas)); + set(new RepairSessionResult(id, keyspace, commonRange.ranges, Lists.<RepairResult>newArrayList(), commonRange.hasSkippedReplicas)); if (!previewKind.isPreview()) { 
SystemDistributedKeyspace.failRepairs(getId(), keyspace, cfnames, new RuntimeException(message)); @@ -308,7 +276,7 @@ public class RepairSession extends AbstractFuture<RepairSessionResult> implement // Checking all nodes are live for (InetAddressAndPort endpoint : commonRange.endpoints) { - if (!FailureDetector.instance.isAlive(endpoint) && !skippedReplicas) + if (!FailureDetector.instance.isAlive(endpoint) && !commonRange.hasSkippedReplicas) { message = String.format("Cannot proceed on repair because a neighbor (%s) is dead: session failed", endpoint); logger.error("{} {}", previewKind.logPrefix(getId()), message); @@ -339,7 +307,7 @@ public class RepairSession extends AbstractFuture<RepairSessionResult> implement // this repair session is completed logger.info("{} {}", previewKind.logPrefix(getId()), "Session completed successfully"); Tracing.traceRepair("Completed sync of range {}", commonRange); - set(new RepairSessionResult(id, keyspace, commonRange.ranges, results, skippedReplicas)); + set(new RepairSessionResult(id, keyspace, commonRange.ranges, results, commonRange.hasSkippedReplicas)); taskExecutor.shutdown(); // mark this session as terminated diff --git a/src/java/org/apache/cassandra/service/ActiveRepairService.java b/src/java/org/apache/cassandra/service/ActiveRepairService.java index f7b0686..2cdc794 100644 --- a/src/java/org/apache/cassandra/service/ActiveRepairService.java +++ b/src/java/org/apache/cassandra/service/ActiveRepairService.java @@ -315,7 +315,6 @@ public class ActiveRepairService implements IEndpointStateChangeSubscriber, IFai RepairParallelism parallelismDegree, boolean isIncremental, boolean pullRepair, - boolean force, PreviewKind previewKind, boolean optimiseStreams, ListeningExecutorService executor, @@ -328,7 +327,7 @@ public class ActiveRepairService implements IEndpointStateChangeSubscriber, IFai return null; final RepairSession session = new RepairSession(parentRepairSession, UUIDGen.getTimeUUID(), range, keyspace, - 
parallelismDegree, isIncremental, pullRepair, force, + parallelismDegree, isIncremental, pullRepair, previewKind, optimiseStreams, cfnames); sessions.put(session.getId(), session); diff --git a/test/distributed/org/apache/cassandra/distributed/test/DistributedRepairUtils.java b/test/distributed/org/apache/cassandra/distributed/test/DistributedRepairUtils.java index 023d02c..fce67b5 100644 --- a/test/distributed/org/apache/cassandra/distributed/test/DistributedRepairUtils.java +++ b/test/distributed/org/apache/cassandra/distributed/test/DistributedRepairUtils.java @@ -27,6 +27,7 @@ import org.apache.commons.lang3.ArrayUtils; import org.junit.Assert; import org.apache.cassandra.distributed.api.ConsistencyLevel; +import org.apache.cassandra.distributed.api.ICluster; import org.apache.cassandra.distributed.api.IInvokableInstance; import org.apache.cassandra.distributed.api.NodeToolResult; import org.apache.cassandra.distributed.api.QueryResult; @@ -45,11 +46,11 @@ public final class DistributedRepairUtils } - public static NodeToolResult repair(AbstractCluster<?> cluster, RepairType repairType, boolean withNotifications, String... args) { + public static NodeToolResult repair(ICluster<IInvokableInstance> cluster, RepairType repairType, boolean withNotifications, String... args) { return repair(cluster, DEFAULT_COORDINATOR, repairType, withNotifications, args); } - public static NodeToolResult repair(AbstractCluster<?> cluster, int node, RepairType repairType, boolean withNotifications, String... args) { + public static NodeToolResult repair(ICluster<IInvokableInstance> cluster, int node, RepairType repairType, boolean withNotifications, String... 
args) { args = repairType.append(args); args = ArrayUtils.addAll(new String[] { "repair" }, args); return cluster.get(node).nodetoolResult(withNotifications, args); @@ -65,12 +66,12 @@ public final class DistributedRepairUtils return cluster.get(node).callOnInstance(() -> StorageMetrics.repairExceptions.getCount()); } - public static QueryResult queryParentRepairHistory(AbstractCluster<?> cluster, String ks, String table) + public static QueryResult queryParentRepairHistory(ICluster<IInvokableInstance> cluster, String ks, String table) { return queryParentRepairHistory(cluster, DEFAULT_COORDINATOR, ks, table); } - public static QueryResult queryParentRepairHistory(AbstractCluster<?> cluster, int coordinator, String ks, String table) + public static QueryResult queryParentRepairHistory(ICluster<IInvokableInstance> cluster, int coordinator, String ks, String table) { // This is kinda brittle since the caller never gets the ID and can't ask for the ID; it needs to infer the id // this logic makes the assumption the ks/table pairs are unique (should be or else create should fail) so any @@ -84,35 +85,41 @@ public final class DistributedRepairUtils return rs; } - public static void assertParentRepairNotExist(AbstractCluster<?> cluster, String ks, String table) + public static void assertParentRepairNotExist(ICluster<IInvokableInstance> cluster, String ks, String table) { assertParentRepairNotExist(cluster, DEFAULT_COORDINATOR, ks, table); } - public static void assertParentRepairNotExist(AbstractCluster<?> cluster, int coordinator, String ks, String table) + public static void assertParentRepairNotExist(ICluster<IInvokableInstance> cluster, int coordinator, String ks, String table) { QueryResult rs = queryParentRepairHistory(cluster, coordinator, ks, table); Assert.assertFalse("No repairs should be found but at least one found", rs.hasNext()); } - public static void assertParentRepairNotExist(AbstractCluster<?> cluster, String ks) + public static void 
assertParentRepairNotExist(ICluster<IInvokableInstance> cluster, String ks) { assertParentRepairNotExist(cluster, DEFAULT_COORDINATOR, ks); } - public static void assertParentRepairNotExist(AbstractCluster<?> cluster, int coordinator, String ks) + public static void assertParentRepairNotExist(ICluster<IInvokableInstance> cluster, int coordinator, String ks) { QueryResult rs = queryParentRepairHistory(cluster, coordinator, ks, null); Assert.assertFalse("No repairs should be found but at least one found", rs.hasNext()); } - public static void assertParentRepairSuccess(AbstractCluster<?> cluster, String ks, String table) + public static void assertParentRepairSuccess(ICluster<IInvokableInstance> cluster, String ks, String table) { assertParentRepairSuccess(cluster, DEFAULT_COORDINATOR, ks, table); } - public static void assertParentRepairSuccess(AbstractCluster<?> cluster, int coordinator, String ks, String table) + public static void assertParentRepairSuccess(ICluster<IInvokableInstance> cluster, int coordinator, String ks, String table) { + assertParentRepairSuccess(cluster, coordinator, ks, table, row -> {}); + } + + public static void assertParentRepairSuccess(ICluster<IInvokableInstance> cluster, int coordinator, String ks, String table, Consumer<Row> moreSuccessCriteria) + { + Assert.assertNotNull("Invalid null value for moreSuccessCriteria", moreSuccessCriteria); QueryResult rs = queryParentRepairHistory(cluster, coordinator, ks, table); validateExistingParentRepair(rs, row -> { // check completed @@ -121,15 +128,17 @@ public final class DistributedRepairUtils // check not failed (aka success) Assert.assertNull("Exception found", row.getString("exception_stacktrace")); Assert.assertNull("Exception found", row.getString("exception_message")); + + moreSuccessCriteria.accept(row); }); } - public static void assertParentRepairFailedWithMessageContains(AbstractCluster<?> cluster, String ks, String table, String message) + public static void 
assertParentRepairFailedWithMessageContains(ICluster<IInvokableInstance> cluster, String ks, String table, String message) { assertParentRepairFailedWithMessageContains(cluster, DEFAULT_COORDINATOR, ks, table, message); } - public static void assertParentRepairFailedWithMessageContains(AbstractCluster<?> cluster, int coordinator, String ks, String table, String message) + public static void assertParentRepairFailedWithMessageContains(ICluster<IInvokableInstance> cluster, int coordinator, String ks, String table, String message) { QueryResult rs = queryParentRepairHistory(cluster, coordinator, ks, table); validateExistingParentRepair(rs, row -> { diff --git a/test/distributed/org/apache/cassandra/distributed/test/RepairTest.java b/test/distributed/org/apache/cassandra/distributed/test/RepairTest.java index f37a3d8..b127a74 100644 --- a/test/distributed/org/apache/cassandra/distributed/test/RepairTest.java +++ b/test/distributed/org/apache/cassandra/distributed/test/RepairTest.java @@ -21,11 +21,14 @@ package org.apache.cassandra.distributed.test; import java.io.IOException; import java.util.Arrays; import java.util.Map; +import java.util.Set; +import java.util.concurrent.TimeUnit; import java.util.function.Consumer; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import org.junit.AfterClass; +import org.junit.Assert; import org.junit.BeforeClass; import org.junit.Test; @@ -38,27 +41,26 @@ import org.apache.cassandra.utils.concurrent.SimpleCondition; import org.apache.cassandra.utils.progress.ProgressEventType; import static java.util.concurrent.TimeUnit.MINUTES; -import static org.apache.cassandra.distributed.test.ExecUtil.rethrow; import static org.apache.cassandra.distributed.api.Feature.GOSSIP; import static org.apache.cassandra.distributed.api.Feature.NETWORK; -import static org.apache.cassandra.distributed.shared.AssertUtils.*; +import static org.apache.cassandra.distributed.shared.AssertUtils.assertRows; +import 
static org.apache.cassandra.distributed.test.ExecUtil.rethrow; public class RepairTest extends TestBaseImpl { - private static final String insert = withKeyspace("INSERT INTO %s.test (k, c1, c2) VALUES (?, 'value1', 'value2');"); - private static final String query = withKeyspace("SELECT k, c1, c2 FROM %s.test WHERE k = ?;"); - private static ICluster<IInvokableInstance> cluster; - private static void insert(ICluster<IInvokableInstance> cluster, int start, int end, int ... nodes) + private static void insert(ICluster<IInvokableInstance> cluster, String keyspace, int start, int end, int ... nodes) { + String insert = String.format("INSERT INTO %s.test (k, c1, c2) VALUES (?, 'value1', 'value2');", keyspace); for (int i = start ; i < end ; ++i) for (int node : nodes) cluster.get(node).executeInternal(insert, Integer.toString(i)); } - private static void verify(ICluster<IInvokableInstance> cluster, int start, int end, int ... nodes) + private static void verify(ICluster<IInvokableInstance> cluster, String keyspace, int start, int end, int ... nodes) { + String query = String.format("SELECT k, c1, c2 FROM %s.test WHERE k = ?;", keyspace); for (int i = start ; i < end ; ++i) { for (int node = 1 ; node <= cluster.size() ; ++node) @@ -72,10 +74,10 @@ public class RepairTest extends TestBaseImpl } } - private static void flush(ICluster<IInvokableInstance> cluster, int ... nodes) + private static void flush(ICluster<IInvokableInstance> cluster, String keyspace, int ... 
nodes) { for (int node : nodes) - cluster.get(node).runOnInstance(rethrow(() -> StorageService.instance.forceKeyspaceFlush(KEYSPACE))); + cluster.get(node).runOnInstance(rethrow(() -> StorageService.instance.forceKeyspaceFlush(keyspace))); } private static ICluster create(Consumer<IInstanceConfig> configModifier) throws IOException @@ -90,11 +92,11 @@ public class RepairTest extends TestBaseImpl return init(Cluster.build().withNodes(3).withConfig(configModifier).start()); } - private void repair(ICluster<IInvokableInstance> cluster, Map<String, String> options) + static void repair(ICluster<IInvokableInstance> cluster, String keyspace, Map<String, String> options) { cluster.get(1).runOnInstance(rethrow(() -> { SimpleCondition await = new SimpleCondition(); - StorageService.instance.repair(KEYSPACE, options, ImmutableList.of((tag, event) -> { + StorageService.instance.repair(keyspace, options, ImmutableList.of((tag, event) -> { if (event.getType() == ProgressEventType.COMPLETE) await.signalAll(); })).right.get(); @@ -102,22 +104,22 @@ public class RepairTest extends TestBaseImpl })); } - void populate(ICluster<IInvokableInstance> cluster, String compression) throws Exception + static void populate(ICluster<IInvokableInstance> cluster, String keyspace, String compression) throws Exception { try { - cluster.schemaChange(withKeyspace("DROP TABLE IF EXISTS %s.test;")); - cluster.schemaChange(withKeyspace("CREATE TABLE %s.test (k text, c1 text, c2 text, PRIMARY KEY (k)) WITH compression = " + compression)); - - insert(cluster, 0, 1000, 1, 2, 3); - flush(cluster, 1); - insert(cluster, 1000, 1001, 1, 2); - insert(cluster, 1001, 2001, 1, 2, 3); - flush(cluster, 1, 2, 3); - - verify(cluster, 0, 1000, 1, 2, 3); - verify(cluster, 1000, 1001, 1, 2); - verify(cluster, 1001, 2001, 1, 2, 3); + cluster.schemaChange(String.format("DROP TABLE IF EXISTS %s.test;", keyspace)); + cluster.schemaChange(String.format("CREATE TABLE %s.test (k text, c1 text, c2 text, PRIMARY KEY (k)) WITH 
compression = %s", keyspace, compression)); + + insert(cluster, keyspace, 0, 1000, 1, 2, 3); + flush(cluster, keyspace, 1); + insert(cluster, keyspace, 1000, 1001, 1, 2); + insert(cluster, keyspace, 1001, 2001, 1, 2, 3); + flush(cluster, keyspace, 1, 2, 3); + + verify(cluster, keyspace, 0, 1000, 1, 2, 3); + verify(cluster, keyspace, 1000, 1001, 1, 2); + verify(cluster, keyspace, 1001, 2001, 1, 2, 3); } catch (Throwable t) { @@ -129,9 +131,16 @@ public class RepairTest extends TestBaseImpl void repair(ICluster<IInvokableInstance> cluster, boolean sequential, String compression) throws Exception { - populate(cluster, compression); - repair(cluster, ImmutableMap.of("parallelism", sequential ? "sequential" : "parallel")); - verify(cluster, 0, 2001, 1, 2, 3); + populate(cluster, KEYSPACE, compression); + repair(cluster, KEYSPACE, ImmutableMap.of("parallelism", sequential ? "sequential" : "parallel")); + verify(cluster, KEYSPACE, 0, 2001, 1, 2, 3); + } + + void shutDownNodesAndForceRepair(ICluster<IInvokableInstance> cluster, String keyspace, int downNode) throws Exception + { + populate(cluster, keyspace, "{'enabled': false}"); + cluster.get(downNode).shutdown().get(5, TimeUnit.SECONDS); + repair(cluster, keyspace, ImmutableMap.of("forceRepair", "true")); } @BeforeClass @@ -182,4 +191,33 @@ public class RepairTest extends TestBaseImpl { repair(cluster, false, "{'enabled': false}"); } + + @Test + public void testForcedNormalRepairWithOneNodeDown() throws Exception + { + // The test uses its own keyspace with rf == 2 + String forceRepairKeyspace = "test_force_repair_keyspace"; + int rf = 2; + cluster.schemaChange("CREATE KEYSPACE " + forceRepairKeyspace + " WITH replication = {'class': 'SimpleStrategy', 'replication_factor': " + rf + "};"); + + try + { + shutDownNodesAndForceRepair(cluster, forceRepairKeyspace, 3); // shutdown node 3 after inserting + DistributedRepairUtils.assertParentRepairSuccess(cluster, 1, forceRepairKeyspace, "test", row -> { + Set<String> 
successfulRanges = row.getSet("successful_ranges"); + Set<String> requestedRanges = row.getSet("requested_ranges"); + Assert.assertNotNull("Found no successful ranges", successfulRanges); + Assert.assertNotNull("Found no requested ranges", requestedRanges); + Assert.assertEquals("Requested ranges count should equals to replication factor", rf, requestedRanges.size()); + Assert.assertTrue("Given clusterSize = 3, RF = 2 and 1 node down in the replica set, it should yield only 1 successful repaired range.", + successfulRanges.size() == 1 && !successfulRanges.contains("")); // the successful ranges set should not only contain empty string + }); + } + finally + { + // bring the node 3 back up + if (cluster.get(3).isShutdown()) + cluster.get(3).startup(cluster); + } + } } diff --git a/test/unit/org/apache/cassandra/repair/RepairRunnableTest.java b/test/unit/org/apache/cassandra/repair/NeighborsAndRangesTest.java similarity index 67% rename from test/unit/org/apache/cassandra/repair/RepairRunnableTest.java rename to test/unit/org/apache/cassandra/repair/NeighborsAndRangesTest.java index 418d7de..9dcd7dc 100644 --- a/test/unit/org/apache/cassandra/repair/RepairRunnableTest.java +++ b/test/unit/org/apache/cassandra/repair/NeighborsAndRangesTest.java @@ -29,35 +29,38 @@ import org.junit.Test; import org.apache.cassandra.locator.InetAddressAndPort; -import static org.apache.cassandra.repair.RepairRunnable.filterCommonRanges; +import static org.apache.cassandra.repair.RepairRunnable.NeighborsAndRanges; -public class RepairRunnableTest extends AbstractRepairTest +public class NeighborsAndRangesTest extends AbstractRepairTest { /** * For non-forced repairs, common ranges should be passed through as-is */ @Test - public void filterCommonIncrementalRangesNotForced() throws Exception + public void filterCommonIncrementalRangesNotForced() { CommonRange cr = new CommonRange(PARTICIPANTS, Collections.emptySet(), ALL_RANGES); - + NeighborsAndRanges nr = new NeighborsAndRanges(false, 
PARTICIPANTS, Collections.singletonList(cr)); List<CommonRange> expected = Lists.newArrayList(cr); - List<CommonRange> actual = filterCommonRanges(expected, Collections.emptySet(), false); + List<CommonRange> actual = nr.filterCommonRanges(null, null); Assert.assertEquals(expected, actual); } @Test - public void forceFilterCommonIncrementalRanges() throws Exception + public void forceFilterCommonIncrementalRanges() { - CommonRange cr1 = new CommonRange(Sets.newHashSet(PARTICIPANT1, PARTICIPANT2), Collections.emptySet(), Sets.newHashSet(RANGE1, RANGE2)); + CommonRange cr1 = new CommonRange(Sets.newHashSet(PARTICIPANT1, PARTICIPANT2), Collections.emptySet(), Sets.newHashSet(RANGE1)); CommonRange cr2 = new CommonRange(Sets.newHashSet(PARTICIPANT1, PARTICIPANT2, PARTICIPANT3), Collections.emptySet(), Sets.newHashSet(RANGE3)); + CommonRange cr3 = new CommonRange(Sets.newHashSet(PARTICIPANT2, PARTICIPANT3), Collections.emptySet(), Sets.newHashSet(RANGE2)); Set<InetAddressAndPort> liveEndpoints = Sets.newHashSet(PARTICIPANT2, PARTICIPANT3); // PARTICIPANT1 is excluded + List<CommonRange> initial = Lists.newArrayList(cr1, cr2, cr3); + List<CommonRange> expected = Lists.newArrayList(new CommonRange(Sets.newHashSet(PARTICIPANT2), Collections.emptySet(), Sets.newHashSet(RANGE1), true), + new CommonRange(Sets.newHashSet(PARTICIPANT2, PARTICIPANT3), Collections.emptySet(), Sets.newHashSet(RANGE3), true), + new CommonRange(Sets.newHashSet(PARTICIPANT2, PARTICIPANT3), Collections.emptySet(), Sets.newHashSet(RANGE2), false)); - List<CommonRange> initial = Lists.newArrayList(cr1, cr2); - List<CommonRange> expected = Lists.newArrayList(new CommonRange(Sets.newHashSet(PARTICIPANT2), Collections.emptySet(), Sets.newHashSet(RANGE1, RANGE2)), - new CommonRange(Sets.newHashSet(PARTICIPANT2, PARTICIPANT3), Collections.emptySet(), Sets.newHashSet(RANGE3))); - List<CommonRange> actual = filterCommonRanges(initial, liveEndpoints, true); + NeighborsAndRanges nr = new NeighborsAndRanges(true, 
liveEndpoints, initial); + List<CommonRange> actual = nr.filterCommonRanges(null, null); Assert.assertEquals(expected, actual); } diff --git a/test/unit/org/apache/cassandra/repair/RepairJobTest.java b/test/unit/org/apache/cassandra/repair/RepairJobTest.java index d3af58f..9887d38 100644 --- a/test/unit/org/apache/cassandra/repair/RepairJobTest.java +++ b/test/unit/org/apache/cassandra/repair/RepairJobTest.java @@ -107,9 +107,9 @@ public class RepairJobTest public MeasureableRepairSession(UUID parentRepairSession, UUID id, CommonRange commonRange, String keyspace, RepairParallelism parallelismDegree, boolean isIncremental, boolean pullRepair, - boolean force, PreviewKind previewKind, boolean optimiseStreams, String... cfnames) + PreviewKind previewKind, boolean optimiseStreams, String... cfnames) { - super(parentRepairSession, id, commonRange, keyspace, parallelismDegree, isIncremental, pullRepair, force, previewKind, optimiseStreams, cfnames); + super(parentRepairSession, id, commonRange, keyspace, parallelismDegree, isIncremental, pullRepair, previewKind, optimiseStreams, cfnames); } protected DebuggableThreadPoolExecutor createExecutor() @@ -168,8 +168,7 @@ public class RepairJobTest this.session = new MeasureableRepairSession(parentRepairSession, UUIDGen.getTimeUUID(), new CommonRange(neighbors, Collections.emptySet(), FULL_RANGE), - KEYSPACE, RepairParallelism.SEQUENTIAL, - false, false, false, + KEYSPACE, RepairParallelism.SEQUENTIAL, false, false, PreviewKind.NONE, false, CF); this.job = new RepairJob(session, CF); diff --git a/test/unit/org/apache/cassandra/repair/RepairSessionTest.java b/test/unit/org/apache/cassandra/repair/RepairSessionTest.java index e77d657..2ad5831 100644 --- a/test/unit/org/apache/cassandra/repair/RepairSessionTest.java +++ b/test/unit/org/apache/cassandra/repair/RepairSessionTest.java @@ -66,7 +66,7 @@ public class RepairSessionTest RepairSession session = new RepairSession(parentSessionId, sessionId, new CommonRange(endpoints, 
Collections.emptySet(), Arrays.asList(repairRange)), "Keyspace1", RepairParallelism.SEQUENTIAL, - false, false, false, + false, false, PreviewKind.NONE, false, "Standard1"); // perform convict --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org