Add option to optimize Merkle tree comparison across replicas. Patch by marcuse; reviewed by Blake Eggleston for CASSANDRA-3200.
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/cb56d9fc Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/cb56d9fc Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/cb56d9fc Branch: refs/heads/trunk Commit: cb56d9fc3c773abbefa2044ce41ddbfb7717e0cb Parents: a6f3983 Author: Marcus Eriksson <marc...@apache.org> Authored: Thu Dec 7 13:55:44 2017 +0100 Committer: Marcus Eriksson <marc...@apache.org> Committed: Thu Dec 7 13:55:56 2017 +0100 ---------------------------------------------------------------------- CHANGES.txt | 1 + NEWS.txt | 4 + .../repair/AsymmetricLocalSyncTask.java | 105 +++++ .../repair/AsymmetricRemoteSyncTask.java | 60 +++ .../cassandra/repair/AsymmetricSyncTask.java | 85 ++++ .../repair/CompletableRemoteSyncTask.java | 28 ++ .../apache/cassandra/repair/LocalSyncTask.java | 1 - .../apache/cassandra/repair/RemoteSyncTask.java | 2 +- .../org/apache/cassandra/repair/RepairJob.java | 137 ++++-- .../repair/RepairMessageVerbHandler.java | 24 +- .../apache/cassandra/repair/RepairRunnable.java | 1 + .../apache/cassandra/repair/RepairSession.java | 12 +- .../cassandra/repair/StreamingRepairTask.java | 36 +- .../repair/asymmetric/DifferenceHolder.java | 98 +++++ .../repair/asymmetric/HostDifferences.java | 83 ++++ .../asymmetric/IncomingRepairStreamTracker.java | 81 ++++ .../repair/asymmetric/PreferedNodeFilter.java | 27 ++ .../repair/asymmetric/RangeDenormalizer.java | 125 ++++++ .../repair/asymmetric/ReduceHelper.java | 137 ++++++ .../repair/asymmetric/StreamFromOptions.java | 109 +++++ .../repair/messages/AsymmetricSyncRequest.java | 132 ++++++ .../repair/messages/RepairMessage.java | 3 +- .../cassandra/repair/messages/RepairOption.java | 24 +- .../cassandra/service/ActiveRepairService.java | 4 +- .../apache/cassandra/tools/nodetool/Repair.java | 6 +- .../cassandra/repair/LocalSyncTaskTest.java | 2 +- .../cassandra/repair/RepairSessionTest.java | 2 +- 
.../repair/StreamingRepairTaskTest.java | 5 +- .../repair/asymmetric/DifferenceHolderTest.java | 106 +++++ .../asymmetric/RangeDenormalizerTest.java | 86 ++++ .../repair/asymmetric/ReduceHelperTest.java | 425 +++++++++++++++++++ .../asymmetric/StreamFromOptionsTest.java | 124 ++++++ .../apache/cassandra/utils/MerkleTreesTest.java | 2 +- 33 files changed, 2013 insertions(+), 64 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/cb56d9fc/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 34af97d..ef414b9 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 4.0 + * Add option to optimise merkle tree comparison across replicas (CASSANDRA-3200) * Remove unused and deprecated methods from AbstractCompactionStrategy (CASSANDRA-14081) * Fix Distribution.average in cassandra-stress (CASSANDRA-14090) * Support a means of logging all queries as they were invoked (CASSANDRA-13983) http://git-wip-us.apache.org/repos/asf/cassandra/blob/cb56d9fc/NEWS.txt ---------------------------------------------------------------------- diff --git a/NEWS.txt b/NEWS.txt index a14f7ba..43f57f2 100644 --- a/NEWS.txt +++ b/NEWS.txt @@ -18,6 +18,10 @@ using the provided 'sstableupgrade' tool. New features ------------ + - An experimental option to compare all merkle trees together has been added - for example, in + a 3 node cluster with 2 replicas identical and 1 out-of-date, with this option enabled, the + out-of-date replica will only stream a single copy from up-to-date replica. Enable it by adding + "-os" to nodetool repair. See CASSANDRA-3200. - The currentTimestamp, currentDate, currentTime and currentTimeUUID functions have been added. See CASSANDRA-13132 - Support for arithmetic operations between `timestamp`/`date` and `duration` has been added. 
http://git-wip-us.apache.org/repos/asf/cassandra/blob/cb56d9fc/src/java/org/apache/cassandra/repair/AsymmetricLocalSyncTask.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/repair/AsymmetricLocalSyncTask.java b/src/java/org/apache/cassandra/repair/AsymmetricLocalSyncTask.java new file mode 100644 index 0000000..5464520 --- /dev/null +++ b/src/java/org/apache/cassandra/repair/AsymmetricLocalSyncTask.java @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.repair; + +import java.net.InetAddress; +import java.util.List; +import java.util.UUID; + +import org.apache.cassandra.db.SystemKeyspace; +import org.apache.cassandra.dht.Range; +import org.apache.cassandra.dht.Token; +import org.apache.cassandra.streaming.PreviewKind; +import org.apache.cassandra.streaming.ProgressInfo; +import org.apache.cassandra.streaming.StreamEvent; +import org.apache.cassandra.streaming.StreamEventHandler; +import org.apache.cassandra.streaming.StreamOperation; +import org.apache.cassandra.streaming.StreamPlan; +import org.apache.cassandra.streaming.StreamState; +import org.apache.cassandra.tracing.TraceState; +import org.apache.cassandra.tracing.Tracing; +import org.apache.cassandra.utils.FBUtilities; + +public class AsymmetricLocalSyncTask extends AsymmetricSyncTask implements StreamEventHandler +{ + private final UUID pendingRepair; + private final TraceState state = Tracing.instance.get(); + + public AsymmetricLocalSyncTask(RepairJobDesc desc, InetAddress fetchFrom, List<Range<Token>> rangesToFetch, UUID pendingRepair, PreviewKind previewKind) + { + super(desc, FBUtilities.getBroadcastAddress(), fetchFrom, rangesToFetch, previewKind); + this.pendingRepair = pendingRepair; + } + + public void startSync(List<Range<Token>> rangesToFetch) + { + InetAddress preferred = SystemKeyspace.getPreferredIP(fetchFrom); + StreamPlan plan = new StreamPlan(StreamOperation.REPAIR, + 1, false, + false, + pendingRepair, + previewKind) + .listeners(this) + .flushBeforeTransfer(pendingRepair == null) + // request ranges from the remote node + .requestRanges(fetchFrom, preferred, desc.keyspace, rangesToFetch, desc.columnFamily); + plan.execute(); + + } + + public void handleStreamEvent(StreamEvent event) + { + if (state == null) + return; + switch (event.eventType) + { + case STREAM_PREPARED: + StreamEvent.SessionPreparedEvent spe = (StreamEvent.SessionPreparedEvent) event; + state.trace("Streaming session with {} 
prepared", spe.session.peer); + break; + case STREAM_COMPLETE: + StreamEvent.SessionCompleteEvent sce = (StreamEvent.SessionCompleteEvent) event; + state.trace("Streaming session with {} {}", sce.peer, sce.success ? "completed successfully" : "failed"); + break; + case FILE_PROGRESS: + ProgressInfo pi = ((StreamEvent.ProgressEvent) event).progress; + state.trace("{}/{} ({}%) {} idx:{}{}", + new Object[] { FBUtilities.prettyPrintMemory(pi.currentBytes), + FBUtilities.prettyPrintMemory(pi.totalBytes), + pi.currentBytes * 100 / pi.totalBytes, + pi.direction == ProgressInfo.Direction.OUT ? "sent to" : "received from", + pi.sessionIndex, + pi.peer }); + } + } + + public void onSuccess(StreamState result) + { + String message = String.format("Sync complete using session %s between %s and %s on %s", desc.sessionId, fetchingNode, fetchFrom, desc.columnFamily); + Tracing.traceRepair(message); + set(stat); + finished(); + } + + public void onFailure(Throwable t) + { + setException(t); + finished(); + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/cb56d9fc/src/java/org/apache/cassandra/repair/AsymmetricRemoteSyncTask.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/repair/AsymmetricRemoteSyncTask.java b/src/java/org/apache/cassandra/repair/AsymmetricRemoteSyncTask.java new file mode 100644 index 0000000..d70975d --- /dev/null +++ b/src/java/org/apache/cassandra/repair/AsymmetricRemoteSyncTask.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.repair; + +import java.net.InetAddress; +import java.util.List; + +import org.apache.cassandra.dht.Range; +import org.apache.cassandra.dht.Token; +import org.apache.cassandra.exceptions.RepairException; +import org.apache.cassandra.net.MessagingService; +import org.apache.cassandra.repair.messages.AsymmetricSyncRequest; +import org.apache.cassandra.streaming.PreviewKind; +import org.apache.cassandra.streaming.SessionSummary; +import org.apache.cassandra.tracing.Tracing; +import org.apache.cassandra.utils.FBUtilities; + +public class AsymmetricRemoteSyncTask extends AsymmetricSyncTask implements CompletableRemoteSyncTask +{ + public AsymmetricRemoteSyncTask(RepairJobDesc desc, InetAddress fetchNode, InetAddress fetchFrom, List<Range<Token>> rangesToFetch, PreviewKind previewKind) + { + super(desc, fetchNode, fetchFrom, rangesToFetch, previewKind); + } + + public void startSync(List<Range<Token>> rangesToFetch) + { + InetAddress local = FBUtilities.getBroadcastAddress(); + AsymmetricSyncRequest request = new AsymmetricSyncRequest(desc, local, fetchingNode, fetchFrom, rangesToFetch, previewKind); + String message = String.format("Forwarding streaming repair of %d ranges to %s (to be streamed with %s)", request.ranges.size(), request.fetchingNode, request.fetchFrom); + Tracing.traceRepair(message); + MessagingService.instance().sendOneWay(request.createMessage(), request.fetchingNode); + } + public void syncComplete(boolean success, List<SessionSummary> summaries) + { + if (success) + { + 
set(stat.withSummaries(summaries)); + } + else + { + setException(new RepairException(desc, previewKind, String.format("Sync failed between %s and %s", fetchingNode, fetchFrom))); + } + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/cb56d9fc/src/java/org/apache/cassandra/repair/AsymmetricSyncTask.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/repair/AsymmetricSyncTask.java b/src/java/org/apache/cassandra/repair/AsymmetricSyncTask.java new file mode 100644 index 0000000..fe00058 --- /dev/null +++ b/src/java/org/apache/cassandra/repair/AsymmetricSyncTask.java @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.repair; + +import java.net.InetAddress; +import java.util.List; +import java.util.concurrent.TimeUnit; + +import com.google.common.util.concurrent.AbstractFuture; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.cassandra.db.Keyspace; +import org.apache.cassandra.dht.Range; +import org.apache.cassandra.dht.Token; +import org.apache.cassandra.streaming.PreviewKind; +import org.apache.cassandra.tracing.Tracing; + +public abstract class AsymmetricSyncTask extends AbstractFuture<SyncStat> implements Runnable +{ + private static Logger logger = LoggerFactory.getLogger(AsymmetricSyncTask.class); + protected final RepairJobDesc desc; + protected final InetAddress fetchFrom; + protected final List<Range<Token>> rangesToFetch; + protected final InetAddress fetchingNode; + protected final PreviewKind previewKind; + private long startTime = Long.MIN_VALUE; + protected volatile SyncStat stat; + + + public AsymmetricSyncTask(RepairJobDesc desc, InetAddress fetchingNode, InetAddress fetchFrom, List<Range<Token>> rangesToFetch, PreviewKind previewKind) + { + this.desc = desc; + this.fetchFrom = fetchFrom; + this.fetchingNode = fetchingNode; + this.rangesToFetch = rangesToFetch; + // todo: make an AsymmetricSyncStat? 
+ stat = new SyncStat(new NodePair(fetchingNode, fetchFrom), rangesToFetch.size()); + this.previewKind = previewKind; + } + public void run() + { + startTime = System.currentTimeMillis(); + // choose a repair method based on the significance of the difference + String format = String.format("%s Endpoints %s and %s %%s for %s", previewKind.logPrefix(desc.sessionId), fetchingNode, fetchFrom, desc.columnFamily); + if (rangesToFetch.isEmpty()) + { + logger.info(String.format(format, "are consistent")); + Tracing.traceRepair("Endpoint {} is consistent with {} for {}", fetchingNode, fetchFrom, desc.columnFamily); + set(stat); + return; + } + + // non-0 difference: perform streaming repair + logger.info(String.format(format, "have " + rangesToFetch.size() + " range(s) out of sync")); + Tracing.traceRepair("Endpoint {} has {} range(s) out of sync with {} for {}", fetchingNode, rangesToFetch.size(), fetchFrom, desc.columnFamily); + startSync(rangesToFetch); + } + + protected void finished() + { + if (startTime != Long.MIN_VALUE) + Keyspace.open(desc.keyspace).getColumnFamilyStore(desc.columnFamily).metric.syncTime.update(System.currentTimeMillis() - startTime, TimeUnit.MILLISECONDS); + } + + + public abstract void startSync(List<Range<Token>> rangesToFetch); +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/cb56d9fc/src/java/org/apache/cassandra/repair/CompletableRemoteSyncTask.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/repair/CompletableRemoteSyncTask.java b/src/java/org/apache/cassandra/repair/CompletableRemoteSyncTask.java new file mode 100644 index 0000000..c4fe6c8 --- /dev/null +++ b/src/java/org/apache/cassandra/repair/CompletableRemoteSyncTask.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.repair; + +import java.util.List; + +import org.apache.cassandra.streaming.SessionSummary; + +public interface CompletableRemoteSyncTask +{ + void syncComplete(boolean success, List<SessionSummary> summaries); +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/cb56d9fc/src/java/org/apache/cassandra/repair/LocalSyncTask.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/repair/LocalSyncTask.java b/src/java/org/apache/cassandra/repair/LocalSyncTask.java index 343950b..8545b22 100644 --- a/src/java/org/apache/cassandra/repair/LocalSyncTask.java +++ b/src/java/org/apache/cassandra/repair/LocalSyncTask.java @@ -61,7 +61,6 @@ public class LocalSyncTask extends SyncTask implements StreamEventHandler this.pullRepair = pullRepair; } - @VisibleForTesting StreamPlan createStreamPlan(InetAddress dst, InetAddress preferred, List<Range<Token>> differences) { http://git-wip-us.apache.org/repos/asf/cassandra/blob/cb56d9fc/src/java/org/apache/cassandra/repair/RemoteSyncTask.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/repair/RemoteSyncTask.java b/src/java/org/apache/cassandra/repair/RemoteSyncTask.java index 6cc786e..93feb72 100644 --- a/src/java/org/apache/cassandra/repair/RemoteSyncTask.java +++ 
b/src/java/org/apache/cassandra/repair/RemoteSyncTask.java @@ -39,7 +39,7 @@ import org.apache.cassandra.utils.FBUtilities; * * When RemoteSyncTask receives SyncComplete from remote node, task completes. */ -public class RemoteSyncTask extends SyncTask +public class RemoteSyncTask extends SyncTask implements CompletableRemoteSyncTask { private static final Logger logger = LoggerFactory.getLogger(RemoteSyncTask.class); http://git-wip-us.apache.org/repos/asf/cassandra/blob/cb56d9fc/src/java/org/apache/cassandra/repair/RepairJob.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/repair/RepairJob.java b/src/java/org/apache/cassandra/repair/RepairJob.java index d0654bd..7b8eb92 100644 --- a/src/java/org/apache/cassandra/repair/RepairJob.java +++ b/src/java/org/apache/cassandra/repair/RepairJob.java @@ -19,7 +19,9 @@ package org.apache.cassandra.repair; import java.net.InetAddress; import java.util.*; +import java.util.stream.Collectors; +import com.google.common.collect.ImmutableMap; import com.google.common.util.concurrent.*; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -27,7 +29,13 @@ import org.slf4j.LoggerFactory; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.db.ColumnFamilyStore; import org.apache.cassandra.db.Keyspace; +import org.apache.cassandra.repair.asymmetric.DifferenceHolder; +import org.apache.cassandra.repair.asymmetric.HostDifferences; +import org.apache.cassandra.repair.asymmetric.PreferedNodeFilter; +import org.apache.cassandra.repair.asymmetric.ReduceHelper; import org.apache.cassandra.streaming.PreviewKind; +import org.apache.cassandra.dht.Range; +import org.apache.cassandra.dht.Token; import org.apache.cassandra.tracing.Tracing; import org.apache.cassandra.utils.FBUtilities; import org.apache.cassandra.utils.Pair; @@ -45,6 +53,7 @@ public class RepairJob extends AbstractFuture<RepairResult> implements Runnable private final 
ListeningExecutorService taskExecutor; private final boolean isIncremental; private final PreviewKind previewKind; + private final boolean optimiseStreams; /** * Create repair job to run on specific columnfamily @@ -52,7 +61,7 @@ public class RepairJob extends AbstractFuture<RepairResult> implements Runnable * @param session RepairSession that this RepairJob belongs * @param columnFamily name of the ColumnFamily to repair */ - public RepairJob(RepairSession session, String columnFamily, boolean isIncremental, PreviewKind previewKind) + public RepairJob(RepairSession session, String columnFamily, boolean isIncremental, PreviewKind previewKind, boolean optimiseStreams) { this.session = session; this.desc = new RepairJobDesc(session.parentRepairSession, session.getId(), session.keyspace, columnFamily, session.getRanges()); @@ -60,6 +69,7 @@ public class RepairJob extends AbstractFuture<RepairResult> implements Runnable this.parallelismDegree = session.parallelismDegree; this.isIncremental = isIncremental; this.previewKind = previewKind; + this.optimiseStreams = optimiseStreams; } /** @@ -118,39 +128,7 @@ public class RepairJob extends AbstractFuture<RepairResult> implements Runnable } // When all validations complete, submit sync tasks - ListenableFuture<List<SyncStat>> syncResults = Futures.transformAsync(validations, new AsyncFunction<List<TreeResponse>, List<SyncStat>>() - { - public ListenableFuture<List<SyncStat>> apply(List<TreeResponse> trees) - { - InetAddress local = FBUtilities.getLocalAddress(); - - List<SyncTask> syncTasks = new ArrayList<>(); - // We need to difference all trees one against another - for (int i = 0; i < trees.size() - 1; ++i) - { - TreeResponse r1 = trees.get(i); - for (int j = i + 1; j < trees.size(); ++j) - { - TreeResponse r2 = trees.get(j); - SyncTask task; - if (r1.endpoint.equals(local) || r2.endpoint.equals(local)) - { - task = new LocalSyncTask(desc, r1, r2, isIncremental ? 
desc.parentSessionId : null, session.pullRepair, session.previewKind); - } - else - { - task = new RemoteSyncTask(desc, r1, r2, session.previewKind); - // RemoteSyncTask expects SyncComplete message sent back. - // Register task to RepairSession to receive response. - session.waitForSync(Pair.create(desc, new NodePair(r1.endpoint, r2.endpoint)), (RemoteSyncTask) task); - } - syncTasks.add(task); - taskExecutor.submit(task); - } - } - return Futures.allAsList(syncTasks); - } - }, taskExecutor); + ListenableFuture<List<SyncStat>> syncResults = Futures.transformAsync(validations, optimiseStreams && !session.pullRepair ? optimisedSyncing() : standardSyncing(), taskExecutor); // When all sync complete, set the final result Futures.addCallback(syncResults, new FutureCallback<List<SyncStat>>() @@ -182,6 +160,97 @@ public class RepairJob extends AbstractFuture<RepairResult> implements Runnable }, taskExecutor); } + private AsyncFunction<List<TreeResponse>, List<SyncStat>> standardSyncing() + { + return trees -> + { + InetAddress local = FBUtilities.getLocalAddress(); + + List<SyncTask> syncTasks = new ArrayList<>(); + // We need to difference all trees one against another + for (int i = 0; i < trees.size() - 1; ++i) + { + TreeResponse r1 = trees.get(i); + for (int j = i + 1; j < trees.size(); ++j) + { + TreeResponse r2 = trees.get(j); + SyncTask task; + if (r1.endpoint.equals(local) || r2.endpoint.equals(local)) + { + task = new LocalSyncTask(desc, r1, r2, isIncremental ? desc.parentSessionId : null, session.pullRepair, session.previewKind); + } + else + { + task = new RemoteSyncTask(desc, r1, r2, session.previewKind); + // RemoteSyncTask expects SyncComplete message sent back. + // Register task to RepairSession to receive response. 
+ session.waitForSync(Pair.create(desc, new NodePair(r1.endpoint, r2.endpoint)), (RemoteSyncTask) task); + } + syncTasks.add(task); + taskExecutor.submit(task); + } + } + return Futures.allAsList(syncTasks); + }; + } + + private AsyncFunction<List<TreeResponse>, List<SyncStat>> optimisedSyncing() + { + return trees -> + { + InetAddress local = FBUtilities.getLocalAddress(); + + List<AsymmetricSyncTask> syncTasks = new ArrayList<>(); + // We need to difference all trees one against another + DifferenceHolder diffHolder = new DifferenceHolder(trees); + + logger.debug("diffs = {}", diffHolder); + PreferedNodeFilter preferSameDCFilter = (streaming, candidates) -> + candidates.stream() + .filter(node -> getDC(streaming) + .equals(getDC(node))) + .collect(Collectors.toSet()); + ImmutableMap<InetAddress, HostDifferences> reducedDifferences = ReduceHelper.reduce(diffHolder, preferSameDCFilter); + + for (int i = 0; i < trees.size(); i++) + { + InetAddress address = trees.get(i).endpoint; + HostDifferences streamsFor = reducedDifferences.get(address); + if (streamsFor != null) + { + assert streamsFor.get(address).isEmpty() : "We should not fetch ranges from ourselves"; + for (InetAddress fetchFrom : streamsFor.hosts()) + { + List<Range<Token>> toFetch = streamsFor.get(fetchFrom); + logger.debug("{} is about to fetch {} from {}", address, toFetch, fetchFrom); + AsymmetricSyncTask task; + if (address.equals(local)) + { + task = new AsymmetricLocalSyncTask(desc, fetchFrom, toFetch, isIncremental ? 
desc.parentSessionId : null, previewKind); + } + else + { + task = new AsymmetricRemoteSyncTask(desc, address, fetchFrom, toFetch, previewKind); + session.waitForSync(Pair.create(desc, new NodePair(address, fetchFrom)),(AsymmetricRemoteSyncTask)task); + } + syncTasks.add(task); + taskExecutor.submit(task); + } + } + else + { + logger.debug("Node {} has nothing to stream", address); + } + } + return Futures.allAsList(syncTasks); + }; + } + + private String getDC(InetAddress address) + { + return DatabaseDescriptor.getEndpointSnitch().getDatacenter(address); + } + /** * Creates {@link ValidationTask} and submit them to task executor in parallel. * http://git-wip-us.apache.org/repos/asf/cassandra/blob/cb56d9fc/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java b/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java index 3c7f890..c26d4d1 100644 --- a/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java +++ b/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java @@ -144,10 +144,32 @@ public class RepairMessageVerbHandler implements IVerbHandler<RepairMessage> // forwarded sync request SyncRequest request = (SyncRequest) message.payload; logger.debug("Syncing {}", request); - StreamingRepairTask task = new StreamingRepairTask(desc, request, isIncremental(desc.parentSessionId) ? desc.parentSessionId : null, request.previewKind); + StreamingRepairTask task = new StreamingRepairTask(desc, + request.initiator, + request.src, + request.dst, + request.ranges, + isIncremental(desc.parentSessionId) ? 
desc.parentSessionId : null, + request.previewKind, + false); task.run(); break; + case ASYMMETRIC_SYNC_REQUEST: + // forwarded sync request + AsymmetricSyncRequest asymmetricSyncRequest = (AsymmetricSyncRequest) message.payload; + logger.debug("Syncing {}", asymmetricSyncRequest); + StreamingRepairTask asymmetricTask = new StreamingRepairTask(desc, + asymmetricSyncRequest.initiator, + asymmetricSyncRequest.fetchingNode, + asymmetricSyncRequest.fetchFrom, + asymmetricSyncRequest.ranges, + isIncremental(desc.parentSessionId) ? desc.parentSessionId : null, + asymmetricSyncRequest.previewKind, + true); + asymmetricTask.run(); + break; + case CLEANUP: logger.debug("cleaning up repair"); CleanupMessage cleanup = (CleanupMessage) message.payload; http://git-wip-us.apache.org/repos/asf/cassandra/blob/cb56d9fc/src/java/org/apache/cassandra/repair/RepairRunnable.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/repair/RepairRunnable.java b/src/java/org/apache/cassandra/repair/RepairRunnable.java index 2b67a3c..1c9778b 100644 --- a/src/java/org/apache/cassandra/repair/RepairRunnable.java +++ b/src/java/org/apache/cassandra/repair/RepairRunnable.java @@ -515,6 +515,7 @@ public class RepairRunnable extends WrappedRunnable implements ProgressEventNoti options.isPullRepair(), force, options.getPreviewKind(), + options.optimiseStreams(), executor, cfnames); if (session == null) http://git-wip-us.apache.org/repos/asf/cassandra/blob/cb56d9fc/src/java/org/apache/cassandra/repair/RepairSession.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/repair/RepairSession.java b/src/java/org/apache/cassandra/repair/RepairSession.java index 5dbd050..609ec56 100644 --- a/src/java/org/apache/cassandra/repair/RepairSession.java +++ b/src/java/org/apache/cassandra/repair/RepairSession.java @@ -104,10 +104,11 @@ public class RepairSession extends 
AbstractFuture<RepairSessionResult> implement // Each validation task waits response from replica in validating ConcurrentMap (keyed by CF name and endpoint address) private final ConcurrentMap<Pair<RepairJobDesc, InetAddress>, ValidationTask> validating = new ConcurrentHashMap<>(); // Remote syncing jobs wait response in syncingTasks map - private final ConcurrentMap<Pair<RepairJobDesc, NodePair>, RemoteSyncTask> syncingTasks = new ConcurrentHashMap<>(); + private final ConcurrentMap<Pair<RepairJobDesc, NodePair>, CompletableRemoteSyncTask> syncingTasks = new ConcurrentHashMap<>(); // Tasks(snapshot, validate request, differencing, ...) are run on taskExecutor public final ListeningExecutorService taskExecutor = MoreExecutors.listeningDecorator(DebuggableThreadPoolExecutor.createCachedThreadpoolWithMaxSize("RepairJobTask")); + private final boolean optimiseStreams; private volatile boolean terminated = false; @@ -134,6 +135,7 @@ public class RepairSession extends AbstractFuture<RepairSessionResult> implement boolean pullRepair, boolean force, PreviewKind previewKind, + boolean optimiseStreams, String... cfnames) { assert cfnames.length > 0 : "Repairing no column families seems pointless, doesn't it"; @@ -174,6 +176,7 @@ public class RepairSession extends AbstractFuture<RepairSessionResult> implement this.previewKind = previewKind; this.pullRepair = pullRepair; this.skippedReplicas = forceSkippedReplicas; + this.optimiseStreams = optimiseStreams; } public UUID getId() @@ -191,11 +194,12 @@ public class RepairSession extends AbstractFuture<RepairSessionResult> implement validating.put(key, task); } - public void waitForSync(Pair<RepairJobDesc, NodePair> key, RemoteSyncTask task) + public void waitForSync(Pair<RepairJobDesc, NodePair> key, CompletableRemoteSyncTask task) { syncingTasks.put(key, task); } + /** * Receive merkle tree response or failed response from {@code endpoint} for current repair job. 
* @@ -227,7 +231,7 @@ public class RepairSession extends AbstractFuture<RepairSessionResult> implement */ public void syncComplete(RepairJobDesc desc, NodePair nodes, boolean success, List<SessionSummary> summaries) { - RemoteSyncTask task = syncingTasks.get(Pair.create(desc, nodes)); + CompletableRemoteSyncTask task = syncingTasks.get(Pair.create(desc, nodes)); if (task == null) { assert terminated; @@ -301,7 +305,7 @@ public class RepairSession extends AbstractFuture<RepairSessionResult> implement List<ListenableFuture<RepairResult>> jobs = new ArrayList<>(cfnames.length); for (String cfname : cfnames) { - RepairJob job = new RepairJob(this, cfname, isIncremental, previewKind); + RepairJob job = new RepairJob(this, cfname, isIncremental, previewKind, optimiseStreams); executor.execute(job); jobs.add(job); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/cb56d9fc/src/java/org/apache/cassandra/repair/StreamingRepairTask.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/repair/StreamingRepairTask.java b/src/java/org/apache/cassandra/repair/StreamingRepairTask.java index f43010b..a1b7459 100644 --- a/src/java/org/apache/cassandra/repair/StreamingRepairTask.java +++ b/src/java/org/apache/cassandra/repair/StreamingRepairTask.java @@ -20,15 +20,17 @@ package org.apache.cassandra.repair; import java.net.InetAddress; import java.util.UUID; import java.util.Collections; +import java.util.Collection; import com.google.common.annotations.VisibleForTesting; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.cassandra.db.SystemKeyspace; +import org.apache.cassandra.dht.Range; +import org.apache.cassandra.dht.Token; import org.apache.cassandra.net.MessagingService; import org.apache.cassandra.repair.messages.SyncComplete; -import org.apache.cassandra.repair.messages.SyncRequest; import org.apache.cassandra.streaming.PreviewKind; import org.apache.cassandra.streaming.StreamEvent; 
import org.apache.cassandra.streaming.StreamEventHandler; @@ -45,34 +47,44 @@ public class StreamingRepairTask implements Runnable, StreamEventHandler private static final Logger logger = LoggerFactory.getLogger(StreamingRepairTask.class); private final RepairJobDesc desc; - private final SyncRequest request; + private final boolean asymmetric; + private final InetAddress initiator; + private final InetAddress src; + private final InetAddress dst; + private final Collection<Range<Token>> ranges; private final UUID pendingRepair; private final PreviewKind previewKind; - public StreamingRepairTask(RepairJobDesc desc, SyncRequest request, UUID pendingRepair, PreviewKind previewKind) + public StreamingRepairTask(RepairJobDesc desc, InetAddress initiator, InetAddress src, InetAddress dst, Collection<Range<Token>> ranges, UUID pendingRepair, PreviewKind previewKind, boolean asymmetric) { this.desc = desc; - this.request = request; + this.initiator = initiator; + this.src = src; + this.dst = dst; + this.ranges = ranges; + this.asymmetric = asymmetric; this.pendingRepair = pendingRepair; this.previewKind = previewKind; } public void run() { - InetAddress dest = request.dst; + InetAddress dest = dst; InetAddress preferred = SystemKeyspace.getPreferredIP(dest); - logger.info("[streaming task #{}] Performing streaming repair of {} ranges with {}", desc.sessionId, request.ranges.size(), request.dst); + logger.info("[streaming task #{}] Performing streaming repair of {} ranges with {}", desc.sessionId, ranges.size(), dst); createStreamPlan(dest, preferred).execute(); } @VisibleForTesting StreamPlan createStreamPlan(InetAddress dest, InetAddress preferred) { - return new StreamPlan(StreamOperation.REPAIR, 1, false, false, pendingRepair, previewKind) + StreamPlan sp = new StreamPlan(StreamOperation.REPAIR, 1, false, false, pendingRepair, previewKind) .listeners(this) .flushBeforeTransfer(pendingRepair == null) // sstables are isolated at the beginning of an incremental repair 
session, so flushing isn't neccessary - .requestRanges(dest, preferred, desc.keyspace, request.ranges, desc.columnFamily) // request ranges from the remote node - .transferRanges(dest, preferred, desc.keyspace, request.ranges, desc.columnFamily); // send ranges to the remote node + .requestRanges(dest, preferred, desc.keyspace, ranges, desc.columnFamily); // request ranges from the remote node + if (!asymmetric) + sp.transferRanges(dest, preferred, desc.keyspace, ranges, desc.columnFamily); // send ranges to the remote node + return sp; } public void handleStreamEvent(StreamEvent event) @@ -86,8 +98,8 @@ public class StreamingRepairTask implements Runnable, StreamEventHandler */ public void onSuccess(StreamState state) { - logger.info("{} streaming task succeed, returning response to {}", previewKind.logPrefix(desc.sessionId), request.initiator); - MessagingService.instance().sendOneWay(new SyncComplete(desc, request.src, request.dst, true, state.createSummaries()).createMessage(), request.initiator); + logger.info("{} streaming task succeed, returning response to {}", previewKind.logPrefix(desc.sessionId), initiator); + MessagingService.instance().sendOneWay(new SyncComplete(desc, src, dst, true, state.createSummaries()).createMessage(), initiator); } /** @@ -95,6 +107,6 @@ public class StreamingRepairTask implements Runnable, StreamEventHandler */ public void onFailure(Throwable t) { - MessagingService.instance().sendOneWay(new SyncComplete(desc, request.src, request.dst, false, Collections.emptyList()).createMessage(), request.initiator); + MessagingService.instance().sendOneWay(new SyncComplete(desc, src, dst, false, Collections.emptyList()).createMessage(), initiator); } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/cb56d9fc/src/java/org/apache/cassandra/repair/asymmetric/DifferenceHolder.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/repair/asymmetric/DifferenceHolder.java
b/src/java/org/apache/cassandra/repair/asymmetric/DifferenceHolder.java new file mode 100644 index 0000000..eb99977 --- /dev/null +++ b/src/java/org/apache/cassandra/repair/asymmetric/DifferenceHolder.java @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.repair.asymmetric; + +import java.net.InetAddress; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.ImmutableMap; + +import org.apache.cassandra.dht.Range; +import org.apache.cassandra.dht.Token; +import org.apache.cassandra.repair.TreeResponse; +import org.apache.cassandra.utils.MerkleTrees; + +/** + * Just holds all differences between the hosts involved in a repair + */ +public class DifferenceHolder +{ + private final ImmutableMap<InetAddress, HostDifferences> differences; + + public DifferenceHolder(List<TreeResponse> trees) + { + ImmutableMap.Builder<InetAddress, HostDifferences> diffBuilder = ImmutableMap.builder(); + for (int i = 0; i < trees.size() - 1; ++i) + { + TreeResponse r1 = trees.get(i); + // create the differences between r1 and all other hosts: + HostDifferences hd = new HostDifferences(); + for (int j = i + 1; j < trees.size(); ++j) + { + TreeResponse r2 = trees.get(j); + hd.add(r2.endpoint, MerkleTrees.difference(r1.trees, r2.trees)); + } + // and add them to the diff map + diffBuilder.put(r1.endpoint, hd); + } + differences = diffBuilder.build(); + } + + @VisibleForTesting + DifferenceHolder(Map<InetAddress, HostDifferences> differences) + { + ImmutableMap.Builder<InetAddress, HostDifferences> diffBuilder = ImmutableMap.builder(); + diffBuilder.putAll(differences); + this.differences = diffBuilder.build(); + } + + /** + * differences only holds one 'side' of the difference - if A and B mismatch, only A will be a key in the map + */ + public Set<InetAddress> keyHosts() + { + return differences.keySet(); + } + + public HostDifferences get(InetAddress hostWithDifference) + { + return differences.get(hostWithDifference); + } + + public String toString() + { + return "DifferenceHolder{" + + "differences=" + differences + + '}'; + } + + public boolean hasDifferenceBetween(InetAddress node1, 
InetAddress node2, Range<Token> range) + { + HostDifferences diffsNode1 = differences.get(node1); + if (diffsNode1 != null && diffsNode1.hasDifferencesFor(node2, range)) + return true; + HostDifferences diffsNode2 = differences.get(node2); + if (diffsNode2 != null && diffsNode2.hasDifferencesFor(node1, range)) + return true; + return false; + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/cb56d9fc/src/java/org/apache/cassandra/repair/asymmetric/HostDifferences.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/repair/asymmetric/HostDifferences.java b/src/java/org/apache/cassandra/repair/asymmetric/HostDifferences.java new file mode 100644 index 0000000..6cbe987 --- /dev/null +++ b/src/java/org/apache/cassandra/repair/asymmetric/HostDifferences.java @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.repair.asymmetric; + +import java.net.InetAddress; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import org.apache.cassandra.dht.Range; +import org.apache.cassandra.dht.Token; + +/** + * Tracks the differences for a single host + */ +public class HostDifferences +{ + private final Map<InetAddress, List<Range<Token>>> perHostDifferences = new HashMap<>(); + + /** + * Adds a set of differences between the node this instance is tracking and endpoint + */ + public void add(InetAddress endpoint, List<Range<Token>> difference) + { + perHostDifferences.put(endpoint, difference); + } + + public void addSingleRange(InetAddress remoteNode, Range<Token> rangeToFetch) + { + perHostDifferences.computeIfAbsent(remoteNode, (x) -> new ArrayList<>()).add(rangeToFetch); + } + + /** + * Does this instance have differences for range with node2? + */ + public boolean hasDifferencesFor(InetAddress node2, Range<Token> range) + { + List<Range<Token>> differences = get(node2); + for (Range<Token> diff : differences) + { + // if the other node has a diff for this range, we know they are not equal. 
+ if (range.equals(diff) || range.intersects(diff)) + return true; + } + return false; + } + + public Set<InetAddress> hosts() + { + return perHostDifferences.keySet(); + } + + public List<Range<Token>> get(InetAddress differingHost) + { + return perHostDifferences.getOrDefault(differingHost, Collections.emptyList()); + } + + public String toString() + { + return "HostDifferences{" + + "perHostDifferences=" + perHostDifferences + + '}'; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/cassandra/blob/cb56d9fc/src/java/org/apache/cassandra/repair/asymmetric/IncomingRepairStreamTracker.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/repair/asymmetric/IncomingRepairStreamTracker.java b/src/java/org/apache/cassandra/repair/asymmetric/IncomingRepairStreamTracker.java new file mode 100644 index 0000000..b41ddd8 --- /dev/null +++ b/src/java/org/apache/cassandra/repair/asymmetric/IncomingRepairStreamTracker.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.repair.asymmetric; + +import java.net.InetAddress; +import java.util.HashMap; +import java.util.Map; +import java.util.Set; + +import com.google.common.collect.ImmutableMap; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.cassandra.dht.Range; +import org.apache.cassandra.dht.Token; + +/** + * Tracks incoming streams for a single host + */ +public class IncomingRepairStreamTracker +{ + private static final Logger logger = LoggerFactory.getLogger(IncomingRepairStreamTracker.class); + private final DifferenceHolder differences; + private final Map<Range<Token>, StreamFromOptions> incoming = new HashMap<>(); + + public IncomingRepairStreamTracker(DifferenceHolder differences) + { + this.differences = differences; + } + + public String toString() + { + return "IncomingStreamTracker{" + + "incoming=" + incoming + + '}'; + } + + /** + * Adds a range to be streamed from streamFromNode + * + * First the currently tracked ranges are denormalized to make sure that no ranges overlap, then + * the streamFromNode is added to the StreamFromOptions for the range + * + * @param range the range we need to stream from streamFromNode + * @param streamFromNode the node we should stream from + */ + public void addIncomingRangeFrom(Range<Token> range, InetAddress streamFromNode) + { + logger.trace("adding incoming range {} from {}", range, streamFromNode); + Set<Range<Token>> newInput = RangeDenormalizer.denormalize(range, incoming); + for (Range<Token> input : newInput) + { + incoming.computeIfAbsent(input, (newRange) -> new StreamFromOptions(differences, newRange)).add(streamFromNode); + } + } + + public ImmutableMap<Range<Token>, StreamFromOptions> getIncoming() + { + return ImmutableMap.copyOf(incoming); + } +} + + + + http://git-wip-us.apache.org/repos/asf/cassandra/blob/cb56d9fc/src/java/org/apache/cassandra/repair/asymmetric/PreferedNodeFilter.java 
---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/repair/asymmetric/PreferedNodeFilter.java b/src/java/org/apache/cassandra/repair/asymmetric/PreferedNodeFilter.java new file mode 100644 index 0000000..90788dc --- /dev/null +++ b/src/java/org/apache/cassandra/repair/asymmetric/PreferedNodeFilter.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.repair.asymmetric; + +import java.net.InetAddress; +import java.util.Set; + +public interface PreferedNodeFilter +{ + public Set<InetAddress> apply(InetAddress streamingNode, Set<InetAddress> toStream); +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/cassandra/blob/cb56d9fc/src/java/org/apache/cassandra/repair/asymmetric/RangeDenormalizer.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/repair/asymmetric/RangeDenormalizer.java b/src/java/org/apache/cassandra/repair/asymmetric/RangeDenormalizer.java new file mode 100644 index 0000000..a04d6d5 --- /dev/null +++ b/src/java/org/apache/cassandra/repair/asymmetric/RangeDenormalizer.java @@ -0,0 +1,125 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.repair.asymmetric; + +import java.net.InetAddress; +import java.util.Collection; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.Sets; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.cassandra.dht.Range; +import org.apache.cassandra.dht.Token; + +public class RangeDenormalizer +{ + private static final Logger logger = LoggerFactory.getLogger(RangeDenormalizer.class); + + /** + * "Denormalizes" (kind of the opposite of what Range.normalize does) the ranges in the keys of {{incoming}} + * + * It makes sure that if there is an intersection between {{range}} and some of the ranges in {{incoming.keySet()}} + * we know that all intersections are keys in the updated {{incoming}} + */ + public static Set<Range<Token>> denormalize(Range<Token> range, Map<Range<Token>, StreamFromOptions> incoming) + { + logger.trace("Denormalizing range={} incoming={}", range, incoming); + Set<Range<Token>> existingRanges = new HashSet<>(incoming.keySet()); + Map<Range<Token>, StreamFromOptions> existingOverlappingRanges = new HashMap<>(); + // remove all overlapping ranges from the incoming map + for (Range<Token> existingRange : existingRanges) + { + if (range.intersects(existingRange)) + existingOverlappingRanges.put(existingRange, incoming.remove(existingRange)); + } + + Set<Range<Token>> intersections = intersection(existingRanges, range); + Set<Range<Token>> newExisting = Sets.union(subtractFromAllRanges(existingOverlappingRanges.keySet(), range), intersections); + Set<Range<Token>> newInput = Sets.union(range.subtractAll(existingOverlappingRanges.keySet()), intersections); + assertNonOverLapping(newExisting); + assertNonOverLapping(newInput); + for (Range<Token> r : newExisting) + { + for (Map.Entry<Range<Token>, StreamFromOptions> entry :
existingOverlappingRanges.entrySet()) + { + if (r.intersects(entry.getKey())) + incoming.put(r, entry.getValue().copy(r)); + } + } + logger.trace("denormalized {} to {}", range, newInput); + logger.trace("denormalized incoming to {}", incoming); + assertNonOverLapping(incoming.keySet()); + return newInput; + } + + /** + * Subtract the given range from all the input ranges. + * + * for example: + * ranges = [(0, 10], (20, 30]] + * and range = (8, 22] + * + * the result should be [(0, 8], (22, 30]] + * + */ + @VisibleForTesting + static Set<Range<Token>> subtractFromAllRanges(Collection<Range<Token>> ranges, Range<Token> range) + { + Set<Range<Token>> result = new HashSet<>(); + for (Range<Token> r : ranges) + result.addAll(r.subtract(range)); // subtract can return two ranges if we remove the middle part + return result; + } + + /** + * Makes sure non of the input ranges are overlapping + */ + private static void assertNonOverLapping(Set<Range<Token>> ranges) + { + List<Range<Token>> sortedRanges = Range.sort(ranges); + Token lastToken = null; + for (Range<Token> range : sortedRanges) + { + if (lastToken != null && lastToken.compareTo(range.left) > 0) + { + throw new AssertionError("Ranges are overlapping: "+ranges); + } + lastToken = range.right; + } + } + + /** + * Returns all intersections between the ranges in ranges and the given range + */ + private static Set<Range<Token>> intersection(Collection<Range<Token>> ranges, Range<Token> range) + { + Set<Range<Token>> result = new HashSet<>(); + for (Range<Token> r : ranges) + result.addAll(range.intersectionWith(r)); + return result; + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/cb56d9fc/src/java/org/apache/cassandra/repair/asymmetric/ReduceHelper.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/repair/asymmetric/ReduceHelper.java b/src/java/org/apache/cassandra/repair/asymmetric/ReduceHelper.java new file mode 100644 index 
0000000..ce05e93 --- /dev/null +++ b/src/java/org/apache/cassandra/repair/asymmetric/ReduceHelper.java @@ -0,0 +1,137 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.repair.asymmetric; + +import java.net.InetAddress; +import java.util.Collection; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.ImmutableMap; + +import org.apache.cassandra.dht.Range; +import org.apache.cassandra.dht.Token; + +/** + * Basic idea is that we track incoming ranges instead of blindly just exchanging the ranges that mismatch between two nodes + * + * Say node X has tracked that it will stream range r1 from node Y. Now we see find a diffing range + * r1 between node X and Z. When adding r1 from Z as an incoming to X we check if Y and Z are equal on range r (ie, there is + * no difference between them). If they are equal X can stream from Y or Z and the end result will be the same. 
+ * + * The ranges wont match perfectly since we don't iterate over leaves so we always split based on the + * smallest range (either the new difference or the existing one) + */ +public class ReduceHelper +{ + /** + * Reduces the differences provided by the merkle trees to a minimum set of differences + */ + public static ImmutableMap<InetAddress, HostDifferences> reduce(DifferenceHolder differences, PreferedNodeFilter filter) + { + Map<InetAddress, IncomingRepairStreamTracker> trackers = createIncomingRepairStreamTrackers(differences); + Map<InetAddress, Integer> outgoingStreamCounts = new HashMap<>(); + ImmutableMap.Builder<InetAddress, HostDifferences> mapBuilder = ImmutableMap.builder(); + for (Map.Entry<InetAddress, IncomingRepairStreamTracker> trackerEntry : trackers.entrySet()) + { + IncomingRepairStreamTracker tracker = trackerEntry.getValue(); + HostDifferences rangesToFetch = new HostDifferences(); + for (Map.Entry<Range<Token>, StreamFromOptions> entry : tracker.getIncoming().entrySet()) + { + Range<Token> rangeToFetch = entry.getKey(); + for (InetAddress remoteNode : pickLeastStreaming(trackerEntry.getKey(), entry.getValue(), outgoingStreamCounts, filter)) + rangesToFetch.addSingleRange(remoteNode, rangeToFetch); + } + mapBuilder.put(trackerEntry.getKey(), rangesToFetch); + } + + return mapBuilder.build(); + } + + @VisibleForTesting + static Map<InetAddress, IncomingRepairStreamTracker> createIncomingRepairStreamTrackers(DifferenceHolder differences) + { + Map<InetAddress, IncomingRepairStreamTracker> trackers = new HashMap<>(); + + for (InetAddress hostWithDifference : differences.keyHosts()) + { + HostDifferences hostDifferences = differences.get(hostWithDifference); + for (InetAddress differingHost : hostDifferences.hosts()) + { + List<Range<Token>> differingRanges = hostDifferences.get(differingHost); + // hostWithDifference has mismatching ranges differingRanges with differingHost: + for (Range<Token> range : differingRanges) + { + // a difference 
means that we need to sync that range between two nodes - add the diffing range to both + // hosts: + getTracker(differences, trackers, hostWithDifference).addIncomingRangeFrom(range, differingHost); + getTracker(differences, trackers, differingHost).addIncomingRangeFrom(range, hostWithDifference); + } + } + } + return trackers; + } + + private static IncomingRepairStreamTracker getTracker(DifferenceHolder differences, + Map<InetAddress, IncomingRepairStreamTracker> trackers, + InetAddress host) + { + return trackers.computeIfAbsent(host, (h) -> new IncomingRepairStreamTracker(differences)); + } + + // greedily pick the nodes doing the least amount of streaming + private static Collection<InetAddress> pickLeastStreaming(InetAddress streamingNode, + StreamFromOptions toStreamFrom, + Map<InetAddress, Integer> outgoingStreamCounts, + PreferedNodeFilter filter) + { + Set<InetAddress> retSet = new HashSet<>(); + for (Set<InetAddress> toStream : toStreamFrom.allStreams()) + { + InetAddress candidate = null; + Set<InetAddress> prefered = filter.apply(streamingNode, toStream); + for (InetAddress node : prefered) + { + if (candidate == null || outgoingStreamCounts.getOrDefault(candidate, 0) > outgoingStreamCounts.getOrDefault(node, 0)) + { + candidate = node; + } + } + // ok, found no prefered hosts, try all of them + if (candidate == null) + { + for (InetAddress node : toStream) + { + if (candidate == null || outgoingStreamCounts.getOrDefault(candidate, 0) > outgoingStreamCounts.getOrDefault(node, 0)) + { + candidate = node; + } + } + } + assert candidate != null; + outgoingStreamCounts.put(candidate, outgoingStreamCounts.getOrDefault(candidate, 0) + 1); + retSet.add(candidate); + } + return retSet; + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/cb56d9fc/src/java/org/apache/cassandra/repair/asymmetric/StreamFromOptions.java ---------------------------------------------------------------------- diff --git 
a/src/java/org/apache/cassandra/repair/asymmetric/StreamFromOptions.java b/src/java/org/apache/cassandra/repair/asymmetric/StreamFromOptions.java new file mode 100644 index 0000000..4516f23 --- /dev/null +++ b/src/java/org/apache/cassandra/repair/asymmetric/StreamFromOptions.java @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.repair.asymmetric; + +import java.net.InetAddress; +import java.util.Collections; +import java.util.HashSet; +import java.util.Set; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.Sets; + +import org.apache.cassandra.dht.Range; +import org.apache.cassandra.dht.Token; + +/** + * Keeps track of where a node needs to stream a given range from. 
+ * + * If the remote range is identical on several remote nodes, this class keeps track of them + * + * These stream from options get 'split' during denormalization - for example if we track range + * (100, 200] and we find a new differing range (180, 200] - then the denormalization will create two + * new StreamFromOptions (see copy below) with the same streamOptions, one with range (100, 180] and one with (180, 200] - then it + * adds the new incoming difference to the StreamFromOptions for the new range (180, 200]. + */ +public class StreamFromOptions +{ + /** + * all differences - used to figure out if two nodes are equals on the range + */ + private final DifferenceHolder differences; + /** + * The range to stream + */ + @VisibleForTesting + final Range<Token> range; + /** + * Contains the hosts to stream from - if two nodes are in the same inner set, they are identical for the range we are handling + */ + private final Set<Set<InetAddress>> streamOptions = new HashSet<>(); + + public StreamFromOptions(DifferenceHolder differences, Range<Token> range) + { + this(differences, range, Collections.emptySet()); + } + + private StreamFromOptions(DifferenceHolder differences, Range<Token> range, Set<Set<InetAddress>> existing) + { + this.differences = differences; + this.range = range; + for (Set<InetAddress> addresses : existing) + this.streamOptions.add(Sets.newHashSet(addresses)); + } + + /** + * Add new node to the stream options + * + * If we have no difference between the new node and a currently tracked on, we know they are matching over the + * range we are tracking, then just add it to the set with the identical remote nodes. Otherwise create a new group + * of nodes containing this new node. 
+ */ + public void add(InetAddress streamFromNode) + { + for (Set<InetAddress> options : streamOptions) + { + InetAddress first = options.iterator().next(); + if (!differences.hasDifferenceBetween(first, streamFromNode, range)) + { + options.add(streamFromNode); + return; + } + } + streamOptions.add(Sets.newHashSet(streamFromNode)); + } + + public StreamFromOptions copy(Range<Token> withRange) + { + return new StreamFromOptions(differences, withRange, streamOptions); + } + + public Iterable<Set<InetAddress>> allStreams() + { + return streamOptions; + } + + public String toString() + { + return "StreamFromOptions{" + + "range=" + range + + ", streamOptions=" + streamOptions + + '}'; + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/cb56d9fc/src/java/org/apache/cassandra/repair/messages/AsymmetricSyncRequest.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/repair/messages/AsymmetricSyncRequest.java b/src/java/org/apache/cassandra/repair/messages/AsymmetricSyncRequest.java new file mode 100644 index 0000000..b75ad7f --- /dev/null +++ b/src/java/org/apache/cassandra/repair/messages/AsymmetricSyncRequest.java @@ -0,0 +1,132 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.repair.messages; + +import java.io.IOException; +import java.net.InetAddress; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.Objects; + +import org.apache.cassandra.db.TypeSizes; +import org.apache.cassandra.dht.AbstractBounds; +import org.apache.cassandra.dht.Range; +import org.apache.cassandra.dht.Token; +import org.apache.cassandra.io.util.DataInputPlus; +import org.apache.cassandra.io.util.DataOutputPlus; +import org.apache.cassandra.net.CompactEndpointSerializationHelper; +import org.apache.cassandra.net.MessagingService; +import org.apache.cassandra.repair.RepairJobDesc; +import org.apache.cassandra.streaming.PreviewKind; + +public class AsymmetricSyncRequest extends RepairMessage +{ + public static MessageSerializer serializer = new SyncRequestSerializer(); + + public final InetAddress initiator; + public final InetAddress fetchingNode; + public final InetAddress fetchFrom; + public final Collection<Range<Token>> ranges; + public final PreviewKind previewKind; + + public AsymmetricSyncRequest(RepairJobDesc desc, InetAddress initiator, InetAddress fetchingNode, InetAddress fetchFrom, Collection<Range<Token>> ranges, PreviewKind previewKind) + { + super(Type.ASYMMETRIC_SYNC_REQUEST, desc); + this.initiator = initiator; + this.fetchingNode = fetchingNode; + this.fetchFrom = fetchFrom; + this.ranges = ranges; + this.previewKind = previewKind; + } + + @Override + public boolean equals(Object o) + { + if (!(o instanceof AsymmetricSyncRequest)) + return false; + AsymmetricSyncRequest req = (AsymmetricSyncRequest)o; + return messageType == req.messageType && + desc.equals(req.desc) && + initiator.equals(req.initiator) && + fetchingNode.equals(req.fetchingNode) && + fetchFrom.equals(req.fetchFrom) && + ranges.equals(req.ranges) && previewKind == req.previewKind; + } + + 
@Override + public int hashCode() + {
+ return Objects.hash(messageType, desc, initiator, fetchingNode, fetchFrom, ranges, previewKind); + } + + public static class SyncRequestSerializer implements MessageSerializer<AsymmetricSyncRequest> + { + public void serialize(AsymmetricSyncRequest message, DataOutputPlus out, int version) throws IOException + { + RepairJobDesc.serializer.serialize(message.desc, out, version); + CompactEndpointSerializationHelper.serialize(message.initiator, out); + CompactEndpointSerializationHelper.serialize(message.fetchingNode, out); + CompactEndpointSerializationHelper.serialize(message.fetchFrom, out); + out.writeInt(message.ranges.size()); + for (Range<Token> range : message.ranges) + { + MessagingService.validatePartitioner(range); + AbstractBounds.tokenSerializer.serialize(range, out, version); + } + out.writeInt(message.previewKind.getSerializationVal()); + } + + public AsymmetricSyncRequest deserialize(DataInputPlus in, int version) throws IOException + { + RepairJobDesc desc = RepairJobDesc.serializer.deserialize(in, version); + InetAddress initiator = CompactEndpointSerializationHelper.deserialize(in); + InetAddress fetchingNode = CompactEndpointSerializationHelper.deserialize(in); + InetAddress fetchFrom = CompactEndpointSerializationHelper.deserialize(in); + int rangesCount = in.readInt(); + List<Range<Token>> ranges = new ArrayList<>(rangesCount); + for (int i = 0; i < rangesCount; ++i) + ranges.add((Range<Token>) AbstractBounds.tokenSerializer.deserialize(in, MessagingService.globalPartitioner(), version)); + PreviewKind previewKind = PreviewKind.deserialize(in.readInt()); + return new AsymmetricSyncRequest(desc, initiator, fetchingNode, fetchFrom, ranges, previewKind); + } + + public long serializedSize(AsymmetricSyncRequest message, int version) + { + long size = RepairJobDesc.serializer.serializedSize(message.desc, version); + size += CompactEndpointSerializationHelper.serializedSize(message.initiator) + CompactEndpointSerializationHelper.serializedSize(message.fetchingNode) + 
CompactEndpointSerializationHelper.serializedSize(message.fetchFrom); + size += TypeSizes.sizeof(message.ranges.size()); + for (Range<Token> range : message.ranges) + size +=
AbstractBounds.tokenSerializer.serializedSize(range, version); + size += TypeSizes.sizeof(message.previewKind.getSerializationVal()); + return size; + } + } + + public String toString() + { + return "AsymmetricSyncRequest{" + + "initiator=" + initiator + + ", fetchingNode=" + fetchingNode + + ", fetchFrom=" + fetchFrom + + ", ranges=" + ranges + + ", previewKind=" + previewKind + + ", desc="+desc+ + '}'; + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/cb56d9fc/src/java/org/apache/cassandra/repair/messages/RepairMessage.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/repair/messages/RepairMessage.java b/src/java/org/apache/cassandra/repair/messages/RepairMessage.java index b72f139..09c6060 100644 --- a/src/java/org/apache/cassandra/repair/messages/RepairMessage.java +++ b/src/java/org/apache/cassandra/repair/messages/RepairMessage.java @@ -60,7 +60,8 @@ public abstract class RepairMessage FINALIZE_COMMIT(12, FinalizeCommit.serializer), FAILED_SESSION(13, FailSession.serializer), STATUS_REQUEST(14, StatusRequest.serializer), - STATUS_RESPONSE(15, StatusResponse.serializer); + STATUS_RESPONSE(15, StatusResponse.serializer), + ASYMMETRIC_SYNC_REQUEST(16, AsymmetricSyncRequest.serializer); private final byte type; private final MessageSerializer<RepairMessage> serializer; http://git-wip-us.apache.org/repos/asf/cassandra/blob/cb56d9fc/src/java/org/apache/cassandra/repair/messages/RepairOption.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/repair/messages/RepairOption.java b/src/java/org/apache/cassandra/repair/messages/RepairOption.java index 971bf5d..adcd776 100644 --- a/src/java/org/apache/cassandra/repair/messages/RepairOption.java +++ b/src/java/org/apache/cassandra/repair/messages/RepairOption.java @@ -50,6 +50,7 @@ public class RepairOption public static final String PULL_REPAIR_KEY = "pullRepair"; public static 
final String FORCE_REPAIR_KEY = "forceRepair"; public static final String PREVIEW = "previewKind"; + public static final String OPTIMISE_STREAMS_KEY = "optimiseStreams"; // we don't want to push nodes too much for repair public static final int MAX_JOB_THREADS = 4; @@ -131,6 +132,12 @@ public class RepairOption * <td>"true" if the repair should continue, even if one of the replicas involved is down. * <td>false</td> * </tr> + * <tr> + * <td>optimiseStreams</td> + * <td>"true" if we should try to optimise the syncing to avoid transfering identical + * ranges to the same host multiple times</td> + * <td>false</td> + * </tr> * </tbody> * </table> * @@ -180,8 +187,9 @@ public class RepairOption ranges.add(new Range<>(parsedBeginToken, parsedEndToken)); } } + boolean asymmetricSyncing = Boolean.parseBoolean(options.get(OPTIMISE_STREAMS_KEY)); - RepairOption option = new RepairOption(parallelism, primaryRange, incremental, trace, jobThreads, ranges, !ranges.isEmpty(), pullRepair, force, previewKind); + RepairOption option = new RepairOption(parallelism, primaryRange, incremental, trace, jobThreads, ranges, !ranges.isEmpty(), pullRepair, force, previewKind, asymmetricSyncing); // data centers String dataCentersStr = options.get(DATACENTERS_KEY); @@ -259,13 +267,14 @@ public class RepairOption private final boolean pullRepair; private final boolean forceRepair; private final PreviewKind previewKind; + private final boolean optimiseStreams; private final Collection<String> columnFamilies = new HashSet<>(); private final Collection<String> dataCenters = new HashSet<>(); private final Collection<String> hosts = new HashSet<>(); private final Collection<Range<Token>> ranges = new HashSet<>(); - public RepairOption(RepairParallelism parallelism, boolean primaryRange, boolean incremental, boolean trace, int jobThreads, Collection<Range<Token>> ranges, boolean isSubrangeRepair, boolean pullRepair, boolean forceRepair, PreviewKind previewKind) + public 
RepairOption(RepairParallelism parallelism, boolean primaryRange, boolean incremental, boolean trace, int jobThreads, Collection<Range<Token>> ranges, boolean isSubrangeRepair, boolean pullRepair, boolean forceRepair, PreviewKind previewKind, boolean optimiseStreams) { if (FBUtilities.isWindows && (DatabaseDescriptor.getDiskAccessMode() != Config.DiskAccessMode.standard || DatabaseDescriptor.getIndexAccessMode() != Config.DiskAccessMode.standard) && @@ -286,6 +295,7 @@ public class RepairOption this.pullRepair = pullRepair; this.forceRepair = forceRepair; this.previewKind = previewKind; + this.optimiseStreams = optimiseStreams; } public RepairParallelism getParallelism() @@ -363,10 +373,16 @@ public class RepairOption return previewKind.isPreview(); } - public boolean isInLocalDCOnly() { + public boolean isInLocalDCOnly() + { return dataCenters.size() == 1 && dataCenters.contains(DatabaseDescriptor.getLocalDataCenter()); } + public boolean optimiseStreams() + { + return optimiseStreams; + } + @Override public String toString() { @@ -382,6 +398,7 @@ public class RepairOption ", # of ranges: " + ranges.size() + ", pull repair: " + pullRepair + ", force repair: " + forceRepair + + ", optimise streams: "+ optimiseStreams + ')'; } @@ -401,6 +418,7 @@ public class RepairOption options.put(PULL_REPAIR_KEY, Boolean.toString(pullRepair)); options.put(FORCE_REPAIR_KEY, Boolean.toString(forceRepair)); options.put(PREVIEW, previewKind.toString()); + options.put(OPTIMISE_STREAMS_KEY, Boolean.toString(optimiseStreams)); return options; } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/cb56d9fc/src/java/org/apache/cassandra/service/ActiveRepairService.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/ActiveRepairService.java b/src/java/org/apache/cassandra/service/ActiveRepairService.java index ef3ffeb..0276238 100644 --- a/src/java/org/apache/cassandra/service/ActiveRepairService.java +++ 
b/src/java/org/apache/cassandra/service/ActiveRepairService.java @@ -213,6 +213,7 @@ public class ActiveRepairService implements IEndpointStateChangeSubscriber, IFai boolean pullRepair, boolean force, PreviewKind previewKind, + boolean optimiseStreams, ListeningExecutorService executor, String... cfnames) { @@ -222,7 +223,8 @@ public class ActiveRepairService implements IEndpointStateChangeSubscriber, IFai if (cfnames.length == 0) return null; - final RepairSession session = new RepairSession(parentRepairSession, UUIDGen.getTimeUUID(), range, keyspace, parallelismDegree, endpoints, isIncremental, pullRepair, force, previewKind, cfnames); + + final RepairSession session = new RepairSession(parentRepairSession, UUIDGen.getTimeUUID(), range, keyspace, parallelismDegree, endpoints, isIncremental, pullRepair, force, previewKind, optimiseStreams, cfnames); sessions.put(session.getId(), session); // register listeners http://git-wip-us.apache.org/repos/asf/cassandra/blob/cb56d9fc/src/java/org/apache/cassandra/tools/nodetool/Repair.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/tools/nodetool/Repair.java b/src/java/org/apache/cassandra/tools/nodetool/Repair.java index 86c29d4..8347afc 100644 --- a/src/java/org/apache/cassandra/tools/nodetool/Repair.java +++ b/src/java/org/apache/cassandra/tools/nodetool/Repair.java @@ -94,6 +94,10 @@ public class Repair extends NodeToolCmd @Option(title = "pull_repair", name = {"-pl", "--pull"}, description = "Use --pull to perform a one way repair where data is only streamed from a remote node to this node.") private boolean pullRepair = false; + @Option(title = "optimise_streams", name = {"-os", "--optimise-streams"}, description = "Use --optimise-streams to try to reduce the number of streams we do (EXPERIMENTAL, see CASSANDRA-3200).") + private boolean optimiseStreams = false; + + private PreviewKind getPreviewKind() { if (validate) @@ -144,7 +148,7 @@ public class 
Repair extends NodeToolCmd options.put(RepairOption.PULL_REPAIR_KEY, Boolean.toString(pullRepair)); options.put(RepairOption.FORCE_REPAIR_KEY, Boolean.toString(force)); options.put(RepairOption.PREVIEW, getPreviewKind().toString()); - + options.put(RepairOption.OPTIMISE_STREAMS_KEY, Boolean.toString(optimiseStreams)); if (!startToken.isEmpty() || !endToken.isEmpty()) { options.put(RepairOption.RANGES_KEY, startToken + ":" + endToken); http://git-wip-us.apache.org/repos/asf/cassandra/blob/cb56d9fc/test/unit/org/apache/cassandra/repair/LocalSyncTaskTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/repair/LocalSyncTaskTest.java b/test/unit/org/apache/cassandra/repair/LocalSyncTaskTest.java index f5e9d6b..7f3dbff 100644 --- a/test/unit/org/apache/cassandra/repair/LocalSyncTaskTest.java +++ b/test/unit/org/apache/cassandra/repair/LocalSyncTaskTest.java @@ -21,6 +21,7 @@ package org.apache.cassandra.repair; import java.net.InetAddress; import java.util.Arrays; import java.util.HashSet; +import java.util.List; import java.util.Set; import java.util.UUID; @@ -46,7 +47,6 @@ import org.apache.cassandra.utils.MerkleTree; import org.apache.cassandra.utils.MerkleTrees; import org.apache.cassandra.utils.UUIDGen; -import static org.apache.cassandra.service.ActiveRepairService.UNREPAIRED_SSTABLE; import static org.apache.cassandra.service.ActiveRepairService.NO_PENDING_REPAIR; import static org.junit.Assert.assertEquals; --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org