frankgh commented on code in PR #321:
URL: https://github.com/apache/cassandra-sidecar/pull/321#discussion_r2898297905
##########
server/src/main/java/org/apache/cassandra/sidecar/livemigration/LiveMigrationFileDownloader.java:
##########
@@ -124,6 +130,130 @@ public Future<OperationStatus> downloadFiles()
.otherwise(this::handleDownloadFailure);
}
+ /**
+ * Validates via cluster gossip that destination has not been started.
+ * This is a best-effort safety check performed before each file download iteration.
+ * <p>
+ * Fetches gossip info from cluster instances and checks:
+ * <ul>
+ * <li>Source node is present in gossip</li>
+ * <li>Destination node is NOT present in gossip</li>
+ * </ul>
+ * <p>
+ * <strong>Note:</strong> This cannot guarantee destination won't start during data copy.
+ * Operators must ensure destination is not started until migration completes.
+ *
+ * @return Future that succeeds if validation passes, fails with
+ * {@link LiveMigrationInvalidRequestException} if unsafe condition detected
+ */
+ private Future<Void> checkGossip()
+ {
+ // Skip gossip check if explicitly requested
+ if (request.skipGossipCheck != null && request.skipGossipCheck)
+ {
+ LOGGER.warn("{} Skipping gossip safety check as requested. " +
+ "This bypasses validation that destination has not been started.", logPrefix);
+ return Future.succeededFuture();
+ }
+
+ String destination = instanceMetadata.host();
+ LOGGER.debug("{} Validating gossip state: source={}, destination={}", logPrefix, source, destination);
+
+ // Apply configuration defaults for optional gossip fetch parameters if not specified by client
+ int batchSize = request.gossipFetchBatchSize != null
+ ? request.gossipFetchBatchSize
+ : liveMigrationConfiguration.gossipFetchBatchSize();
+ int maxRetries = request.gossipFetchMaxRetries != null
+ ? request.gossipFetchMaxRetries
+ : liveMigrationConfiguration.gossipFetchMaxRetries();
+
+ GossipInfoFetcher gossipFetcher = new GossipInfoFetcher(
+ sidecarClient,
+ instancesMetadata,
+ port,
+ batchSize,
+ maxRetries);
+
+ return gossipFetcher.fetchGossipInfo()
+ .compose(gossipResponse -> {
+ try
+ {
+ return validateGossipResponse(gossipResponse, source, destination);
+ }
+ catch (UnknownHostException e)
+ {
+ return Future.failedFuture(e);
+ }
+ })
+ .onSuccess(v -> LOGGER.debug("{} Gossip validation passed", logPrefix))
+ .onFailure(err -> LOGGER.error("{} Gossip validation failed: {}", logPrefix, err.getMessage()));
+ }
+
+ /**
+ * Validates the gossip response to ensure source is present in gossip and destination is not present.
+ *
+ * @param gossipResponse the gossip information from a healthy node
+ * @param source the source instance to check for
+ * @param destination the destination instance to check for
+ * @return Future that succeeds if source is found and destination not found,
+ * fails if source is not found or destination is found in gossip
+ */
+ private Future<Void> validateGossipResponse(GossipInfoResponse gossipResponse,
+ String source,
+ String destination) throws UnknownHostException
+ {
+ GossipInfoResponse.GossipInfo srcInfo = findGossipInfo(gossipResponse, source);
+ if (srcInfo == null)
+ {
+ String errorMsg = "SAFETY CHECK FAILED: Source node '" + source + "' not found in cluster gossip. " +
+ "Cannot proceed with data copy. Please fix the source/cluster " +
+ "and then re-trigger data copy task if required";
+ LOGGER.error("{} {}", logPrefix, errorMsg);
+ return Future.failedFuture(new LiveMigrationInvalidRequestException(errorMsg));
+ }
+
+ GossipInfoResponse.GossipInfo destInfo = findGossipInfo(gossipResponse, destination);
+ if (destInfo != null)
+ {
+ // Destination found in gossip - it was started!
+ String status = destInfo.statusWithPort();
+ String errorMsg = String.format(
+ "SAFETY CHECK FAILED: Destination node '%s' found in cluster gossip with status '%s'. " +
+ "This indicates Cassandra was previously started on the destination. " +
+ "Data copy would overwrite potentially newer data, causing DATA LOSS. Aborting.",
+ destination, status);
+
+ LOGGER.error("{} {}", logPrefix, errorMsg);
+ return Future.failedFuture(new LiveMigrationInvalidRequestException(errorMsg));
+ }
+
+ // Reached here means, source found and destination not found in gossip - safe to proceed
+ LOGGER.info("{} Gossip validation passed: destination {} not found in cluster gossip",
+ logPrefix, destination);
+ return Future.succeededFuture();
+ }
+
+ private GossipInfoResponse.GossipInfo findGossipInfo(GossipInfoResponse gossipResponse,
+ String instance) throws UnknownHostException
+ {
+ InstanceMetadata metadata = instancesMetadata.instanceFromHost(instance);
Review Comment:
Is this `instance` managed by the local Sidecar? `instancesMetadata` is only
available for instances managed by the local Sidecar process.
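To illustrate the concern, here is a rough sketch in plain Java. It is not the Sidecar API: `LocalInstanceLookupSketch`, `findLocal`, and the host names are hypothetical stand-ins for a registry that only knows about locally managed instances. The point is that a lookup for a host managed elsewhere comes back empty, so the caller needs an explicit path for that case.

```java
import java.util.Map;
import java.util.Optional;

// Hypothetical stand-in for a registry of instances managed by the local Sidecar.
// This is an illustration only, not the real InstancesMetadata interface.
final class LocalInstanceLookupSketch
{
    private final Map<String, Integer> localInstanceIdsByHost;

    LocalInstanceLookupSketch(Map<String, Integer> localInstanceIdsByHost)
    {
        // Defensive, immutable copy of the locally managed hosts
        this.localInstanceIdsByHost = Map.copyOf(localInstanceIdsByHost);
    }

    // Returns the local instance id, or empty when the host is not managed by this process
    Optional<Integer> findLocal(String host)
    {
        return Optional.ofNullable(localInstanceIdsByHost.get(host));
    }
}
```

With a registry built from only the destination host, `findLocal` on a remote source host returns an empty `Optional` rather than metadata.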
##########
server/src/main/java/org/apache/cassandra/sidecar/livemigration/DataCopyTaskManager.java:
##########
@@ -101,39 +107,85 @@ Future<LiveMigrationTask> createDataCopyTask(LiveMigrationDataCopyRequest reques
String source,
InstanceMetadata localInstanceMetadata)
{
- LiveMigrationTask newTask = createTask(request,
- source,
- sidecarConfiguration.serviceConfiguration().port(),
- localInstanceMetadata);
-
- // It is possible to serve only one live migration data copy request per instance at a time.
- // Checking if there is another migration is in progress before accepting new one.
- boolean accepted = currentTasks.compute(localInstanceMetadata.id(), (integer, taskInMap) -> {
- if (taskInMap == null)
+ // Fast local JMX check before creating task - prevents task creation if Cassandra is running
+ return verifyCassandraNotRunning(localInstanceMetadata)
+ .compose(v -> {
+ LiveMigrationTask newTask = createTask(request,
+ source,
+ sidecarConfiguration.serviceConfiguration().port(),
+ localInstanceMetadata);
+
+ // It is possible to serve only one live migration data copy request per instance at a time.
+ // Checking if there is another migration is in progress before accepting new one.
+ boolean accepted = currentTasks.compute(localInstanceMetadata.id(), (integer, taskInMap) -> {
+ if (taskInMap == null)
+ {
+ return newTask;
+ }
+
+ if (!taskInMap.isCompleted())
+ {
+ // Accept new task if and only if the existing task has completed.
+ return taskInMap;
+ }
+ else
+ {
+ return newTask;
+ }
+ }) == newTask;
+
+ if (!accepted)
+ {
+ return Future.failedFuture(
+ new LiveMigrationDataCopyInProgressException("Another task is already under progress. Cannot accept new task."));
+ }
+ LOGGER.info("Starting data copy task with id={}, source={}, destination={}",
+ newTask.id(), source, localInstanceMetadata.host());
+ newTask.start();
+ return Future.succeededFuture(newTask);
+ });
+ }
+
+ /**
+ * Initiating data copy once a Cassandra instance starts is not acceptable. This method checks whether
+ * Cassandra is running or not at the moment on the destination instance by checking if Sidecar
+ * was able to connect to the Cassandra instance's JMX port. It returns a failed future if Sidecar
+ * is able to connect to the JMX port of Cassandra.
+ *
+ * @param localInstance metadata for the local Cassandra instance
+ * @return Future that succeeds if Cassandra is not running, fails if it is running
+ */
+ private Future<Void> verifyCassandraNotRunning(InstanceMetadata localInstance)
+ {
+ return executorPools.internal().executeBlocking(() -> {
Review Comment:
This does not need to run on a blocking executor. There's no IO happening
here; the check is purely in-memory. That raises the question of whether
you want to do an online check here instead of relying on a cached value.
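As a rough sketch of the non-blocking shape: the example below uses `java.util.concurrent.CompletableFuture` as a stand-in for the Vert.x `Future` so that it stays self-contained. `CassandraNotRunningCheckSketch`, `verifyNotRunning`, and the `cachedJmxUp` supplier are hypothetical names, not code from this PR. Because the check only reads a cached flag, the result can be returned as an already-completed future without handing work to a worker pool.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.BooleanSupplier;

// Illustration only: CompletableFuture stands in for the Vert.x Future used in the PR.
final class CassandraNotRunningCheckSketch
{
    private CassandraNotRunningCheckSketch()
    {
    }

    // cachedJmxUp is a hypothetical supplier that reads an in-memory health flag
    static CompletableFuture<Void> verifyNotRunning(BooleanSupplier cachedJmxUp)
    {
        if (cachedJmxUp.getAsBoolean())
        {
            // Fail immediately; no blocking call is needed for an in-memory read
            return CompletableFuture.failedFuture(
                new IllegalStateException("Cassandra appears to be running on the destination"));
        }
        return CompletableFuture.completedFuture(null);
    }
}
```

If the intent is an online check (an actual JMX connection attempt), that would involve IO, and only then would a blocking executor be appropriate.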
##########
client-common/src/main/java/org/apache/cassandra/sidecar/common/request/LiveMigrationDataCopyRequest.java:
##########
@@ -71,8 +99,24 @@ public LiveMigrationDataCopyRequest(@JsonProperty("maxIterations") int maxIterat
+ ". It cannot be less than or equal to zero.");
}
+ // Validate optional gossip fetch parameters if specified
+ if (gossipFetchBatchSize != null && gossipFetchBatchSize <= 0)
+ {
+ throw new IllegalArgumentException("Invalid gossipFetchBatchSize " + gossipFetchBatchSize
+ + ". It must be greater than zero when specified.");
+ }
+
+ if (gossipFetchMaxRetries != null && gossipFetchMaxRetries <= 0)
Review Comment:
Should we also have some guardrails around an upper bound here? I'm thinking
of a malicious payload that could make the service retry for a long time.
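Something along these lines, as a minimal sketch. The class name, the helper, and the cap of 10 are all hypothetical; the real limit would more likely come from configuration than from a constant.

```java
// Illustration only: names and the cap value are assumptions, not part of the PR.
final class GossipRetryBoundsSketch
{
    // Hypothetical upper bound on client-supplied retries
    static final int MAX_GOSSIP_FETCH_MAX_RETRIES = 10;

    private GossipRetryBoundsSketch()
    {
    }

    // Returns the value unchanged when it is absent or within (0, MAX]; throws otherwise
    static Integer validateMaxRetries(Integer gossipFetchMaxRetries)
    {
        if (gossipFetchMaxRetries == null)
        {
            // Not specified by the client; the server-side default applies
            return null;
        }
        if (gossipFetchMaxRetries <= 0 || gossipFetchMaxRetries > MAX_GOSSIP_FETCH_MAX_RETRIES)
        {
            throw new IllegalArgumentException("Invalid gossipFetchMaxRetries " + gossipFetchMaxRetries
                                               + ". It must be between 1 and " + MAX_GOSSIP_FETCH_MAX_RETRIES
                                               + " when specified.");
        }
        return gossipFetchMaxRetries;
    }
}
```

The same kind of bound would probably make sense for `gossipFetchBatchSize`.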