bdeggleston commented on code in PR #4428:
URL: https://github.com/apache/cassandra/pull/4428#discussion_r2433588094
##########
src/java/org/apache/cassandra/replication/MutationTrackingService.java:
##########
@@ -915,6 +978,62 @@ static List<KeyspaceShards>
loadFromSystemTables(ClusterMetadata cluster, LongSu
}
}
+ private static class CoordinatedTransfers implements
Iterable<CoordinatedTransfer>
Review Comment:
I don't think this needs to be a private inner class. It could also use some
unit tests checking the sstable selection logic
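Roughly the kind of test I have in mind, as a sketch (shardsFor/tokenRange/sstableCovering are hypothetical fixtures, and this assumes create() becomes visible to tests):

    @Test
    public void selectsOnlySSTablesIntersectingShardRange()
    {
        KeyspaceShards shards = shardsFor(tokenRange(0, 100));     // hypothetical fixture
        SSTableReader inRange = sstableCovering(10, 20);           // hypothetical fixture
        SSTableReader outOfRange = sstableCovering(200, 300);      // hypothetical fixture
        CoordinatedTransfers transfers =
            CoordinatedTransfers.create(shards, List.of(inRange, outOfRange), ConsistencyLevel.QUORUM);
        // no transfer should pick up an sstable that doesn't intersect its range
        for (CoordinatedTransfer transfer : transfers)
            assertFalse(transfer.sstables.contains(outOfRange));
    }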
##########
src/java/org/apache/cassandra/replication/MutationTrackingService.java:
##########
@@ -915,6 +978,62 @@ static List<KeyspaceShards>
loadFromSystemTables(ClusterMetadata cluster, LongSu
}
}
+ private static class CoordinatedTransfers implements
Iterable<CoordinatedTransfer>
+ {
+ private final Collection<CoordinatedTransfer> transfers;
+
+ private CoordinatedTransfers(Collection<CoordinatedTransfer> transfers)
+ {
+ this.transfers = transfers;
+ }
+
+ private static CoordinatedTransfers create(KeyspaceShards shards,
Collection<SSTableReader> sstables, ConsistencyLevel cl)
+ {
+ // Clean up incoming SSTables to remove any existing
CoordinatorLogOffsets, can't be trusted
+ for (SSTableReader sstable : sstables)
+ {
+ try
+ {
+
sstable.mutateCoordinatorLogOffsetsAndReload(ImmutableCoordinatorLogOffsets.NONE);
Review Comment:
if we loosen up validation on the stream receiving side, we can remove this
step, which would make it safe for use with failure recovery
##########
src/java/org/apache/cassandra/db/ReadExecutionController.java:
##########
@@ -243,4 +261,20 @@ private void addSample()
if (cfs != null)
cfs.metric.topLocalReadQueryTime.addSample(cql, timeMicros);
}
+
+ public void addActivationIds(ColumnFamilyStore.ViewFragment view)
+ {
+ activationIds = new HashSet<>();
+ for (SSTableReader sstable : view.sstables)
Review Comment:
we should also validate that the table is part of a tracked keyspace
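Sketch of the guard I mean, assuming the controller can reach its table metadata (the message text is illustrative):

    public void addActivationIds(ColumnFamilyStore.ViewFragment view)
    {
        // reject untracked tables up front instead of silently collecting ids
        Preconditions.checkState(metadata().replicationType().isTracked(),
                                 "activation ids only apply to tracked keyspaces");
        activationIds = new HashSet<>();
        for (SSTableReader sstable : view.sstables)
            ...
    }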
##########
src/java/org/apache/cassandra/replication/LocalTransfers.java:
##########
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.replication;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+import com.google.common.base.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.concurrent.ExecutorPlus;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.io.util.File;
+import org.apache.cassandra.utils.TimeUUID;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+import static
org.apache.cassandra.concurrent.ExecutorFactory.Global.executorFactory;
+
+/**
+ * Stores coordinated and received transfers.
+ *
+ * TODO: Make changes to pending set durable with
SystemKeyspace.savePendingLocalTransfer(transfer)?
+ * TODO: GC
+ */
+class LocalTransfers
Review Comment:
this could use some unit tests putting it through its paces and confirming
everything works as expected
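e.g. something along these lines (the transfer fixture and any state accessors are assumptions about the eventual API):

    @Test
    public void savedTransferIsTrackedUntilCleanup()
    {
        LocalTransfers transfers = LocalTransfers.instance();
        CoordinatedTransfer transfer = newTransferFixture();   // hypothetical fixture
        transfers.save(transfer);
        transfers.activating(transfer);
        // once every activation is acked, cleanup should purge the pending state
        transfers.scheduleCleanup();
    }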
##########
src/java/org/apache/cassandra/replication/UnreconciledTransfers.java:
##########
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.replication;
+
+import java.util.Map;
+import java.util.SortedSet;
+import java.util.TreeSet;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.db.PartitionPosition;
+import org.apache.cassandra.dht.AbstractBounds;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.schema.TableId;
+
+/**
+ * See {@link UnreconciledMutations}.
+ *
+ * For now, all reads intersect with all transfers, but we could be more
discerning and only return transfers for the
Review Comment:
how does that work with larger clusters where each node replicates its own
subset of ranges? Wouldn't every read (potentially) contain transfer ids for
ranges that the other read participants don't replicate, and would therefore
not have in their own sets of transfer ids?
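If we do make it range-aware later, I'd expect the lookup to be shaped roughly like this (PendingTransfer and the pending registry are hypothetical):

    // only surface transfer ids whose range intersects the read
    SortedSet<TimeUUID> intersecting(TableId table, Range<Token> readRange)
    {
        SortedSet<TimeUUID> result = new TreeSet<>();
        for (PendingTransfer transfer : pending)
            if (transfer.table.equals(table) && transfer.range.intersects(readRange))
                result.add(transfer.transferId);
        return result;
    }

which still leaves the question above of how participants reconcile ids for ranges they don't replicate.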
##########
src/java/org/apache/cassandra/replication/CoordinatedTransfer.java:
##########
@@ -0,0 +1,431 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.replication;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutionException;
+import java.util.function.Supplier;
+
+import com.google.common.base.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.db.ConsistencyLevel;
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.db.streaming.CassandraOutgoingFile;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.exceptions.RequestFailure;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.locator.AbstractReplicationStrategy;
+import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.net.Message;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.net.NoPayload;
+import org.apache.cassandra.net.RequestCallbackWithFailure;
+import org.apache.cassandra.net.Verb;
+import org.apache.cassandra.streaming.OutgoingStream;
+import org.apache.cassandra.streaming.StreamOperation;
+import org.apache.cassandra.streaming.StreamPlan;
+import org.apache.cassandra.streaming.StreamResultFuture;
+import org.apache.cassandra.streaming.StreamState;
+import org.apache.cassandra.tcm.ClusterMetadata;
+import org.apache.cassandra.tcm.membership.NodeId;
+import org.apache.cassandra.utils.TimeUUID;
+import org.apache.cassandra.utils.concurrent.AsyncFuture;
+import org.apache.cassandra.utils.concurrent.Future;
+import org.apache.cassandra.utils.concurrent.FutureCombiner;
+import org.apache.cassandra.utils.concurrent.ImmediateFuture;
+
+/**
+ * A tracked bulk transfer for a single replica set.
+ * <p>
+ * For simplicity, streaming from coordinator to itself instead of copying
files. This has some perks:
+ * (1) it allows us to import out-of-range SSTables using the same paths, and
+ * (2) it uses the existing lifecycle management to handle crash-safety, so
don't need to deal with atomic multi-file
+ * copy.
+ * <p>
+ * A transfer happens in a few steps. First, the coordinator streams the
SSTables to each replica. Replicas store the
+ * streamed transfer in a "pending" location on the filesystem, where it isn't
visible to reads. Once the coordinator
+ * receives acknowledgements of completed streams from sufficient replicas,
the coordinator assigns an "activation ID"
+ * for the transfer, and notifies replicas that the pending stream has been
activated with that ID. Replicas then move
+ * the pending SSTables into the live set, where they're visible to reads, and
include the "activation ID" in mutation
+ * tracking summaries for reads that would include the new SSTables.
+ */
+public class CoordinatedTransfer
+{
+ private static final Logger logger =
LoggerFactory.getLogger(CoordinatedTransfer.class);
+
+ String logPrefix()
+ {
+ return String.format("[CoordinatedTransfer #%s]", transferId);
+ }
+
+ final TimeUUID transferId = TimeUUID.Generator.nextTimeUUID();
+
+ // TODO(expected): Add epoch at time of creation
+ final String keyspace;
+ public final Range<Token> range;
+
+ final ConcurrentMap<InetAddressAndPort, SingleTransferResult> streams;
+
+ final Collection<SSTableReader> sstables;
+
+ final ConsistencyLevel cl;
+
+ final Supplier<MutationId> getActivationId;
+ volatile MutationId activationId = null;
+
+ CoordinatedTransfer(String keyspace, Range<Token> range, Participants
participants, Collection<SSTableReader> sstables, ConsistencyLevel cl,
Supplier<MutationId> getActivationId)
+ {
+ this.keyspace = keyspace;
+ this.range = range;
+ this.sstables = sstables;
+ this.cl = cl;
+ this.getActivationId = getActivationId;
+
+ ClusterMetadata cm = ClusterMetadata.current();
+ this.streams = new ConcurrentHashMap<>(participants.size());
+ for (int i = 0; i < participants.size(); i++)
+ {
+ InetAddressAndPort addr = cm.directory.getNodeAddresses(new
NodeId(participants.get(i))).broadcastAddress;
+ this.streams.put(addr, SingleTransferResult.Unknown());
+ }
+ }
+
+ void execute()
+ {
+ logger.debug("Executing tracked bulk transfer {}", this);
+ LocalTransfers.instance().save(this);
+ stream();
+ }
+
+ private void stream()
+ {
+ // TODO: Don't stream multiple copies over the WAN, send one copy and
indicate forwarding
+ List<Future<Void>> streaming = new ArrayList<>(streams.size());
+ for (InetAddressAndPort to : streams.keySet())
+ streaming.add(stream(to));
+
+ Future<List<Void>> future = FutureCombiner.allOf(streaming);
+ future.awaitUninterruptibly();
+ future.rethrowIfFailed();
+ }
+
+ private boolean sufficient()
+ {
+ AbstractReplicationStrategy ars =
Keyspace.open(keyspace).getReplicationStrategy();
+ int blockFor = cl.blockFor(ars);
+ int responses = 0;
+ for (Map.Entry<InetAddressAndPort, SingleTransferResult> entry :
streams.entrySet())
+ {
+ if (entry.getValue().state ==
SingleTransferResult.State.STREAM_COMPLETE)
+ responses++;
+ }
+ return responses >= blockFor;
+ }
+
+ Future<Void> stream(InetAddressAndPort to)
+ {
+ return streamTask(to).andThenAsync(result -> streamComplete(to,
result));
+ }
+
+ private Future<Void> streamComplete(InetAddressAndPort to,
SingleTransferResult result)
+ {
+ streams.put(to, result);
+ logger.info("{} Completed streaming to {}, {}", logPrefix(), to, this);
+ return maybeActivate();
+ }
+
+ synchronized Future<Void> maybeActivate()
+ {
+ // If any activations have already been sent out, send new activations
to any received plans that have not yet
+ // been activated
+ boolean anyActivated = false;
+ Set<InetAddressAndPort> awaitingActivation = new HashSet<>();
+ for (Map.Entry<InetAddressAndPort, SingleTransferResult> entry :
streams.entrySet())
+ {
+ InetAddressAndPort peer = entry.getKey();
+ SingleTransferResult result = entry.getValue();
+ if (result.state == SingleTransferResult.State.ACTIVATE_COMPLETE)
+ {
+ anyActivated = true;
+ }
+ else if (result.state ==
SingleTransferResult.State.STREAM_COMPLETE)
+ awaitingActivation.add(peer);
+ }
+ if (anyActivated && !awaitingActivation.isEmpty())
+ {
+ logger.debug("{} Transfer already activated on peers, sending
activations to {}", logPrefix(), awaitingActivation);
+ return activateOn(awaitingActivation);
+ }
+
+ // If no activations have been sent out, check whether we have enough
planIds back to meet the required CL
+ else if (sufficient())
+ {
+ Set<InetAddressAndPort> peers = new HashSet<>();
+ for (Map.Entry<InetAddressAndPort, SingleTransferResult> entry :
streams.entrySet())
+ {
+ InetAddressAndPort peer = entry.getKey();
+ SingleTransferResult result = entry.getValue();
+ if (result.state == SingleTransferResult.State.STREAM_COMPLETE)
+ peers.add(peer);
+ }
+ logger.debug("{} Transfer meets consistency level {}, sending
activations to {}", logPrefix(), cl, peers);
+ return activateOn(peers);
+ }
+
+ logger.debug("Nothing to activate");
+ return ImmediateFuture.success(null);
+ }
+
+ synchronized Future<Void> activateOn(Collection<InetAddressAndPort> peers)
+ {
+ Preconditions.checkState(!peers.isEmpty());
+
+ if (activationId == null)
+ {
+ activationId = getActivationId.get();
+ logger.info("{} Assigned activationId {}", logPrefix(),
activationId);
+ }
+ LocalTransfers.instance().activating(this);
+
+ // First phase is dryRun to ensure data is present on disk, then
second phase does the actual import. This
+ // ensures that if something goes wrong (like a topology change during
import), we don't have divergence.
+ class DryRun extends AsyncFuture<Void> implements
RequestCallbackWithFailure<NoPayload>
Review Comment:
Let's stick with standard two-phase commit protocol names for these steps.
This would be the prepare phase, and the next step would be the commit phase
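i.e. something like:

    // prepare phase (currently DryRun): verify the streamed files are present on every peer
    class Prepare extends AsyncFuture<Void> implements RequestCallbackWithFailure<NoPayload> { ... }

    // commit phase (currently EachActivate): perform the actual import once prepare succeeds
    class Commit extends AsyncFuture<Void> implements RequestCallbackWithFailure<Void> { ... }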
##########
src/java/org/apache/cassandra/db/streaming/CassandraStreamReceiver.java:
##########
@@ -256,6 +258,15 @@ public void finished()
// add sstables (this will build non-SSTable-attached
secondary indexes too, see CASSANDRA-10130)
logger.debug("[Stream #{}] Received {} sstables from {} ({})",
session.planId(), readers.size(), session.peer, readers);
+
+ if (cfs.metadata().replicationType().isTracked())
Review Comment:
as mentioned elsewhere, we can't require all tracked streams to use bulk
transfer
##########
src/java/org/apache/cassandra/db/streaming/CassandraStreamReader.java:
##########
@@ -180,15 +184,29 @@ protected SerializationHeader getHeader(TableMetadata
metadata) throws UnknownCo
}
protected SSTableTxnSingleStreamWriter createWriter(ColumnFamilyStore cfs,
long totalSize, long repairedAt, TimeUUID pendingRepair,
ImmutableCoordinatorLogOffsets coordinatorLogOffsets, SSTableFormat<?, ?>
format) throws IOException
{
+ boolean isTracked = cfs.metadata().replicationType().isTracked();
+
Directories.DataDirectory localDir =
cfs.getDirectories().getWriteableLocation(totalSize);
if (localDir == null)
throw new IOException(String.format("Insufficient disk space to
store %s", FBUtilities.prettyPrintMemory(totalSize)));
StreamReceiver streamReceiver = session.getAggregator(tableId);
Preconditions.checkState(streamReceiver instanceof
CassandraStreamReceiver);
ILifecycleTransaction txn = createTxn();
- RangeAwareSSTableWriter writer = new RangeAwareSSTableWriter(cfs,
estimatedKeys, repairedAt, pendingRepair, coordinatorLogOffsets, format,
sstableLevel, totalSize, txn, getHeader(cfs.metadata()));
- return new SSTableTxnSingleStreamWriter(txn, writer);
+ if (isTracked)
Review Comment:
Whether the streamed sstables are cordoned off for bulk tracking or added
via the normal stream path should be dictated by the sender's session info and
the stream init message. I don't think we want an explicit add step for
bootstrapped sstables etc.
##########
src/java/org/apache/cassandra/replication/ActiveLogReconciler.java:
##########
@@ -156,6 +176,34 @@ void send()
}
}
+ private static final class TransferTask extends Task
+ {
+ private final CoordinatedTransfer transfer;
+ private final InetAddressAndPort toHost;
+
+ TransferTask(CoordinatedTransfer transfer, InetAddressAndPort toHost)
+ {
+ this.transfer = transfer;
+ this.toHost = toHost;
+ }
+
+ @Override
+ public void onResponse(Message<NoPayload> msg)
+ {
+ logger.debug("Received activation for TransferTask from {}",
toHost);
+ MutationTrackingService.instance.receivedActivationAck(transfer,
toHost);
+ }
+
Review Comment:
we're missing an onFailure method here - it looks like we're also missing a
retryFailedTransferActivation method
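Sketch of the missing failure path (retryFailedTransferActivation is the hypothetical hook, it doesn't exist yet):

    @Override
    public void onFailure(InetAddressAndPort from, RequestFailure failure)
    {
        logger.warn("Activation for TransferTask failed on {}: {}", toHost, failure);
        // re-enqueue the activation rather than dropping it on the floor
        MutationTrackingService.instance.retryFailedTransferActivation(transfer, toHost);
    }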
##########
src/java/org/apache/cassandra/replication/CoordinatedTransfer.java:
##########
@@ -0,0 +1,431 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.replication;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutionException;
+import java.util.function.Supplier;
+
+import com.google.common.base.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.db.ConsistencyLevel;
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.db.streaming.CassandraOutgoingFile;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.exceptions.RequestFailure;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.locator.AbstractReplicationStrategy;
+import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.net.Message;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.net.NoPayload;
+import org.apache.cassandra.net.RequestCallbackWithFailure;
+import org.apache.cassandra.net.Verb;
+import org.apache.cassandra.streaming.OutgoingStream;
+import org.apache.cassandra.streaming.StreamOperation;
+import org.apache.cassandra.streaming.StreamPlan;
+import org.apache.cassandra.streaming.StreamResultFuture;
+import org.apache.cassandra.streaming.StreamState;
+import org.apache.cassandra.tcm.ClusterMetadata;
+import org.apache.cassandra.tcm.membership.NodeId;
+import org.apache.cassandra.utils.TimeUUID;
+import org.apache.cassandra.utils.concurrent.AsyncFuture;
+import org.apache.cassandra.utils.concurrent.Future;
+import org.apache.cassandra.utils.concurrent.FutureCombiner;
+import org.apache.cassandra.utils.concurrent.ImmediateFuture;
+
+/**
+ * A tracked bulk transfer for a single replica set.
+ * <p>
+ * For simplicity, streaming from coordinator to itself instead of copying
files. This has some perks:
+ * (1) it allows us to import out-of-range SSTables using the same paths, and
+ * (2) it uses the existing lifecycle management to handle crash-safety, so
don't need to deal with atomic multi-file
+ * copy.
+ * <p>
+ * A transfer happens in a few steps. First, the coordinator streams the
SSTables to each replica. Replicas store the
+ * streamed transfer in a "pending" location on the filesystem, where it isn't
visible to reads. Once the coordinator
+ * receives acknowledgements of completed streams from sufficient replicas,
the coordinator assigns an "activation ID"
+ * for the transfer, and notifies replicas that the pending stream has been
activated with that ID. Replicas then move
+ * the pending SSTables into the live set, where they're visible to reads, and
include the "activation ID" in mutation
+ * tracking summaries for reads that would include the new SSTables.
+ */
+public class CoordinatedTransfer
+{
+ private static final Logger logger =
LoggerFactory.getLogger(CoordinatedTransfer.class);
+
+ String logPrefix()
+ {
+ return String.format("[CoordinatedTransfer #%s]", transferId);
+ }
+
+ final TimeUUID transferId = TimeUUID.Generator.nextTimeUUID();
+
+ // TODO(expected): Add epoch at time of creation
+ final String keyspace;
+ public final Range<Token> range;
+
+ final ConcurrentMap<InetAddressAndPort, SingleTransferResult> streams;
+
+ final Collection<SSTableReader> sstables;
+
+ final ConsistencyLevel cl;
+
+ final Supplier<MutationId> getActivationId;
+ volatile MutationId activationId = null;
+
+ CoordinatedTransfer(String keyspace, Range<Token> range, Participants
participants, Collection<SSTableReader> sstables, ConsistencyLevel cl,
Supplier<MutationId> getActivationId)
+ {
+ this.keyspace = keyspace;
+ this.range = range;
+ this.sstables = sstables;
+ this.cl = cl;
+ this.getActivationId = getActivationId;
+
+ ClusterMetadata cm = ClusterMetadata.current();
+ this.streams = new ConcurrentHashMap<>(participants.size());
+ for (int i = 0; i < participants.size(); i++)
+ {
+ InetAddressAndPort addr = cm.directory.getNodeAddresses(new
NodeId(participants.get(i))).broadcastAddress;
+ this.streams.put(addr, SingleTransferResult.Unknown());
+ }
+ }
+
+ void execute()
+ {
+ logger.debug("Executing tracked bulk transfer {}", this);
+ LocalTransfers.instance().save(this);
+ stream();
+ }
+
+ private void stream()
+ {
+ // TODO: Don't stream multiple copies over the WAN, send one copy and
indicate forwarding
+ List<Future<Void>> streaming = new ArrayList<>(streams.size());
+ for (InetAddressAndPort to : streams.keySet())
+ streaming.add(stream(to));
+
+ Future<List<Void>> future = FutureCombiner.allOf(streaming);
+ future.awaitUninterruptibly();
+ future.rethrowIfFailed();
+ }
+
+ private boolean sufficient()
+ {
+ AbstractReplicationStrategy ars =
Keyspace.open(keyspace).getReplicationStrategy();
+ int blockFor = cl.blockFor(ars);
+ int responses = 0;
+ for (Map.Entry<InetAddressAndPort, SingleTransferResult> entry :
streams.entrySet())
+ {
+ if (entry.getValue().state ==
SingleTransferResult.State.STREAM_COMPLETE)
+ responses++;
+ }
+ return responses >= blockFor;
+ }
+
+ Future<Void> stream(InetAddressAndPort to)
+ {
+ return streamTask(to).andThenAsync(result -> streamComplete(to,
result));
+ }
+
+ private Future<Void> streamComplete(InetAddressAndPort to,
SingleTransferResult result)
+ {
+ streams.put(to, result);
+ logger.info("{} Completed streaming to {}, {}", logPrefix(), to, this);
+ return maybeActivate();
+ }
+
+ synchronized Future<Void> maybeActivate()
+ {
+ // If any activations have already been sent out, send new activations
to any received plans that have not yet
+ // been activated
+ boolean anyActivated = false;
+ Set<InetAddressAndPort> awaitingActivation = new HashSet<>();
+ for (Map.Entry<InetAddressAndPort, SingleTransferResult> entry :
streams.entrySet())
+ {
+ InetAddressAndPort peer = entry.getKey();
+ SingleTransferResult result = entry.getValue();
+ if (result.state == SingleTransferResult.State.ACTIVATE_COMPLETE)
+ {
+ anyActivated = true;
+ }
+ else if (result.state ==
SingleTransferResult.State.STREAM_COMPLETE)
+ awaitingActivation.add(peer);
+ }
+ if (anyActivated && !awaitingActivation.isEmpty())
+ {
+ logger.debug("{} Transfer already activated on peers, sending
activations to {}", logPrefix(), awaitingActivation);
+ return activateOn(awaitingActivation);
+ }
+
+ // If no activations have been sent out, check whether we have enough
planIds back to meet the required CL
+ else if (sufficient())
+ {
+ Set<InetAddressAndPort> peers = new HashSet<>();
+ for (Map.Entry<InetAddressAndPort, SingleTransferResult> entry :
streams.entrySet())
+ {
+ InetAddressAndPort peer = entry.getKey();
+ SingleTransferResult result = entry.getValue();
+ if (result.state == SingleTransferResult.State.STREAM_COMPLETE)
+ peers.add(peer);
+ }
+ logger.debug("{} Transfer meets consistency level {}, sending
activations to {}", logPrefix(), cl, peers);
+ return activateOn(peers);
+ }
+
+ logger.debug("Nothing to activate");
+ return ImmediateFuture.success(null);
+ }
+
+ synchronized Future<Void> activateOn(Collection<InetAddressAndPort> peers)
+ {
+ Preconditions.checkState(!peers.isEmpty());
+
+ if (activationId == null)
+ {
+ activationId = getActivationId.get();
+ logger.info("{} Assigned activationId {}", logPrefix(),
activationId);
+ }
+ LocalTransfers.instance().activating(this);
+
+ // First phase is dryRun to ensure data is present on disk, then
second phase does the actual import. This
+ // ensures that if something goes wrong (like a topology change during
import), we don't have divergence.
+ class DryRun extends AsyncFuture<Void> implements
RequestCallbackWithFailure<NoPayload>
+ {
+ final Set<InetAddressAndPort> responses =
ConcurrentHashMap.newKeySet();
+
+ public DryRun()
+ {
+ responses.addAll(peers);
+ }
+
+ @Override
+ public void onResponse(Message<NoPayload> msg)
+ {
+ logger.debug("{} Got response from: {}", logPrefix(),
msg.from());
+ responses.remove(msg.from());
+ if (responses.isEmpty())
+ trySuccess(null);
+ }
+
+ @Override
+ public void onFailure(InetAddressAndPort from, RequestFailure
failure)
+ {
+ logger.debug("{} Got failure {} from {}", logPrefix(),
failure, from);
+ tryFailure(null);
+ }
+ }
+
+ DryRun dryRun = new DryRun();
+ for (InetAddressAndPort peer : peers)
+ {
+ TransferActivation activation = new TransferActivation(this, peer,
true);
+ Message<TransferActivation> msg =
Message.out(Verb.TRACKED_TRANSFER_ACTIVATE_REQ, activation);
+ logger.debug("{} Sending {} to peer {}", logPrefix(), activation,
peer);
+ MessagingService.instance().sendWithCallback(msg, peer, dryRun);
+ SingleTransferResult result =
CoordinatedTransfer.this.streams.get(msg.from());
+ if (result != null)
+ result.sentActivation();
+ }
+ dryRun.awaitUninterruptibly();
Review Comment:
I don't think this method will throw an exception if there's a failure.
Assuming I'm not mistaken about that, it means that we need to check if dryRun
failed before continuing, and that there's a testing gap wrt protocol failures
and this logic.
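i.e. mirroring what stream() already does:

    dryRun.awaitUninterruptibly();
    // surface prepare-phase failures instead of proceeding straight to activation
    dryRun.rethrowIfFailed();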
##########
src/java/org/apache/cassandra/replication/CoordinatedTransfer.java:
##########
@@ -0,0 +1,431 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.replication;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutionException;
+import java.util.function.Supplier;
+
+import com.google.common.base.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.db.ConsistencyLevel;
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.db.streaming.CassandraOutgoingFile;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.exceptions.RequestFailure;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.locator.AbstractReplicationStrategy;
+import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.net.Message;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.net.NoPayload;
+import org.apache.cassandra.net.RequestCallbackWithFailure;
+import org.apache.cassandra.net.Verb;
+import org.apache.cassandra.streaming.OutgoingStream;
+import org.apache.cassandra.streaming.StreamOperation;
+import org.apache.cassandra.streaming.StreamPlan;
+import org.apache.cassandra.streaming.StreamResultFuture;
+import org.apache.cassandra.streaming.StreamState;
+import org.apache.cassandra.tcm.ClusterMetadata;
+import org.apache.cassandra.tcm.membership.NodeId;
+import org.apache.cassandra.utils.TimeUUID;
+import org.apache.cassandra.utils.concurrent.AsyncFuture;
+import org.apache.cassandra.utils.concurrent.Future;
+import org.apache.cassandra.utils.concurrent.FutureCombiner;
+import org.apache.cassandra.utils.concurrent.ImmediateFuture;
+
+/**
+ * A tracked bulk transfer for a single replica set.
+ * <p>
+ * For simplicity, streaming from coordinator to itself instead of copying
files. This has some perks:
+ * (1) it allows us to import out-of-range SSTables using the same paths, and
+ * (2) it uses the existing lifecycle management to handle crash-safety, so
don't need to deal with atomic multi-file
+ * copy.
+ * <p>
+ * A transfer happens in a few steps. First, the coordinator streams the
SSTables to each replica. Replicas store the
+ * streamed transfer in a "pending" location on the filesystem, where it isn't
visible to reads. Once the coordinator
+ * receives acknowledgements of completed streams from sufficient replicas,
the coordinator assigns an "activation ID"
+ * for the transfer, and notifies replicas that the pending stream has been
activated with that ID. Replicas then move
+ * the pending SSTables into the live set, where they're visible to reads, and
include the "activation ID" in mutation
+ * tracking summaries for reads that would include the new SSTables.
+ */
+public class CoordinatedTransfer
+{
+ private static final Logger logger =
LoggerFactory.getLogger(CoordinatedTransfer.class);
+
+ String logPrefix()
+ {
+ return String.format("[CoordinatedTransfer #%s]", transferId);
+ }
+
+ final TimeUUID transferId = TimeUUID.Generator.nextTimeUUID();
+
+ // TODO(expected): Add epoch at time of creation
+ final String keyspace;
+ public final Range<Token> range;
+
+ final ConcurrentMap<InetAddressAndPort, SingleTransferResult> streams;
+
+ final Collection<SSTableReader> sstables;
+
+ final ConsistencyLevel cl;
+
+ final Supplier<MutationId> getActivationId;
+ volatile MutationId activationId = null;
+
+ CoordinatedTransfer(String keyspace, Range<Token> range, Participants
participants, Collection<SSTableReader> sstables, ConsistencyLevel cl,
Supplier<MutationId> getActivationId)
+ {
+ this.keyspace = keyspace;
+ this.range = range;
+ this.sstables = sstables;
+ this.cl = cl;
+ this.getActivationId = getActivationId;
+
+ ClusterMetadata cm = ClusterMetadata.current();
+ this.streams = new ConcurrentHashMap<>(participants.size());
+ for (int i = 0; i < participants.size(); i++)
+ {
+ InetAddressAndPort addr = cm.directory.getNodeAddresses(new
NodeId(participants.get(i))).broadcastAddress;
+ this.streams.put(addr, SingleTransferResult.Unknown());
+ }
+ }
+
+ void execute()
+ {
+ logger.debug("Executing tracked bulk transfer {}", this);
+ LocalTransfers.instance().save(this);
+ stream();
+ }
+
+ private void stream()
+ {
+ // TODO: Don't stream multiple copies over the WAN, send one copy and
indicate forwarding
+ List<Future<Void>> streaming = new ArrayList<>(streams.size());
+ for (InetAddressAndPort to : streams.keySet())
+ streaming.add(stream(to));
+
+ Future<List<Void>> future = FutureCombiner.allOf(streaming);
+ future.awaitUninterruptibly();
+ future.rethrowIfFailed();
+ }
+
+ private boolean sufficient()
+ {
+ AbstractReplicationStrategy ars =
Keyspace.open(keyspace).getReplicationStrategy();
+ int blockFor = cl.blockFor(ars);
+ int responses = 0;
+ for (Map.Entry<InetAddressAndPort, SingleTransferResult> entry :
streams.entrySet())
+ {
+ if (entry.getValue().state ==
SingleTransferResult.State.STREAM_COMPLETE)
+ responses++;
+ }
+ return responses >= blockFor;
+ }
+
+ Future<Void> stream(InetAddressAndPort to)
+ {
+ return streamTask(to).andThenAsync(result -> streamComplete(to,
result));
+ }
+
+ private Future<Void> streamComplete(InetAddressAndPort to,
SingleTransferResult result)
+ {
+ streams.put(to, result);
+ logger.info("{} Completed streaming to {}, {}", logPrefix(), to, this);
+ return maybeActivate();
+ }
+
+ synchronized Future<Void> maybeActivate()
+ {
+ // If any activations have already been sent out, send new activations
to any received plans that have not yet
+ // been activated
+ boolean anyActivated = false;
+ Set<InetAddressAndPort> awaitingActivation = new HashSet<>();
+ for (Map.Entry<InetAddressAndPort, SingleTransferResult> entry :
streams.entrySet())
+ {
+ InetAddressAndPort peer = entry.getKey();
+ SingleTransferResult result = entry.getValue();
+ if (result.state == SingleTransferResult.State.ACTIVATE_COMPLETE)
+ {
+ anyActivated = true;
+ }
+ else if (result.state ==
SingleTransferResult.State.STREAM_COMPLETE)
+ awaitingActivation.add(peer);
+ }
+ if (anyActivated && !awaitingActivation.isEmpty())
+ {
+ logger.debug("{} Transfer already activated on peers, sending
activations to {}", logPrefix(), awaitingActivation);
+ return activateOn(awaitingActivation);
+ }
+
+ // If no activations have been sent out, check whether we have enough
planIds back to meet the required CL
+ else if (sufficient())
+ {
+ Set<InetAddressAndPort> peers = new HashSet<>();
+ for (Map.Entry<InetAddressAndPort, SingleTransferResult> entry :
streams.entrySet())
+ {
+ InetAddressAndPort peer = entry.getKey();
+ SingleTransferResult result = entry.getValue();
+ if (result.state == SingleTransferResult.State.STREAM_COMPLETE)
+ peers.add(peer);
+ }
+ logger.debug("{} Transfer meets consistency level {}, sending
activations to {}", logPrefix(), cl, peers);
+ return activateOn(peers);
+ }
+
+ logger.debug("Nothing to activate");
+ return ImmediateFuture.success(null);
+ }
+
+ synchronized Future<Void> activateOn(Collection<InetAddressAndPort> peers)
+ {
+ Preconditions.checkState(!peers.isEmpty());
+
+ if (activationId == null)
+ {
+ activationId = getActivationId.get();
+ logger.info("{} Assigned activationId {}", logPrefix(),
activationId);
+ }
+ LocalTransfers.instance().activating(this);
+
+ // First phase is dryRun to ensure data is present on disk, then
second phase does the actual import. This
+ // ensures that if something goes wrong (like a topology change during
import), we don't have divergence.
+ class DryRun extends AsyncFuture<Void> implements
RequestCallbackWithFailure<NoPayload>
+ {
+ final Set<InetAddressAndPort> responses =
ConcurrentHashMap.newKeySet();
+
+ public DryRun()
+ {
+ responses.addAll(peers);
+ }
+
+ @Override
+ public void onResponse(Message<NoPayload> msg)
+ {
+ logger.debug("{} Got response from: {}", logPrefix(),
msg.from());
+ responses.remove(msg.from());
+ if (responses.isEmpty())
+ trySuccess(null);
+ }
+
+ @Override
+ public void onFailure(InetAddressAndPort from, RequestFailure
failure)
+ {
+ logger.debug("{} Got failure {} from {}", logPrefix(),
failure, from);
+ tryFailure(null);
+ }
+ }
+
+ DryRun dryRun = new DryRun();
+ for (InetAddressAndPort peer : peers)
+ {
+ TransferActivation activation = new TransferActivation(this, peer,
true);
+ Message<TransferActivation> msg =
Message.out(Verb.TRACKED_TRANSFER_ACTIVATE_REQ, activation);
+ logger.debug("{} Sending {} to peer {}", logPrefix(), activation,
peer);
+ MessagingService.instance().sendWithCallback(msg, peer, dryRun);
+ SingleTransferResult result =
CoordinatedTransfer.this.streams.get(msg.from());
+ if (result != null)
+ result.sentActivation();
+ }
+ dryRun.awaitUninterruptibly();
+ logger.debug("{} Dry run complete for {}", logPrefix(), peers);
+
+ // Acknowledgement of activation is equivalent to a remote write
acknowledgement. The imported SSTables
+ // are now part of the live set, visible to reads.
+ class EachActivate extends AsyncFuture<Void> implements
RequestCallbackWithFailure<Void>
+ {
+ final Set<InetAddressAndPort> responses =
ConcurrentHashMap.newKeySet();
+
+ private EachActivate(Collection<InetAddressAndPort> peers)
+ {
+ responses.addAll(peers);
+ }
+
+ @Override
+ public void onResponse(Message<Void> msg)
+ {
+ logger.debug("Activation successfully applied on {}",
msg.from());
+ SingleTransferResult result =
CoordinatedTransfer.this.streams.get(msg.from());
+ if (result != null)
+ result.completedActivation();
+
+
MutationTrackingService.instance.receivedActivationAck(CoordinatedTransfer.this,
msg.from());
+ responses.remove(msg.from());
+ if (responses.isEmpty())
+ {
+ // All activations complete, schedule cleanup to purge
pending SSTables
+ LocalTransfers.instance().scheduleCleanup();
+ trySuccess(null);
+ }
+ }
+
+ @Override
+ public void onFailure(InetAddressAndPort from, RequestFailure
failure)
+ {
+ logger.error("Failed activation on {} due to {}", from,
failure);
+ // TODO(expected): should only fail if we don't meet requested
CL, individual failures are fine
+ responses.remove(from);
+ if (responses.isEmpty())
+ trySuccess(null);
Review Comment:
Can we actually tolerate failures here? I could see this causing correctness
problems if all requests fail because no replica received the promotion message
and the caller believes that the transfer was successful. Or if a single node
responded with a success response and then immediately went down. In both cases
the caller believes the transfer succeeded, but it's actually unable to be
completed.
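If partial failure is meant to be tolerated, onFailure probably needs to count against the CL instead of unconditionally reporting success, roughly (the failures tally is a hypothetical AtomicInteger field):

    @Override
    public void onFailure(InetAddressAndPort from, RequestFailure failure)
    {
        logger.error("Failed activation on {} due to {}", from, failure);
        int failed = failures.incrementAndGet();
        responses.remove(from);
        if (responses.isEmpty())
        {
            // succeed only if enough peers actually acked the activation
            if (peers.size() - failed >= cl.blockFor(Keyspace.open(keyspace).getReplicationStrategy()))
                trySuccess(null);
            else
                tryFailure(new RuntimeException("insufficient activation acks for " + cl));
        }
    }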
##########
src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java:
##########
@@ -376,6 +376,8 @@ protected void recordLatency(TableMetrics metric, long
latencyNanos)
public UnfilteredPartitionIterator queryStorage(final ColumnFamilyStore
cfs, ReadExecutionController controller)
{
ColumnFamilyStore.ViewFragment view =
cfs.select(View.selectLive(dataRange().keyRange()));
+ if (cfs.metadata().replicationType().isTracked())
+ controller.addActivationIds(view);
Review Comment:
discussed elsewhere, but we should only be adding activation ids that
intersect with the key/range of the read
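e.g. (the two-argument overload is hypothetical):

    if (cfs.metadata().replicationType().isTracked())
        controller.addActivationIds(view, dataRange().keyRange());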
##########
src/java/org/apache/cassandra/replication/UnreconciledTransfers.java:
##########
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.replication;
+
+import java.util.Map;
+import java.util.SortedSet;
+import java.util.TreeSet;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.db.PartitionPosition;
+import org.apache.cassandra.dht.AbstractBounds;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.schema.TableId;
+
+/**
+ * See {@link UnreconciledMutations}.
+ *
+ * For now, all reads intersect with all transfers, but we could be more
discerning and only return transfers for the
+ * specific table and range. Transfers should be very rare.
+ */
+class UnreconciledTransfers
Review Comment:
This could use some unit tests confirming it's working as expected
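e.g. a round-trip test along these lines (the add/markReconciled/unreconciled surface is an assumption):

    @Test
    public void transferIdsAreReturnedUntilReconciled()
    {
        UnreconciledTransfers transfers = new UnreconciledTransfers();
        TimeUUID id = TimeUUID.Generator.nextTimeUUID();
        transfers.add(id);                                   // hypothetical
        assertTrue(transfers.unreconciled().contains(id));   // hypothetical accessor
        transfers.markReconciled(id);                        // hypothetical
        assertTrue(transfers.unreconciled().isEmpty());
    }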
##########
src/java/org/apache/cassandra/db/SSTableImporter.java:
##########
@@ -183,7 +181,14 @@ synchronized List<String> importNewSSTables(Options
options)
Descriptor newDescriptor =
cfs.getUniqueDescriptorFor(entry.getKey(), targetDir);
maybeMutateMetadata(entry.getKey(), options);
movedSSTables.add(new MovedSSTable(newDescriptor,
entry.getKey(), entry.getValue()));
- SSTableReader sstable =
SSTableReader.moveAndOpenSSTable(cfs, entry.getKey(), newDescriptor,
entry.getValue(), options.copyData);
+ SSTableReader sstable;
+ if (isTracked)
+ sstable = SSTableReader.open(cfs, oldDescriptor,
metadata.ref);
Review Comment:
If we stream even to the local node, we need to clean up the source sstables.
I think these are left on disk. We should also have a comment here explaining
why we're not moving the sstable.
I'd consider separating these 2 flows into separate methods or separate
AbstractSSTableImport implementations, or something to that effect. The tracked
and untracked flows have some fairly significant differences. I don't have
super strong opinions about that, but the multiple uses of the isTracked flag
throughout this fairly long method seem a bit brittle and make it harder to
read (imo)
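e.g. dispatching once at the top (importTracked/importUntracked are hypothetical):

    synchronized List<String> importNewSSTables(Options options)
    {
        // tracked tables go through the stream-based transfer path; everything
        // else keeps the existing moveAndOpenSSTable flow
        return cfs.metadata().replicationType().isTracked()
               ? importTracked(options)
               : importUntracked(options);
    }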
##########
src/java/org/apache/cassandra/replication/CoordinatedTransfer.java:
##########
@@ -0,0 +1,431 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.replication;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutionException;
+import java.util.function.Supplier;
+
+import com.google.common.base.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.db.ConsistencyLevel;
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.db.streaming.CassandraOutgoingFile;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.exceptions.RequestFailure;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.locator.AbstractReplicationStrategy;
+import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.net.Message;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.net.NoPayload;
+import org.apache.cassandra.net.RequestCallbackWithFailure;
+import org.apache.cassandra.net.Verb;
+import org.apache.cassandra.streaming.OutgoingStream;
+import org.apache.cassandra.streaming.StreamOperation;
+import org.apache.cassandra.streaming.StreamPlan;
+import org.apache.cassandra.streaming.StreamResultFuture;
+import org.apache.cassandra.streaming.StreamState;
+import org.apache.cassandra.tcm.ClusterMetadata;
+import org.apache.cassandra.tcm.membership.NodeId;
+import org.apache.cassandra.utils.TimeUUID;
+import org.apache.cassandra.utils.concurrent.AsyncFuture;
+import org.apache.cassandra.utils.concurrent.Future;
+import org.apache.cassandra.utils.concurrent.FutureCombiner;
+import org.apache.cassandra.utils.concurrent.ImmediateFuture;
+
+/**
+ * A tracked bulk transfer for a single replica set.
+ * <p>
+ * For simplicity, streaming from coordinator to itself instead of copying
files. This has some perks:
+ * (1) it allows us to import out-of-range SSTables using the same paths, and
+ * (2) it uses the existing lifecycle management to handle crash-safety, so
don't need to deal with atomic multi-file
+ * copy.
+ * <p>
+ * A transfer happens in a few steps. First, the coordinator streams the
SSTables to each replica. Replicas store the
+ * streamed transfer in a "pending" location on the filesystem, where it isn't
visible to reads. Once the coordinator
+ * receives acknowledgements of completed streams from sufficient replicas,
the coordinator assigns an "activation ID"
+ * for the transfer, and notifies replicas that the pending stream has been
activated with that ID. Replicas then move
+ * the pending SSTables into the live set, where they're visible to reads, and
include the "activation ID" in mutation
+ * tracking summaries for reads that would include the new SSTables.
+ */
+public class CoordinatedTransfer
+{
+ private static final Logger logger =
LoggerFactory.getLogger(CoordinatedTransfer.class);
+
+ String logPrefix()
+ {
+ return String.format("[CoordinatedTransfer #%s]", transferId);
+ }
+
+ final TimeUUID transferId = TimeUUID.Generator.nextTimeUUID();
+
+ // TODO(expected): Add epoch at time of creation
+ final String keyspace;
+ public final Range<Token> range;
+
+ final ConcurrentMap<InetAddressAndPort, SingleTransferResult> streams;
+
+ final Collection<SSTableReader> sstables;
+
+ final ConsistencyLevel cl;
+
+ final Supplier<MutationId> getActivationId;
+ volatile MutationId activationId = null;
+
+ CoordinatedTransfer(String keyspace, Range<Token> range, Participants
participants, Collection<SSTableReader> sstables, ConsistencyLevel cl,
Supplier<MutationId> getActivationId)
+ {
+ this.keyspace = keyspace;
+ this.range = range;
+ this.sstables = sstables;
+ this.cl = cl;
+ this.getActivationId = getActivationId;
+
+ ClusterMetadata cm = ClusterMetadata.current();
+ this.streams = new ConcurrentHashMap<>(participants.size());
+ for (int i = 0; i < participants.size(); i++)
+ {
+ InetAddressAndPort addr = cm.directory.getNodeAddresses(new
NodeId(participants.get(i))).broadcastAddress;
+ this.streams.put(addr, SingleTransferResult.Unknown());
+ }
+ }
+
+ void execute()
+ {
+ logger.debug("Executing tracked bulk transfer {}", this);
+ LocalTransfers.instance().save(this);
+ stream();
+ }
+
+ private void stream()
+ {
+ // TODO: Don't stream multiple copies over the WAN, send one copy and
indicate forwarding
+ List<Future<Void>> streaming = new ArrayList<>(streams.size());
+ for (InetAddressAndPort to : streams.keySet())
+ streaming.add(stream(to));
+
+ Future<List<Void>> future = FutureCombiner.allOf(streaming);
+ future.awaitUninterruptibly();
+ future.rethrowIfFailed();
+ }
+
+ private boolean sufficient()
+ {
+ AbstractReplicationStrategy ars =
Keyspace.open(keyspace).getReplicationStrategy();
+ int blockFor = cl.blockFor(ars);
+ int responses = 0;
+ for (Map.Entry<InetAddressAndPort, SingleTransferResult> entry :
streams.entrySet())
+ {
+ if (entry.getValue().state ==
SingleTransferResult.State.STREAM_COMPLETE)
+ responses++;
+ }
+ return responses >= blockFor;
+ }
+
+ Future<Void> stream(InetAddressAndPort to)
+ {
+ return streamTask(to).andThenAsync(result -> streamComplete(to,
result));
+ }
+
+ private Future<Void> streamComplete(InetAddressAndPort to,
SingleTransferResult result)
+ {
+ streams.put(to, result);
+ logger.info("{} Completed streaming to {}, {}", logPrefix(), to, this);
+ return maybeActivate();
+ }
+
+ synchronized Future<Void> maybeActivate()
+ {
+ // If any activations have already been sent out, send new activations
to any received plans that have not yet
+ // been activated
+ boolean anyActivated = false;
+ Set<InetAddressAndPort> awaitingActivation = new HashSet<>();
+ for (Map.Entry<InetAddressAndPort, SingleTransferResult> entry :
streams.entrySet())
+ {
+ InetAddressAndPort peer = entry.getKey();
+ SingleTransferResult result = entry.getValue();
+ if (result.state == SingleTransferResult.State.ACTIVATE_COMPLETE)
+ {
+ anyActivated = true;
+ }
+ else if (result.state ==
SingleTransferResult.State.STREAM_COMPLETE)
+ awaitingActivation.add(peer);
+ }
+ if (anyActivated && !awaitingActivation.isEmpty())
+ {
+ logger.debug("{} Transfer already activated on peers, sending
activations to {}", logPrefix(), awaitingActivation);
+ return activateOn(awaitingActivation);
+ }
+
+ // If no activations have been sent out, check whether we have enough
planIds back to meet the required CL
+ else if (sufficient())
+ {
+ Set<InetAddressAndPort> peers = new HashSet<>();
+ for (Map.Entry<InetAddressAndPort, SingleTransferResult> entry :
streams.entrySet())
+ {
+ InetAddressAndPort peer = entry.getKey();
+ SingleTransferResult result = entry.getValue();
+ if (result.state == SingleTransferResult.State.STREAM_COMPLETE)
+ peers.add(peer);
+ }
+ logger.debug("{} Transfer meets consistency level {}, sending
activations to {}", logPrefix(), cl, peers);
+ return activateOn(peers);
+ }
+
+ logger.debug("Nothing to activate");
+ return ImmediateFuture.success(null);
+ }
+
+ synchronized Future<Void> activateOn(Collection<InetAddressAndPort> peers)
+ {
+ Preconditions.checkState(!peers.isEmpty());
+
+ if (activationId == null)
+ {
+ activationId = getActivationId.get();
+ logger.info("{} Assigned activationId {}", logPrefix(),
activationId);
+ }
+ LocalTransfers.instance().activating(this);
+
+ // First phase is dryRun to ensure data is present on disk, then
second phase does the actual import. This
+ // ensures that if something goes wrong (like a topology change during
import), we don't have divergence.
+ class DryRun extends AsyncFuture<Void> implements
RequestCallbackWithFailure<NoPayload>
+ {
+ final Set<InetAddressAndPort> responses =
ConcurrentHashMap.newKeySet();
+
+ public DryRun()
+ {
+ responses.addAll(peers);
+ }
+
+ @Override
+ public void onResponse(Message<NoPayload> msg)
+ {
+ logger.debug("{} Got response from: {}", logPrefix(),
msg.from());
+ responses.remove(msg.from());
+ if (responses.isEmpty())
+ trySuccess(null);
+ }
+
+ @Override
+ public void onFailure(InetAddressAndPort from, RequestFailure
failure)
+ {
+ logger.debug("{} Got failure {} from {}", logPrefix(),
failure, from);
+ tryFailure(null);
+ }
+ }
+
+ DryRun dryRun = new DryRun();
+ for (InetAddressAndPort peer : peers)
+ {
+ TransferActivation activation = new TransferActivation(this, peer,
true);
+ Message<TransferActivation> msg =
Message.out(Verb.TRACKED_TRANSFER_ACTIVATE_REQ, activation);
+ logger.debug("{} Sending {} to peer {}", logPrefix(), activation,
peer);
+ MessagingService.instance().sendWithCallback(msg, peer, dryRun);
+ SingleTransferResult result =
CoordinatedTransfer.this.streams.get(msg.from());
+ if (result != null)
+ result.sentActivation();
+ }
+ dryRun.awaitUninterruptibly();
+ logger.debug("{} Dry run complete for {}", logPrefix(), peers);
+
+ // Acknowledgement of activation is equivalent to a remote write
acknowledgement. The imported SSTables
+ // are now part of the live set, visible to reads.
+ class EachActivate extends AsyncFuture<Void> implements
RequestCallbackWithFailure<Void>
+ {
+ final Set<InetAddressAndPort> responses =
ConcurrentHashMap.newKeySet();
+
+ private EachActivate(Collection<InetAddressAndPort> peers)
+ {
+ responses.addAll(peers);
+ }
+
+ @Override
+ public void onResponse(Message<Void> msg)
+ {
+ logger.debug("Activation successfully applied on {}",
msg.from());
+ SingleTransferResult result =
CoordinatedTransfer.this.streams.get(msg.from());
+ if (result != null)
+ result.completedActivation();
+
+
MutationTrackingService.instance.receivedActivationAck(CoordinatedTransfer.this,
msg.from());
+ responses.remove(msg.from());
+ if (responses.isEmpty())
+ {
+ // All activations complete, schedule cleanup to purge
pending SSTables
+ LocalTransfers.instance().scheduleCleanup();
Review Comment:
is it safe to clean up if some of the responses were failures? If so, we need
this in the `onFailure` method as well. Or as a callback on the future
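e.g. hanging it off the future so it runs on either outcome:

    EachActivate activate = new EachActivate(peers);
    // runs whether the activation round succeeded or failed
    activate.addListener(f -> LocalTransfers.instance().scheduleCleanup());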
##########
src/java/org/apache/cassandra/replication/MutationJournal.java:
##########
@@ -204,6 +204,15 @@ boolean read(ShortMutationId id,
RecordConsumer<ShortMutationId> consumer)
return journal.readLast(id, consumer);
}
+ public boolean readIfExists(ShortMutationId id, Collection<Mutation> into)
Review Comment:
unused
##########
src/java/org/apache/cassandra/db/ReadExecutionController.java:
##########
@@ -243,4 +261,20 @@ private void addSample()
if (cfs != null)
cfs.metric.topLocalReadQueryTime.addSample(cql, timeMicros);
}
+
+ public void addActivationIds(ColumnFamilyStore.ViewFragment view)
+ {
+ activationIds = new HashSet<>();
Review Comment:
possibly paranoid, but can we check this isn't null before creating a new
hashset?
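i.e.:

    if (activationIds == null)
        activationIds = new HashSet<>();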
##########
src/java/org/apache/cassandra/replication/ActiveLogReconciler.java:
##########
@@ -114,12 +120,26 @@ public void run(Interruptible.State state) throws
InterruptedException
}
}
- private static final class Task implements RequestCallback<NoPayload>
+ private static abstract class Task implements RequestCallback<NoPayload>
+ {
+ private static Task from(ShortMutationId id, InetAddressAndPort toHost)
Review Comment:
I think this should go in MutationTask right?
##########
src/java/org/apache/cassandra/replication/ImmutableCoordinatorLogOffsets.java:
##########
@@ -75,23 +85,25 @@ public boolean equals(Object o)
{
if (o == null || getClass() != o.getClass()) return false;
ImmutableCoordinatorLogOffsets longs =
(ImmutableCoordinatorLogOffsets) o;
- return Objects.equals(ids, longs.ids);
+ return Objects.equals(ids, longs.ids) && Objects.equals(transfers,
longs.transfers);
}
@Override
public int hashCode()
{
- return Objects.hashCode(ids);
+ return Objects.hash(ids, transfers);
}
- public ImmutableCoordinatorLogOffsets(Builder builder)
+ private ImmutableCoordinatorLogOffsets(Builder builder)
{
// Important to set shouldAvoidAllocation=false, otherwise iterators
are cached and not thread safe, even when
// immutable and read-only
this.ids = new Long2ObjectHashMap<>(builder.ids.size(), 0.9f, false);
for (Map.Entry<Long, Offsets.Immutable.Builder> entry :
builder.ids.entrySet())
ids.put(entry.getKey(), entry.getValue().build());
+
+ this.transfers = builder.transfers;
Review Comment:
nit - it would be better to make a defensive copy like we do with ids, or make
builder.transfers an immutable list builder
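e.g.:

    this.transfers = ImmutableList.copyOf(builder.transfers);   // guava defensive copy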
##########
src/java/org/apache/cassandra/replication/TransferActivation.java:
##########
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.replication;
+
+import java.io.IOException;
+
+import com.google.common.base.Preconditions;
+
+import org.apache.cassandra.db.TypeSizes;
+import org.apache.cassandra.io.IVersionedSerializer;
+import org.apache.cassandra.io.util.DataInputPlus;
+import org.apache.cassandra.io.util.DataOutputPlus;
+import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.net.IVerbHandler;
+import org.apache.cassandra.net.Message;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.net.NoPayload;
+import org.apache.cassandra.tcm.ClusterMetadata;
+import org.apache.cassandra.tcm.membership.NodeId;
+import org.apache.cassandra.utils.TimeUUID;
+
+public class TransferActivation
+{
+ public final TimeUUID transferId;
+ public final TimeUUID planId;
+ public final MutationId activationId;
+ public final NodeId coordinatorId;
+ public final boolean dryRun;
+
+ public TransferActivation(CoordinatedTransfer transfer, InetAddressAndPort peer, boolean dryRun)
+ {
+ this(transfer.transferId, transfer.streams.get(peer).planId(), transfer.activationId, ClusterMetadata.current().myNodeId(), dryRun);
+ }
+
+ TransferActivation(TimeUUID transferId, TimeUUID planId, MutationId activationId, NodeId coordinatorId, boolean dryRun)
+ {
+ this.transferId = transferId;
+ Preconditions.checkArgument(!activationId.isNone());
+ Preconditions.checkNotNull(planId);
+ Preconditions.checkNotNull(coordinatorId);
+ this.planId = planId;
+ this.activationId = activationId;
+ this.coordinatorId = coordinatorId;
+ this.dryRun = dryRun;
+ }
+
+ public void apply()
+ {
+ MutationTrackingService.instance.activateLocal(this);
+ }
+
+ public static final Serializer serializer = new Serializer();
+
+ public static class Serializer implements IVersionedSerializer<TransferActivation>
+ {
+ @Override
+ public void serialize(TransferActivation activate, DataOutputPlus out, int version) throws IOException
+ {
+ TimeUUID.Serializer.instance.serialize(activate.transferId, out, version);
+ TimeUUID.Serializer.instance.serialize(activate.planId, out, version);
+ MutationId.serializer.serialize(activate.activationId, out, version);
+ NodeId.messagingSerializer.serialize(activate.coordinatorId, out, version);
+ out.writeBoolean(activate.dryRun);
+ }
+
+ @Override
+ public TransferActivation deserialize(DataInputPlus in, int version) throws IOException
+ {
+ TimeUUID transferId = TimeUUID.Serializer.instance.deserialize(in, version);
+ TimeUUID planId = TimeUUID.Serializer.instance.deserialize(in, version);
+ MutationId activationId = MutationId.serializer.deserialize(in, version);
+ NodeId coordinatorId = NodeId.messagingSerializer.deserialize(in, version);
+ boolean dryRun = in.readBoolean();
+ return new TransferActivation(transferId, planId, activationId, coordinatorId, dryRun);
+ }
+
+ @Override
+ public long serializedSize(TransferActivation activate, int version)
+ {
+ long size = 0;
+ size += TimeUUID.Serializer.instance.serializedSize(activate.transferId, version);
+ size += TimeUUID.Serializer.instance.serializedSize(activate.planId, version);
+ size += MutationId.serializer.serializedSize(activate.activationId, version);
+ size += NodeId.messagingSerializer.serializedSize(activate.coordinatorId, version);
+ size += TypeSizes.BOOL_SIZE;
+ return size;
+ }
+ }
+
+ public static class VerbHandler implements IVerbHandler<TransferActivation>
+ {
+ @Override
+ public void doVerb(Message<TransferActivation> msg) throws IOException
+ {
+ LocalTransfers.instance().executor.submit(() -> {
+ msg.payload.apply();
+ MessagingService.instance().respond(NoPayload.noPayload, msg);
+ }).rethrowIfFailed();
Review Comment:
This could be streamlined a bit. From the anti-entropy stage this submits a
task to another executor, which does something and responds, but it then blocks
the anti-entropy stage waiting for that task to complete. If it fails it will
(I think) respond with a failure response. Looking at what apply does, I think
it's something we could just do on the anti-entropy thread. However, if you're
worried about this being a long-running task, then it's fine to hand it off to
the local transfers executor; we should just free up the anti-entropy thread
and send failure responses directly from that thread.
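A hedged sketch of the non-blocking variant (it assumes the returned future exposes an `addCallback(onSuccess, onFailure)` hook and that `MessagingService.respondWithFailure` fits here; treat both as assumptions of this sketch):

```java
@Override
public void doVerb(Message<TransferActivation> msg)
{
    // Hand the possibly long-running activation to the transfers executor,
    // but return immediately so the anti-entropy thread is freed; success
    // and failure responses are sent from callbacks instead of blocking.
    LocalTransfers.instance().executor
                  .submit(() -> msg.payload.apply())
                  .addCallback(done -> MessagingService.instance().respond(NoPayload.noPayload, msg),
                               failure -> MessagingService.instance().respondWithFailure(RequestFailureReason.UNKNOWN, msg));
}
```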
##########
src/java/org/apache/cassandra/replication/ImmutableCoordinatorLogOffsets.java:
##########
@@ -112,6 +125,9 @@ public Builder()
public Builder(int size)
{
this.ids = new Long2ObjectHashMap<>(size, 0.9f, false);
+
+ // Transfers are very rare, opt to save memory
+ this.transfers = new ArrayList<>(1);
Review Comment:
_technically_ using null here and only instantiating an object if a transfer
is added would save the most memory
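For illustration, the lazy variant might look like this (sketch only; the element type is assumed to be MutationId):

```java
// Null until the first transfer is added, so the common no-transfer case
// carries only a null field.
private List<MutationId> transfers;

public Builder addTransfer(MutationId activationId)
{
    if (transfers == null)
        transfers = new ArrayList<>(1);
    transfers.add(activationId);
    return this;
}
```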
##########
src/java/org/apache/cassandra/replication/LocalTransfers.java:
##########
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.replication;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+import com.google.common.base.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.concurrent.ExecutorPlus;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.io.util.File;
+import org.apache.cassandra.utils.TimeUUID;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+import static org.apache.cassandra.concurrent.ExecutorFactory.Global.executorFactory;
+
+/**
+ * Stores coordinated and received transfers.
+ *
+ * TODO: Make changes to pending set durable with SystemKeyspace.savePendingLocalTransfer(transfer)?
+ * TODO: GC
+ */
+class LocalTransfers
+{
+ private static final Logger logger = LoggerFactory.getLogger(LocalTransfers.class);
+
+ private final Map<TimeUUID, CoordinatedTransfer> coordinating = new HashMap<>();
+ private final Map<ShortMutationId, CoordinatedTransfer> coordinatingActivated = new HashMap<>();
+ private final Map<TimeUUID, PendingLocalTransfer> received = new HashMap<>();
+
+ final ExecutorPlus executor = executorFactory().pooled("LocalTrackedTransfers", Integer.MAX_VALUE);
+
+ private static final LocalTransfers instance = new LocalTransfers();
+ static LocalTransfers instance()
+ {
+ return instance;
+ }
+
+ synchronized void save(CoordinatedTransfer transfer)
+ {
+ CoordinatedTransfer existing = coordinating.put(transfer.transferId, transfer);
+ Preconditions.checkState(existing == null);
+ }
+
+ synchronized void activating(CoordinatedTransfer transfer)
+ {
+ coordinatingActivated.put(transfer.activationId, transfer);
+ }
+
+ synchronized void received(PendingLocalTransfer transfer)
+ {
+ logger.debug("received: {}", transfer);
+ Preconditions.checkState(!transfer.sstables.isEmpty());
+
+ PendingLocalTransfer existing = received.put(transfer.planId, transfer);
+ Preconditions.checkState(existing == null);
+ }
+
+ private void cleanup()
+ {
+ Set<Map.Entry<TimeUUID, PendingLocalTransfer>> candidates;
+ synchronized (this)
+ {
+ candidates = received.entrySet();
+ }
+ for (Map.Entry<TimeUUID, PendingLocalTransfer> candidate : candidates)
+ {
+ PendingLocalTransfer transfer = candidate.getValue();
+ if (!transfer.activated)
+ return;
Review Comment:
this should be `break`
##########
src/java/org/apache/cassandra/replication/MutationTrackingService.java:
##########
@@ -88,9 +95,12 @@ public class MutationTrackingService
/**
* Split ranges into this many shards.
* <p>
+ * REVIEW: Reset back to 1 because for transfers, replicas need to know each others' shards, since transfers are
+ * sliced to fit within shards. Can we achieve sharding via split range ownership, instead of it being local-only?
+ * <p>
Review Comment:
This was added as an aid to perf testing. I think reverting it to 1 is fine,
especially since there's a functional reason here, but we should open a JIRA
to revisit it.
##########
src/java/org/apache/cassandra/replication/MutationTrackingService.java:
##########
@@ -250,6 +260,16 @@ public void receivedWriteResponse(ShortMutationId mutationId, InetAddressAndPort
}
}
+ public void receivedActivationAck(CoordinatedTransfer transfer, InetAddressAndPort fromHost)
+ {
+ logger.debug("{} receivedActivationAck from {}", transfer.logPrefix(), fromHost);
+ MutationId activationId = transfer.activationId;
+ Preconditions.checkArgument(!activationId.isNone());
+ Shard shard = getShardNullable(activationId);
Review Comment:
The host replacement code added a shard lock whose read lock we need to hold
while doing things like this. This applies to the other changes in this file
as well.
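A sketch of the locking shape being asked for (the `lock` field and its placement are assumptions about the host-replacement change, not its actual API):

```java
Shard shard = getShardNullable(activationId);
if (shard == null)
    return;
// Hold the shard's read lock while mutating reconciliation state, so a
// concurrent host replacement cannot swap the shard out from under us.
shard.lock.readLock().lock();
try
{
    // ... record the activation ack against the shard ...
}
finally
{
    shard.lock.readLock().unlock();
}
```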
##########
src/java/org/apache/cassandra/replication/PendingLocalTransfer.java:
##########
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.replication;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Objects;
+
+import com.google.common.base.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.db.streaming.CassandraStreamReceiver;
+import org.apache.cassandra.dht.AbstractBounds;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.io.util.File;
+import org.apache.cassandra.locator.EndpointsForRange;
+import org.apache.cassandra.schema.TableId;
+import org.apache.cassandra.tcm.ClusterMetadata;
+import org.apache.cassandra.tcm.ownership.ReplicaGroups;
+import org.apache.cassandra.utils.TimeUUID;
+
+import static org.apache.cassandra.utils.Clock.Global.currentTimeMillis;
+
+/**
+ * A transfer on a replica, once present on disk.
+ */
+public class PendingLocalTransfer
+{
+ private static final Logger logger = LoggerFactory.getLogger(PendingLocalTransfer.class);
+
+ private String logPrefix()
+ {
+ return String.format("[PendingLocalTransfer #%s]", planId);
+ }
+
+ final TimeUUID planId;
+ final TableId tableId;
+ final Collection<SSTableReader> sstables;
+ final long createdAt = currentTimeMillis();
+ transient String keyspace;
+ transient Range<Token> range;
+
+ volatile boolean activated = false;
+
+ public PendingLocalTransfer(TableId tableId, TimeUUID planId, Collection<SSTableReader> sstables)
+ {
+ Preconditions.checkState(!sstables.isEmpty());
+ this.tableId = tableId;
+ this.planId = planId;
+ this.sstables = sstables;
+ this.keyspace = Objects.requireNonNull(ColumnFamilyStore.getIfExists(tableId)).keyspace.getName();
+ this.range = shardRange(keyspace, sstables);
+ }
+
+ /**
+ * Pending transfers should be within a single shard, which are aligned to natural ranges.
+ * See ({@link MutationTrackingService.KeyspaceShards#make}).
+ */
+ private static Range<Token> shardRange(String keyspace, Collection<SSTableReader> sstables)
+ {
+ ClusterMetadata cm = ClusterMetadata.current();
+ ReplicaGroups writes = cm.placements.get(Keyspace.open(keyspace).getMetadata().params.replication).writes;
+ Range<Token> range = null;
+ for (SSTableReader sstable : sstables)
+ {
+ if (range == null)
+ {
+ Token first = sstable.getFirst().getToken();
+ range = writes.forRange(first).range();
+ }
+ else
+ {
+ AbstractBounds<Token> bounds = sstable.getBounds();
+ Preconditions.checkState(!range.isTrulyWrapAround());
+ Preconditions.checkState(range.contains(bounds.left));
+ Preconditions.checkState(range.contains(bounds.right));
+ }
+ }
+
+ Preconditions.checkNotNull(range);
+ return range;
+ }
+
+ private boolean isFullReplica()
+ {
+ ClusterMetadata cm = ClusterMetadata.current();
+ Keyspace ks = Keyspace.open(keyspace);
+ ReplicaGroups writes = cm.placements.get(ks.getMetadata().params.replication).writes;
+ EndpointsForRange replicas = writes.forRange(range.right).get();
+ return replicas.containsSelf() && replicas.selfIfPresent().isFull();
+ }
+
+ /**
+ * Safely move a transfer into the live set. This must be crash-safe, and the primary invariant we need to
+ * preserve is a transfer is only added to the live set iff the transfer ID is present in its mutation summaries.
+ *
+ * We don't validate checksums here, mostly because a transfer can be activated during a read, if one replica
+ * missed the TransferActivation. Transfers should not be pending for very long, and should be protected by
+ * internode integrity checks provided by TLS.
+ *
+ * TODO: Clear out the row cache and counter cache, like {@link CassandraStreamReceiver#finished}.
+ * TODO: Don't add to the live set if coordinator and not an owner for the range
+ */
+ public void activate(TransferActivation activation)
+ {
+ if (activated)
+ return;
+
+ Preconditions.checkState(isFullReplica());
+
+ logger.info("{} Activating transfer {}, {} ms since pending",
logPrefix(), this, currentTimeMillis() - createdAt);
+ ColumnFamilyStore cfs = ColumnFamilyStore.getIfExists(tableId);
+ Preconditions.checkNotNull(cfs);
+ Preconditions.checkState(!sstables.isEmpty());
+
+ if (activation.dryRun)
+ {
+ logger.info("{} Not adding SSTables to live set for dryRun {}",
logPrefix(), activation);
+ return;
+ }
+
+ // Ensure no lingering mutation IDs, only activation IDs
+ for (SSTableReader sstable : sstables)
+ {
+ Preconditions.checkState(sstable.getCoordinatorLogOffsets().isEmpty());
+
+ // Modify SSTables metadata to durably set transfer ID before importing
+ ImmutableCoordinatorLogOffsets logOffsets = new ImmutableCoordinatorLogOffsets.Builder()
+ .addTransfer(activation.activationId)
+ .build();
+ try
+ {
+ sstable.mutateCoordinatorLogOffsetsAndReload(logOffsets);
+ }
+ catch (IOException e)
+ {
+ throw new RuntimeException(e);
+ }
+
+ Preconditions.checkState(sstable.getCoordinatorLogOffsets().isEmpty());
+ Preconditions.checkState(!sstable.getCoordinatorLogOffsets().transfers().isEmpty());
+ }
+
+ File dst = cfs.getDirectories().getDirectoryForNewSSTables();
+
+ // Retain the original SSTables in pending/ dir on the coordinator, so future streams can get the originals and
+ // we don't need to isolate activated SSTables during compaction
+ boolean isCoordinator = activation.activationId.hostId == ClusterMetadata.current().myNodeId().id();
+ logger.debug("{} {} pending SSTables for activation to {}", isCoordinator ? "Copying" : "Moving", logPrefix(), dst);
+
+ dst.createFileIfNotExists();
+ for (SSTableReader sstable : sstables)
+ {
+ SSTableReader moved = SSTableReader.moveAndOpenSSTable(cfs, sstable.descriptor, cfs.getUniqueDescriptorFor(sstable.descriptor, dst), sstable.getComponents(), isCoordinator);
+ cfs.getTracker().addSSTablesTracked(Collections.singleton(moved));
+ }
+
+ activated = true;
Review Comment:
obviously we don't have durability implemented for this yet, but we should
definitely have tests validating the restart safety of partially completed
activations when we do
##########
src/java/org/apache/cassandra/replication/MutationTrackingService.java:
##########
@@ -325,6 +345,38 @@ public boolean registerMutationCallback(ShortMutationId mutationId, IncomingMuta
return incomingMutations.subscribe(mutationId, callback);
}
+ public void executeTransfers(String keyspace, Set<SSTableReader> sstables, ConsistencyLevel cl)
+ {
+ logger.info("Creating tracked bulk transfers for keyspace {} sstables {}", keyspace, sstables);
+
+ KeyspaceShards shards = keyspaceShards.get(keyspace);
+ checkNotNull(shards);
+
+ CoordinatedTransfers transfers = CoordinatedTransfers.create(shards, sstables, cl);
+ logger.info("Split input SSTables into transfers {}", transfers);
+
+ for (CoordinatedTransfer transfer : transfers)
+ transfer.execute();
+ }
+
+ public void received(PendingLocalTransfer transfer)
+ {
+ logger.debug("Received pending transfer for tracked table {}", transfer);
+ LocalTransfers.instance().received(transfer);
+ }
+
+ synchronized void activateLocal(TransferActivation activation)
Review Comment:
Should this be synchronized? I don't think we need to guard against concurrent
activation of different transfers, do we?
##########
src/java/org/apache/cassandra/replication/LocalTransfers.java:
##########
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.replication;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+import com.google.common.base.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.concurrent.ExecutorPlus;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.io.util.File;
+import org.apache.cassandra.utils.TimeUUID;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+import static org.apache.cassandra.concurrent.ExecutorFactory.Global.executorFactory;
+
+/**
+ * Stores coordinated and received transfers.
+ *
+ * TODO: Make changes to pending set durable with SystemKeyspace.savePendingLocalTransfer(transfer)?
+ * TODO: GC
+ */
+class LocalTransfers
+{
+ private static final Logger logger = LoggerFactory.getLogger(LocalTransfers.class);
+
+ private final Map<TimeUUID, CoordinatedTransfer> coordinating = new HashMap<>();
+ private final Map<ShortMutationId, CoordinatedTransfer> coordinatingActivated = new HashMap<>();
+ private final Map<TimeUUID, PendingLocalTransfer> received = new HashMap<>();
+
+ final ExecutorPlus executor = executorFactory().pooled("LocalTrackedTransfers", Integer.MAX_VALUE);
+
+ private static final LocalTransfers instance = new LocalTransfers();
+ static LocalTransfers instance()
+ {
+ return instance;
+ }
+
+ synchronized void save(CoordinatedTransfer transfer)
+ {
+ CoordinatedTransfer existing = coordinating.put(transfer.transferId, transfer);
+ Preconditions.checkState(existing == null);
+ }
+
+ synchronized void activating(CoordinatedTransfer transfer)
+ {
+ coordinatingActivated.put(transfer.activationId, transfer);
+ }
+
+ synchronized void received(PendingLocalTransfer transfer)
+ {
+ logger.debug("received: {}", transfer);
+ Preconditions.checkState(!transfer.sstables.isEmpty());
+
+ PendingLocalTransfer existing = received.put(transfer.planId, transfer);
+ Preconditions.checkState(existing == null);
+ }
+
+ private void cleanup()
+ {
+ Set<Map.Entry<TimeUUID, PendingLocalTransfer>> candidates;
+ synchronized (this)
+ {
+ candidates = received.entrySet();
Review Comment:
This isn't actually threadsafe. The set returned is just a view of the
HashMap's entries, not a copy, so changes to the map will corrupt iteration
over this set. Using a concurrent hash map is fine for this use case, even if
we keep synchronizing many of the updates to it.
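A sketch of the ConcurrentHashMap variant (same fields as the class above; the weakly consistent iterator makes the synchronized snapshot unnecessary):

```java
private final Map<TimeUUID, PendingLocalTransfer> received = new ConcurrentHashMap<>();

private void cleanup()
{
    // Safe to iterate without a lock: ConcurrentHashMap iterators are
    // weakly consistent and never throw ConcurrentModificationException.
    for (PendingLocalTransfer transfer : received.values())
    {
        if (transfer.activated)
            purge(transfer);
    }
}
```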
##########
src/java/org/apache/cassandra/replication/PendingLocalTransfer.java:
##########
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.replication;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Objects;
+
+import com.google.common.base.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.db.streaming.CassandraStreamReceiver;
+import org.apache.cassandra.dht.AbstractBounds;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.io.util.File;
+import org.apache.cassandra.locator.EndpointsForRange;
+import org.apache.cassandra.schema.TableId;
+import org.apache.cassandra.tcm.ClusterMetadata;
+import org.apache.cassandra.tcm.ownership.ReplicaGroups;
+import org.apache.cassandra.utils.TimeUUID;
+
+import static org.apache.cassandra.utils.Clock.Global.currentTimeMillis;
+
+/**
+ * A transfer on a replica, once present on disk.
+ */
+public class PendingLocalTransfer
+{
+ private static final Logger logger = LoggerFactory.getLogger(PendingLocalTransfer.class);
+
+ private String logPrefix()
+ {
+ return String.format("[PendingLocalTransfer #%s]", planId);
+ }
+
+ final TimeUUID planId;
+ final TableId tableId;
+ final Collection<SSTableReader> sstables;
+ final long createdAt = currentTimeMillis();
+ transient String keyspace;
+ transient Range<Token> range;
+
+ volatile boolean activated = false;
+
+ public PendingLocalTransfer(TableId tableId, TimeUUID planId, Collection<SSTableReader> sstables)
+ {
+ Preconditions.checkState(!sstables.isEmpty());
+ this.tableId = tableId;
+ this.planId = planId;
+ this.sstables = sstables;
+ this.keyspace = Objects.requireNonNull(ColumnFamilyStore.getIfExists(tableId)).keyspace.getName();
+ this.range = shardRange(keyspace, sstables);
+ }
+
+ /**
+ * Pending transfers should be within a single shard, which are aligned to natural ranges.
+ * See ({@link MutationTrackingService.KeyspaceShards#make}).
+ */
+ private static Range<Token> shardRange(String keyspace, Collection<SSTableReader> sstables)
+ {
+ ClusterMetadata cm = ClusterMetadata.current();
+ ReplicaGroups writes = cm.placements.get(Keyspace.open(keyspace).getMetadata().params.replication).writes;
+ Range<Token> range = null;
+ for (SSTableReader sstable : sstables)
+ {
+ if (range == null)
+ {
+ Token first = sstable.getFirst().getToken();
+ range = writes.forRange(first).range();
+ }
+ else
+ {
+ AbstractBounds<Token> bounds = sstable.getBounds();
+ Preconditions.checkState(!range.isTrulyWrapAround());
+ Preconditions.checkState(range.contains(bounds.left));
+ Preconditions.checkState(range.contains(bounds.right));
+ }
+ }
+
+ Preconditions.checkNotNull(range);
+ return range;
+ }
+
+ private boolean isFullReplica()
+ {
+ ClusterMetadata cm = ClusterMetadata.current();
+ Keyspace ks = Keyspace.open(keyspace);
+ ReplicaGroups writes = cm.placements.get(ks.getMetadata().params.replication).writes;
+ EndpointsForRange replicas = writes.forRange(range.right).get();
+ return replicas.containsSelf() && replicas.selfIfPresent().isFull();
+ }
+
+ /**
+ * Safely move a transfer into the live set. This must be crash-safe, and the primary invariant we need to
+ * preserve is a transfer is only added to the live set iff the transfer ID is present in its mutation summaries.
+ *
+ * We don't validate checksums here, mostly because a transfer can be activated during a read, if one replica
+ * missed the TransferActivation. Transfers should not be pending for very long, and should be protected by
+ * internode integrity checks provided by TLS.
+ *
+ * TODO: Clear out the row cache and counter cache, like {@link CassandraStreamReceiver#finished}.
+ * TODO: Don't add to the live set if coordinator and not an owner for the range
+ */
+ public void activate(TransferActivation activation)
+ {
+ if (activated)
+ return;
+
+ Preconditions.checkState(isFullReplica());
+
+ logger.info("{} Activating transfer {}, {} ms since pending",
logPrefix(), this, currentTimeMillis() - createdAt);
+ ColumnFamilyStore cfs = ColumnFamilyStore.getIfExists(tableId);
+ Preconditions.checkNotNull(cfs);
+ Preconditions.checkState(!sstables.isEmpty());
+
+ if (activation.dryRun)
+ {
+ logger.info("{} Not adding SSTables to live set for dryRun {}",
logPrefix(), activation);
+ return;
+ }
+
+ // Ensure no lingering mutation IDs, only activation IDs
+ for (SSTableReader sstable : sstables)
+ {
+ Preconditions.checkState(sstable.getCoordinatorLogOffsets().isEmpty());
+
+ // Modify SSTables metadata to durably set transfer ID before importing
+ ImmutableCoordinatorLogOffsets logOffsets = new ImmutableCoordinatorLogOffsets.Builder()
+ .addTransfer(activation.activationId)
+ .build();
+ try
+ {
+ sstable.mutateCoordinatorLogOffsetsAndReload(logOffsets);
+ }
+ catch (IOException e)
+ {
+ throw new RuntimeException(e);
+ }
+
+ Preconditions.checkState(sstable.getCoordinatorLogOffsets().isEmpty());
+ Preconditions.checkState(!sstable.getCoordinatorLogOffsets().transfers().isEmpty());
Review Comment:
This is a little weird from an API perspective. We're asserting that the log
offsets are empty, then asserting that a collection contained in them is not
empty. We should probably rename ImmutableCoordinatorLogOffsets.isEmpty to
something that makes clear whether it means the offsets are empty or the
transfers are empty. There may also be other spots in the code where we take
some action when the log offsets are empty without confirming that the
transfers are also empty, so we should rethink some parts of this to make it
less likely to be misused.
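One shape the rename could take (sketch only), so call sites must say which kind of emptiness they mean:

```java
public boolean hasNoOffsets()   { return ids.isEmpty(); }
public boolean hasNoTransfers() { return transfers.isEmpty(); }

// "Empty" now unambiguously means neither offsets nor transfers.
public boolean isEmpty() { return hasNoOffsets() && hasNoTransfers(); }
```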
##########
src/java/org/apache/cassandra/db/compaction/CompactionTask.java:
##########
@@ -397,6 +397,7 @@ public static ImmutableCoordinatorLogOffsets getCoordinatorLogOffsets(Set<SSTabl
ImmutableCoordinatorLogOffsets.Builder builder = new ImmutableCoordinatorLogOffsets.Builder();
for (SSTableReader sstable : sstables)
builder.addAll(sstable.getCoordinatorLogOffsets());
+ builder.purgeTransfers();
Review Comment:
Wouldn't this cause us to promote sstables containing transfers that aren't
fully reconciled?
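If so, a guard along these lines might be safer (the predicate and method are hypothetical, purely to illustrate the concern):

```java
// Hypothetical: only drop transfer ids whose activation is known to be
// fully reconciled, rather than purging unconditionally during compaction.
builder.purgeTransfersIf(id -> MutationTrackingService.instance.isFullyReconciled(id));
```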
##########
src/java/org/apache/cassandra/replication/LocalTransfers.java:
##########
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.replication;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+import com.google.common.base.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.concurrent.ExecutorPlus;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.io.util.File;
+import org.apache.cassandra.utils.TimeUUID;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+import static org.apache.cassandra.concurrent.ExecutorFactory.Global.executorFactory;
+
+/**
+ * Stores coordinated and received transfers.
+ *
+ * TODO: Make changes to pending set durable with SystemKeyspace.savePendingLocalTransfer(transfer)?
+ * TODO: GC
+ */
+class LocalTransfers
+{
+ private static final Logger logger = LoggerFactory.getLogger(LocalTransfers.class);
+
+ private final Map<TimeUUID, CoordinatedTransfer> coordinating = new HashMap<>();
+ private final Map<ShortMutationId, CoordinatedTransfer> coordinatingActivated = new HashMap<>();
+ private final Map<TimeUUID, PendingLocalTransfer> received = new HashMap<>();
+
+ final ExecutorPlus executor = executorFactory().pooled("LocalTrackedTransfers", Integer.MAX_VALUE);
+
+ private static final LocalTransfers instance = new LocalTransfers();
+ static LocalTransfers instance()
+ {
+ return instance;
+ }
+
+ synchronized void save(CoordinatedTransfer transfer)
+ {
+ CoordinatedTransfer existing = coordinating.put(transfer.transferId, transfer);
+ Preconditions.checkState(existing == null);
+ }
+
+ synchronized void activating(CoordinatedTransfer transfer)
+ {
+ coordinatingActivated.put(transfer.activationId, transfer);
+ }
+
+ synchronized void received(PendingLocalTransfer transfer)
+ {
+ logger.debug("received: {}", transfer);
+ Preconditions.checkState(!transfer.sstables.isEmpty());
+
+ PendingLocalTransfer existing = received.put(transfer.planId, transfer);
+ Preconditions.checkState(existing == null);
+ }
+
+ private void cleanup()
+ {
+ Set<Map.Entry<TimeUUID, PendingLocalTransfer>> candidates;
+ synchronized (this)
+ {
+ candidates = received.entrySet();
+ }
+ for (Map.Entry<TimeUUID, PendingLocalTransfer> candidate : candidates)
+ {
+ PendingLocalTransfer transfer = candidate.getValue();
+ if (!transfer.activated)
+ return;
+ purge(transfer);
+ }
+
+ // Clean up completed coordinated transfers
+ Set<Map.Entry<TimeUUID, CoordinatedTransfer>> coordinated;
+ synchronized (this)
+ {
+ coordinated = coordinating.entrySet();
+ }
+ for (Map.Entry<TimeUUID, CoordinatedTransfer> candidate : coordinated)
+ {
+ CoordinatedTransfer transfer = candidate.getValue();
+
+ // Check if all participants have completed activation or were noops
+ boolean allComplete = true;
+ for (CoordinatedTransfer.SingleTransferResult result : transfer.streams.values())
+ {
+ if (result.state != CoordinatedTransfer.SingleTransferResult.State.ACTIVATE_COMPLETE &&
+ result.state != CoordinatedTransfer.SingleTransferResult.State.STREAM_NOOP)
+ {
+ allComplete = false;
+ break;
+ }
+ }
+
+ if (allComplete && transfer.activationId != null)
+ purge(transfer);
+ }
+ }
+
+ private void purge(PendingLocalTransfer transfer)
+ {
+ logger.info("Cleaning up activated pending transfer: {}", transfer);
+
+ // Delete the entire pending transfer directory /pending/<planId>/
+ if (!transfer.sstables.isEmpty())
+ {
+ SSTableReader sstable = transfer.sstables.iterator().next();
+ File pendingDir = sstable.descriptor.directory;
+
+ if (pendingDir.exists())
+ {
+ Preconditions.checkState(pendingDir.absolutePath().contains(transfer.planId.toString()));
+ logger.debug("Deleting pending transfer directory: {}", pendingDir);
+ pendingDir.deleteRecursive();
+ }
+ }
+ for (SSTableReader sstable : transfer.sstables)
+ sstable.selfRef().release();
Review Comment:
should we also be removing the transfer from the local transfers state here?
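Something like this sketch, reusing the maps from the class above:

```java
// After deleting the files, drop the bookkeeping entry so cleanup() does
// not revisit the purged transfer.
synchronized (this)
{
    received.remove(transfer.planId);
}
```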