nvharikrishna commented on code in PR #309: URL: https://github.com/apache/cassandra-sidecar/pull/309#discussion_r2875781019
########## server/src/main/java/org/apache/cassandra/sidecar/handlers/livemigration/LiveMigrationCreateFilesVerificationTaskHandler.java: ########## @@ -0,0 +1,172 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.sidecar.handlers.livemigration; + +import java.util.Set; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.inject.Inject; +import io.netty.handler.codec.http.HttpResponseStatus; +import io.vertx.core.http.HttpServerRequest; +import io.vertx.core.json.DecodeException; +import io.vertx.core.json.Json; +import io.vertx.core.net.SocketAddress; +import io.vertx.ext.auth.authorization.Authorization; +import io.vertx.ext.web.RoutingContext; +import org.apache.cassandra.sidecar.cluster.instance.InstanceMetadata; +import org.apache.cassandra.sidecar.common.request.LiveMigrationFilesVerificationRequest; +import org.apache.cassandra.sidecar.common.response.LiveMigrationTaskCreationResponse; +import org.apache.cassandra.sidecar.concurrent.ExecutorPools; +import org.apache.cassandra.sidecar.config.LiveMigrationConfiguration; +import org.apache.cassandra.sidecar.config.SidecarConfiguration; +import 
org.apache.cassandra.sidecar.exceptions.LiveMigrationExceptions.LiveMigrationTaskInProgressException; +import org.apache.cassandra.sidecar.exceptions.NoSuchCassandraInstanceException; +import org.apache.cassandra.sidecar.handlers.AbstractHandler; +import org.apache.cassandra.sidecar.handlers.AccessProtected; +import org.apache.cassandra.sidecar.livemigration.FilesVerificationTaskManager; +import org.apache.cassandra.sidecar.utils.CassandraInputValidator; +import org.apache.cassandra.sidecar.utils.DigestAlgorithmFactory; +import org.apache.cassandra.sidecar.utils.InstanceMetadataFetcher; +import org.jetbrains.annotations.NotNull; + +import static io.netty.handler.codec.http.HttpResponseStatus.ACCEPTED; +import static io.netty.handler.codec.http.HttpResponseStatus.FORBIDDEN; +import static io.netty.handler.codec.http.HttpResponseStatus.SERVICE_UNAVAILABLE; +import static org.apache.cassandra.sidecar.acl.authorization.BasicPermissions.DATA_COPY; +import static org.apache.cassandra.sidecar.common.ApiEndpointsV1.LIVE_MIGRATION_FILES_VERIFICATION_TASKS_ROUTE; +import static org.apache.cassandra.sidecar.utils.HttpExceptions.wrapHttpException; + +/** + * HTTP handler for creating file digest verification tasks during live migration. + * Manages concurrent verification tasks per instance and orchestrates the verification process. 
+ */ +public class LiveMigrationCreateFilesVerificationTaskHandler extends AbstractHandler<LiveMigrationFilesVerificationRequest> implements AccessProtected +{ + private static final Logger LOGGER = LoggerFactory.getLogger(LiveMigrationCreateFilesVerificationTaskHandler.class); + + private final FilesVerificationTaskManager filesVerificationTaskManager; + private final LiveMigrationMap liveMigrationMap; + private final LiveMigrationConfiguration liveMigrationConfiguration; + + /** + * Constructs a handler with the provided {@code metadataFetcher} + * + * @param metadataFetcher the interface to retrieve instance metadata + * @param executorPools the executor pools for blocking executions + * @param validator a validator instance to validate Cassandra-specific input + */ + @Inject + protected LiveMigrationCreateFilesVerificationTaskHandler(InstanceMetadataFetcher metadataFetcher, + ExecutorPools executorPools, + CassandraInputValidator validator, + LiveMigrationMap liveMigrationMap, + FilesVerificationTaskManager filesVerificationTaskManager, + SidecarConfiguration sidecarConfiguration) + { + super(metadataFetcher, executorPools, validator); + this.liveMigrationMap = liveMigrationMap; + this.filesVerificationTaskManager = filesVerificationTaskManager; + this.liveMigrationConfiguration = sidecarConfiguration.liveMigrationConfiguration(); + } + + @Override + protected LiveMigrationFilesVerificationRequest extractParamsOrThrow(RoutingContext context) + { + try + { + LiveMigrationFilesVerificationRequest request = + Json.decodeValue(context.body().buffer(), LiveMigrationFilesVerificationRequest.class); + + if (request.maxConcurrency() > liveMigrationConfiguration.maxConcurrentFileRequests()) + { + throw new IllegalArgumentException("Invalid maxConcurrency " + request.maxConcurrency() + + ". 
It cannot be greater than " + + liveMigrationConfiguration.maxConcurrentFileRequests()); + } Review Comment: De-duplicated the checks ########## server/src/main/java/org/apache/cassandra/sidecar/handlers/livemigration/LiveMigrationCreateFilesVerificationTaskHandler.java: ########## @@ -0,0 +1,172 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.sidecar.handlers.livemigration; + +import java.util.Set; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.inject.Inject; +import io.netty.handler.codec.http.HttpResponseStatus; +import io.vertx.core.http.HttpServerRequest; +import io.vertx.core.json.DecodeException; +import io.vertx.core.json.Json; +import io.vertx.core.net.SocketAddress; +import io.vertx.ext.auth.authorization.Authorization; +import io.vertx.ext.web.RoutingContext; +import org.apache.cassandra.sidecar.cluster.instance.InstanceMetadata; +import org.apache.cassandra.sidecar.common.request.LiveMigrationFilesVerificationRequest; +import org.apache.cassandra.sidecar.common.response.LiveMigrationTaskCreationResponse; +import org.apache.cassandra.sidecar.concurrent.ExecutorPools; +import org.apache.cassandra.sidecar.config.LiveMigrationConfiguration; +import org.apache.cassandra.sidecar.config.SidecarConfiguration; +import org.apache.cassandra.sidecar.exceptions.LiveMigrationExceptions.LiveMigrationTaskInProgressException; +import org.apache.cassandra.sidecar.exceptions.NoSuchCassandraInstanceException; +import org.apache.cassandra.sidecar.handlers.AbstractHandler; +import org.apache.cassandra.sidecar.handlers.AccessProtected; +import org.apache.cassandra.sidecar.livemigration.FilesVerificationTaskManager; +import org.apache.cassandra.sidecar.utils.CassandraInputValidator; +import org.apache.cassandra.sidecar.utils.DigestAlgorithmFactory; +import org.apache.cassandra.sidecar.utils.InstanceMetadataFetcher; +import org.jetbrains.annotations.NotNull; + +import static io.netty.handler.codec.http.HttpResponseStatus.ACCEPTED; +import static io.netty.handler.codec.http.HttpResponseStatus.FORBIDDEN; +import static io.netty.handler.codec.http.HttpResponseStatus.SERVICE_UNAVAILABLE; +import static org.apache.cassandra.sidecar.acl.authorization.BasicPermissions.DATA_COPY; +import static 
org.apache.cassandra.sidecar.common.ApiEndpointsV1.LIVE_MIGRATION_FILES_VERIFICATION_TASKS_ROUTE; +import static org.apache.cassandra.sidecar.utils.HttpExceptions.wrapHttpException; + +/** + * HTTP handler for creating file digest verification tasks during live migration. + * Manages concurrent verification tasks per instance and orchestrates the verification process. + */ +public class LiveMigrationCreateFilesVerificationTaskHandler extends AbstractHandler<LiveMigrationFilesVerificationRequest> implements AccessProtected +{ + private static final Logger LOGGER = LoggerFactory.getLogger(LiveMigrationCreateFilesVerificationTaskHandler.class); + + private final FilesVerificationTaskManager filesVerificationTaskManager; + private final LiveMigrationMap liveMigrationMap; + private final LiveMigrationConfiguration liveMigrationConfiguration; + + /** + * Constructs a handler with the provided {@code metadataFetcher} + * + * @param metadataFetcher the interface to retrieve instance metadata + * @param executorPools the executor pools for blocking executions + * @param validator a validator instance to validate Cassandra-specific input + */ + @Inject + protected LiveMigrationCreateFilesVerificationTaskHandler(InstanceMetadataFetcher metadataFetcher, + ExecutorPools executorPools, + CassandraInputValidator validator, + LiveMigrationMap liveMigrationMap, + FilesVerificationTaskManager filesVerificationTaskManager, + SidecarConfiguration sidecarConfiguration) + { + super(metadataFetcher, executorPools, validator); + this.liveMigrationMap = liveMigrationMap; + this.filesVerificationTaskManager = filesVerificationTaskManager; + this.liveMigrationConfiguration = sidecarConfiguration.liveMigrationConfiguration(); + } + + @Override + protected LiveMigrationFilesVerificationRequest extractParamsOrThrow(RoutingContext context) + { + try + { + LiveMigrationFilesVerificationRequest request = + Json.decodeValue(context.body().buffer(), LiveMigrationFilesVerificationRequest.class); + + 
if (request.maxConcurrency() > liveMigrationConfiguration.maxConcurrentFileRequests()) + { + throw new IllegalArgumentException("Invalid maxConcurrency " + request.maxConcurrency() + + ". It cannot be greater than " + + liveMigrationConfiguration.maxConcurrentFileRequests()); + } + + // Validate digest algorithm without creating an instance + DigestAlgorithmFactory.validateAlgorithmName(request.digestAlgorithm()); + + return request; + } + catch (DecodeException decodeException) + { + throw wrapHttpException(HttpResponseStatus.BAD_REQUEST, + "Failed to parse request body, please ensure that the request is valid.", + decodeException); + } + catch (IllegalArgumentException e) + { + throw wrapHttpException(HttpResponseStatus.BAD_REQUEST, e.getMessage(), e); + } + } + + @Override + protected void handleInternal(RoutingContext context, + HttpServerRequest httpRequest, + @NotNull String host, + SocketAddress remoteAddress, + LiveMigrationFilesVerificationRequest request) + { + LOGGER.debug("Received files verification request for host {} with maxConcurrency {}", + host, request.maxConcurrency()); + InstanceMetadata localInstanceMetadata; + try + { + localInstanceMetadata = metadataFetcher.instance(host); + } + catch (NoSuchCassandraInstanceException e) + { + LOGGER.error("Failed to fetch instance metadata for host={}", host); + context.fail(wrapHttpException(SERVICE_UNAVAILABLE, e)); + return; + } + + liveMigrationMap.getSource(host) + .compose(source -> filesVerificationTaskManager.createTask(request, source, localInstanceMetadata)) + + .onSuccess(task -> { + LOGGER.info("Created files verification task {} for host {}", task.id(), host); + context.response().setStatusCode(ACCEPTED.code()); + String statusUrl = LIVE_MIGRATION_FILES_VERIFICATION_TASKS_ROUTE + "/" + task.id(); + context.json(new LiveMigrationTaskCreationResponse(task.id(), statusUrl)); + }) + + .onFailure(throwable -> { + if (throwable instanceof LiveMigrationTaskInProgressException) + { + 
LOGGER.error("Cannot start a new files verification task for host {} " + "while another live migration task is in progress.", host); + context.fail(wrapHttpException(FORBIDDEN, throwable.getMessage(), throwable)); Review Comment: CONFLICT (HTTP 409) makes more sense than FORBIDDEN here, because the request is rejected due to another live migration task that is already in progress, not due to missing permissions. Updated it. ########## server/src/main/java/org/apache/cassandra/sidecar/livemigration/LiveMigrationFilesVerificationTask.java: ########## @@ -0,0 +1,643 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.cassandra.sidecar.livemigration; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.concurrent.Callable; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; +import java.util.stream.Collectors; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import io.vertx.core.Future; +import io.vertx.core.MultiMap; +import io.vertx.core.Promise; +import io.vertx.core.Vertx; +import org.apache.cassandra.sidecar.client.SidecarClient; +import org.apache.cassandra.sidecar.client.SidecarInstanceImpl; +import org.apache.cassandra.sidecar.cluster.instance.InstanceMetadata; +import org.apache.cassandra.sidecar.common.DataObjectBuilder; +import org.apache.cassandra.sidecar.common.request.LiveMigrationFilesVerificationRequest; +import org.apache.cassandra.sidecar.common.request.data.Digest; +import org.apache.cassandra.sidecar.common.request.data.MD5Digest; +import org.apache.cassandra.sidecar.common.request.data.XXHash32Digest; +import org.apache.cassandra.sidecar.common.response.DigestResponse; +import org.apache.cassandra.sidecar.common.response.InstanceFileInfo; +import org.apache.cassandra.sidecar.common.response.InstanceFilesListResponse; +import org.apache.cassandra.sidecar.common.response.LiveMigrationFilesVerificationResponse; +import org.apache.cassandra.sidecar.concurrent.AsyncConcurrentTaskExecutor; +import org.apache.cassandra.sidecar.concurrent.ExecutorPools; +import org.apache.cassandra.sidecar.config.LiveMigrationConfiguration; +import org.apache.cassandra.sidecar.exceptions.LiveMigrationExceptions.DigestMismatchException; +import org.apache.cassandra.sidecar.exceptions.LiveMigrationExceptions.FileVerificationFailureException; +import org.apache.cassandra.sidecar.utils.DigestVerifierFactory; +import org.jetbrains.annotations.NotNull; +import 
org.jetbrains.annotations.VisibleForTesting; + +import static org.apache.cassandra.sidecar.common.response.InstanceFileInfo.FileType.DIRECTORY; +import static org.apache.cassandra.sidecar.livemigration.LiveMigrationInstanceMetadataUtil.localPath; + +/** + * Verifies file integrity between source and destination instances during live migration + * by comparing file lists and validating file digests using configurable digest algorithms. + * + * <p>The verification process consists of three stages: + * <ol> + * <li>Fetch file lists from both source and destination instances concurrently</li> + * <li>Compare file metadata (size, type, modification time) - fails fast on mismatches</li> Review Comment: I meant that the task does not proceed to digest verification if there are metadata mismatches; it stops after the metadata comparison stage. However, I understand your point, so I’ve updated the text to avoid any confusion. ########## server/src/main/java/org/apache/cassandra/sidecar/livemigration/LiveMigrationFilesVerificationTask.java: ########## @@ -0,0 +1,643 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
+ */ + +package org.apache.cassandra.sidecar.livemigration; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.concurrent.Callable; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; +import java.util.stream.Collectors; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import io.vertx.core.Future; +import io.vertx.core.MultiMap; +import io.vertx.core.Promise; +import io.vertx.core.Vertx; +import org.apache.cassandra.sidecar.client.SidecarClient; +import org.apache.cassandra.sidecar.client.SidecarInstanceImpl; +import org.apache.cassandra.sidecar.cluster.instance.InstanceMetadata; +import org.apache.cassandra.sidecar.common.DataObjectBuilder; +import org.apache.cassandra.sidecar.common.request.LiveMigrationFilesVerificationRequest; +import org.apache.cassandra.sidecar.common.request.data.Digest; +import org.apache.cassandra.sidecar.common.request.data.MD5Digest; +import org.apache.cassandra.sidecar.common.request.data.XXHash32Digest; +import org.apache.cassandra.sidecar.common.response.DigestResponse; +import org.apache.cassandra.sidecar.common.response.InstanceFileInfo; +import org.apache.cassandra.sidecar.common.response.InstanceFilesListResponse; +import org.apache.cassandra.sidecar.common.response.LiveMigrationFilesVerificationResponse; +import org.apache.cassandra.sidecar.concurrent.AsyncConcurrentTaskExecutor; +import org.apache.cassandra.sidecar.concurrent.ExecutorPools; +import org.apache.cassandra.sidecar.config.LiveMigrationConfiguration; +import org.apache.cassandra.sidecar.exceptions.LiveMigrationExceptions.DigestMismatchException; +import org.apache.cassandra.sidecar.exceptions.LiveMigrationExceptions.FileVerificationFailureException; +import org.apache.cassandra.sidecar.utils.DigestVerifierFactory; +import org.jetbrains.annotations.NotNull; +import 
org.jetbrains.annotations.VisibleForTesting; + +import static org.apache.cassandra.sidecar.common.response.InstanceFileInfo.FileType.DIRECTORY; +import static org.apache.cassandra.sidecar.livemigration.LiveMigrationInstanceMetadataUtil.localPath; + +/** + * Verifies file integrity between source and destination instances during live migration + * by comparing file lists and validating file digests using configurable digest algorithms. + * + * <p>The verification process consists of three stages: + * <ol> + * <li>Fetch file lists from both source and destination instances concurrently</li> + * <li>Compare file metadata (size, type, modification time) - fails fast on mismatches</li> + * <li>Verify cryptographic digests (MD5 or XXHash32) with configurable concurrency</li> + * </ol> + * + * <p>The task supports cancellation, and tracks detailed metrics including + * metadata matches/mismatches, digest verification results, and failure counts. + */ +public class LiveMigrationFilesVerificationTask implements LiveMigrationTask<LiveMigrationFilesVerificationResponse> +{ + public static final String FILES_VERIFICATION_TASK_TYPE = "files-verification-task"; + private static final Logger LOGGER = LoggerFactory.getLogger(LiveMigrationFilesVerificationTask.class); + private final Vertx vertx; + private final String id; + private final String source; + private final int port; + // Instance metadata of current instance + private final InstanceMetadata instanceMetadata; + private final ExecutorPools executorPools; + private final LiveMigrationFilesVerificationRequest request; + private final SidecarClient sidecarClient; + private final LiveMigrationConfiguration liveMigrationConfiguration; + private final DigestVerifierFactory digestVerifierFactory; + private final AtomicReference<State> state; + private final Promise<Void> completionPromise = Promise.promise(); + private final AtomicInteger filesNotFoundAtSource = new AtomicInteger(0); + private final AtomicInteger 
filesNotFoundAtDestination = new AtomicInteger(0); + private final AtomicInteger metadataMatched = new AtomicInteger(0); + private final AtomicInteger metadataMismatches = new AtomicInteger(0); + private final AtomicInteger digestMismatches = new AtomicInteger(0); + private final AtomicInteger digestVerificationFailures = new AtomicInteger(0); + private final AtomicInteger filesMatched = new AtomicInteger(0); + private final String logPrefix; + private final AtomicBoolean cancelled = new AtomicBoolean(false); + private volatile AsyncConcurrentTaskExecutor<String> asyncConcurrentTaskExecutor; + + @VisibleForTesting + protected LiveMigrationFilesVerificationTask(Builder builder) + { + this.vertx = builder.vertx; + this.id = builder.id; + this.source = builder.source; + this.port = builder.port; + this.instanceMetadata = builder.instanceMetadata; + this.executorPools = builder.executorPools; + this.request = builder.request; + this.sidecarClient = builder.sidecarClient; + this.liveMigrationConfiguration = builder.liveMigrationConfiguration; + this.digestVerifierFactory = builder.digestVerifierFactory; + this.logPrefix = "fileVerifyTaskId=" + id; + this.state = new AtomicReference<>(State.NOT_STARTED); + } + + public static Builder builder() + { + return new Builder(); + } + + public Future<Void> validate() + { + return abortIfCancelled(new Object()) + // Get files info from both source and destination + .compose(v -> Future.join(listFilesInLocal(), fetchSourceFileList())) + + // Compare the files information before calculating digests + .compose(this::abortIfCancelled) + .compose(cf -> compareFilesMeta(cf.resultAt(0), cf.resultAt(1))) + + // Create digest comparison tasks + .compose(this::abortIfCancelled) + .compose(this::getDigestComparisonTasks) + + // Compare digests + .compose(this::abortIfCancelled) + .compose(this::compareDigests) + + .andThen(ar -> { + if (ar.failed()) + { + LOGGER.error("{} Files verification task failed during validation", logPrefix, 
ar.cause()); + // Fail the task if it is in IN_PROGRESS state. + state.compareAndSet(State.IN_PROGRESS, State.FAILED); + } + else + { + State currentState = state.compareAndExchange(State.IN_PROGRESS, State.COMPLETED); + if (currentState == State.CANCELLED) + { + LOGGER.debug("{} Task was cancelled before completion", logPrefix); + } + else if (currentState != State.IN_PROGRESS) + { + LOGGER.warn("{} Unexpected state transition failure. State was {}", logPrefix, currentState); + } + else + { + LOGGER.info("{} Files verification task completed successfully", logPrefix); + } + } + }); + } + + private Future<List<InstanceFileInfo>> listFilesInLocal() + { + return executorPools.internal() + .executeBlocking(() -> new CassandraInstanceFilesImpl(instanceMetadata, + liveMigrationConfiguration) + .files()); + } + + private Future<InstanceFilesListResponse> fetchSourceFileList() + { + return Future.fromCompletionStage( + sidecarClient.liveMigrationListInstanceFilesAsync(new SidecarInstanceImpl(source, port))) + .onFailure(cause -> LOGGER.error("{} Failed to obtain list of files from source {}:{}", + logPrefix, source, port, cause)); + } + + private @NotNull Future<List<InstanceFileInfo>> compareFilesMeta(List<InstanceFileInfo> localFiles, Review Comment: done ########## server/src/main/java/org/apache/cassandra/sidecar/livemigration/LiveMigrationFilesVerificationTask.java: ########## @@ -0,0 +1,643 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.sidecar.livemigration; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.concurrent.Callable; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; +import java.util.stream.Collectors; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import io.vertx.core.Future; +import io.vertx.core.MultiMap; +import io.vertx.core.Promise; +import io.vertx.core.Vertx; +import org.apache.cassandra.sidecar.client.SidecarClient; +import org.apache.cassandra.sidecar.client.SidecarInstanceImpl; +import org.apache.cassandra.sidecar.cluster.instance.InstanceMetadata; +import org.apache.cassandra.sidecar.common.DataObjectBuilder; +import org.apache.cassandra.sidecar.common.request.LiveMigrationFilesVerificationRequest; +import org.apache.cassandra.sidecar.common.request.data.Digest; +import org.apache.cassandra.sidecar.common.request.data.MD5Digest; +import org.apache.cassandra.sidecar.common.request.data.XXHash32Digest; +import org.apache.cassandra.sidecar.common.response.DigestResponse; +import org.apache.cassandra.sidecar.common.response.InstanceFileInfo; +import org.apache.cassandra.sidecar.common.response.InstanceFilesListResponse; +import org.apache.cassandra.sidecar.common.response.LiveMigrationFilesVerificationResponse; +import org.apache.cassandra.sidecar.concurrent.AsyncConcurrentTaskExecutor; +import 
org.apache.cassandra.sidecar.concurrent.ExecutorPools; +import org.apache.cassandra.sidecar.config.LiveMigrationConfiguration; +import org.apache.cassandra.sidecar.exceptions.LiveMigrationExceptions.DigestMismatchException; +import org.apache.cassandra.sidecar.exceptions.LiveMigrationExceptions.FileVerificationFailureException; +import org.apache.cassandra.sidecar.utils.DigestVerifierFactory; +import org.jetbrains.annotations.NotNull; +import org.jetbrains.annotations.VisibleForTesting; + +import static org.apache.cassandra.sidecar.common.response.InstanceFileInfo.FileType.DIRECTORY; +import static org.apache.cassandra.sidecar.livemigration.LiveMigrationInstanceMetadataUtil.localPath; + +/** + * Verifies file integrity between source and destination instances during live migration + * by comparing file lists and validating file digests using configurable digest algorithms. + * + * <p>The verification process consists of three stages: + * <ol> + * <li>Fetch file lists from both source and destination instances concurrently</li> + * <li>Compare file metadata (size, type, modification time) - fails fast on mismatches</li> + * <li>Verify cryptographic digests (MD5 or XXHash32) with configurable concurrency</li> + * </ol> + * + * <p>The task supports cancellation, and tracks detailed metrics including + * metadata matches/mismatches, digest verification results, and failure counts. 
+ */ +public class LiveMigrationFilesVerificationTask implements LiveMigrationTask<LiveMigrationFilesVerificationResponse> +{ + public static final String FILES_VERIFICATION_TASK_TYPE = "files-verification-task"; + private static final Logger LOGGER = LoggerFactory.getLogger(LiveMigrationFilesVerificationTask.class); + private final Vertx vertx; + private final String id; + private final String source; + private final int port; + // Instance metadata of current instance + private final InstanceMetadata instanceMetadata; + private final ExecutorPools executorPools; + private final LiveMigrationFilesVerificationRequest request; + private final SidecarClient sidecarClient; + private final LiveMigrationConfiguration liveMigrationConfiguration; + private final DigestVerifierFactory digestVerifierFactory; + private final AtomicReference<State> state; + private final Promise<Void> completionPromise = Promise.promise(); + private final AtomicInteger filesNotFoundAtSource = new AtomicInteger(0); + private final AtomicInteger filesNotFoundAtDestination = new AtomicInteger(0); + private final AtomicInteger metadataMatched = new AtomicInteger(0); + private final AtomicInteger metadataMismatches = new AtomicInteger(0); + private final AtomicInteger digestMismatches = new AtomicInteger(0); + private final AtomicInteger digestVerificationFailures = new AtomicInteger(0); + private final AtomicInteger filesMatched = new AtomicInteger(0); + private final String logPrefix; + private final AtomicBoolean cancelled = new AtomicBoolean(false); + private volatile AsyncConcurrentTaskExecutor<String> asyncConcurrentTaskExecutor; + + @VisibleForTesting + protected LiveMigrationFilesVerificationTask(Builder builder) + { + this.vertx = builder.vertx; + this.id = builder.id; + this.source = builder.source; + this.port = builder.port; + this.instanceMetadata = builder.instanceMetadata; + this.executorPools = builder.executorPools; + this.request = builder.request; + this.sidecarClient = 
builder.sidecarClient; + this.liveMigrationConfiguration = builder.liveMigrationConfiguration; + this.digestVerifierFactory = builder.digestVerifierFactory; + this.logPrefix = "fileVerifyTaskId=" + id; + this.state = new AtomicReference<>(State.NOT_STARTED); + } + + public static Builder builder() + { + return new Builder(); + } + + public Future<Void> validate() + { + return abortIfCancelled(new Object()) + // Get files info from both source and destination + .compose(v -> Future.join(listFilesInLocal(), fetchSourceFileList())) + + // Compare the files information before calculating digests + .compose(this::abortIfCancelled) + .compose(cf -> compareFilesMeta(cf.resultAt(0), cf.resultAt(1))) + + // Create digest comparison tasks + .compose(this::abortIfCancelled) + .compose(this::getDigestComparisonTasks) + + // Compare digests + .compose(this::abortIfCancelled) + .compose(this::compareDigests) + + .andThen(ar -> { + if (ar.failed()) + { + LOGGER.error("{} Files verification task failed during validation", logPrefix, ar.cause()); + // Fail the task if it is in IN_PROGRESS state. + state.compareAndSet(State.IN_PROGRESS, State.FAILED); + } + else + { + State currentState = state.compareAndExchange(State.IN_PROGRESS, State.COMPLETED); + if (currentState == State.CANCELLED) + { + LOGGER.debug("{} Task was cancelled before completion", logPrefix); + } + else if (currentState != State.IN_PROGRESS) + { + LOGGER.warn("{} Unexpected state transition failure. 
State was {}", logPrefix, currentState); + } + else + { + LOGGER.info("{} Files verification task completed successfully", logPrefix); + } + } + }); + } + + private Future<List<InstanceFileInfo>> listFilesInLocal() + { + return executorPools.internal() + .executeBlocking(() -> new CassandraInstanceFilesImpl(instanceMetadata, + liveMigrationConfiguration) + .files()); + } + + private Future<InstanceFilesListResponse> fetchSourceFileList() + { + return Future.fromCompletionStage( + sidecarClient.liveMigrationListInstanceFilesAsync(new SidecarInstanceImpl(source, port))) + .onFailure(cause -> LOGGER.error("{} Failed to obtain list of files from source {}:{}", + logPrefix, source, port, cause)); + } + + private @NotNull Future<List<InstanceFileInfo>> compareFilesMeta(List<InstanceFileInfo> localFiles, + InstanceFilesListResponse sourceFiles) + { + Map<String, InstanceFileInfo> filesAtLocal = + localFiles.stream() + .collect(Collectors.toMap(fileInfo -> fileInfo.fileUrl, fileInfo -> fileInfo)); + + Map<String, InstanceFileInfo> filesAtSource = + sourceFiles.getFiles().stream() + .collect(Collectors.toMap(fileInfo -> fileInfo.fileUrl, fileInfo -> fileInfo)); + + for (Map.Entry<String, InstanceFileInfo> localFileEntry : filesAtLocal.entrySet()) + { + String fileUrl = localFileEntry.getKey(); + InstanceFileInfo localFile = localFileEntry.getValue(); + if (filesAtSource.containsKey(fileUrl)) + { + InstanceFileInfo sourceFile = filesAtSource.get(fileUrl); + // compare files metadata + if (localFile.equals(sourceFile) || + (localFile.fileType == DIRECTORY && sourceFile.fileType == DIRECTORY)) + { + // File info matches if either of them are directories or + // their size, last modified time etc... matches + LOGGER.debug("{} {} file metadata matched with source", logPrefix, fileUrl); + metadataMatched.incrementAndGet(); + } + else + { + // Files metadata did not match + metadataMismatches.incrementAndGet(); + LOGGER.error("{} {} did not match with source. 
Local file info={}, source file info={}", + logPrefix, fileUrl, localFile, sourceFile); + } + + // done with processing current local file, remove it + filesAtSource.remove(fileUrl); + } + else + { + // File is not available at source, report it + LOGGER.error("{} File {} exists at destination but not found at source", logPrefix, fileUrl); + filesNotFoundAtSource.incrementAndGet(); + } + } + + for (Map.Entry<String, InstanceFileInfo> sourceFileEntry : filesAtSource.entrySet()) + { + // Remaining files are not present at destination, report error for them + LOGGER.error("{} File {} exists at source but not found at destination", logPrefix, sourceFileEntry.getKey()); + filesNotFoundAtDestination.incrementAndGet(); + } + + if (filesNotFoundAtDestination.get() > 0 + || filesNotFoundAtSource.get() > 0 + || metadataMismatches.get() > 0) + { + FileVerificationFailureException exception = + new FileVerificationFailureException("Files list did not match between source and destination"); + + return Future.failedFuture(exception); + } + + return Future.succeededFuture(localFiles); + } + + private @NotNull Future<List<Future<String>>> getDigestComparisonTasks(List<InstanceFileInfo> localFiles) + { + // now verify digests of each local file with source file + List<Callable<Future<String>>> tasks = new ArrayList<>(localFiles.size()); + localFiles.stream() + .filter(fileInfo -> fileInfo.fileType != DIRECTORY) + .forEach(fileInfo -> tasks.add(() -> verifyDigest(fileInfo))); + asyncConcurrentTaskExecutor = new AsyncConcurrentTaskExecutor<>(vertx, tasks, request.maxConcurrency()); + List<Future<String>> verifyTaskResults = asyncConcurrentTaskExecutor.start(); + return Future.join(verifyTaskResults) + .transform(ar -> Future.succeededFuture(verifyTaskResults)); + } + + private Future<Void> compareDigests(List<Future<String>> futures) + { + boolean verificationFailed = false; + for (Future<String> future : futures) + { + if (future.failed()) + { + verificationFailed = true; + 
Throwable cause = future.cause(); + if (cause instanceof DigestMismatchException) + { + DigestMismatchException ex = (DigestMismatchException) cause; + LOGGER.error("{} File digests did not match for {}.", logPrefix, ex.fileUrl(), cause); + digestMismatches.incrementAndGet(); + } + else + { + LOGGER.error("{} Failed to verify digest for file. Check previous logs for file details", + logPrefix, cause); + digestVerificationFailures.incrementAndGet(); + } + continue; + } + + // Digest verification succeeded + filesMatched.incrementAndGet(); + } + + if (verificationFailed) + { + return Future.failedFuture( + new FileVerificationFailureException("File digests did not match between source and destination")); + } + else + { + return Future.succeededFuture(); + } + } + + @VisibleForTesting + Future<String> verifyDigest(InstanceFileInfo fileInfo) + { + return getSourceFileDigest(fileInfo) + .compose(digest -> { + String path = localPath(fileInfo.fileUrl, instanceMetadata).toAbsolutePath().toString(); + return digestVerifierFactory.verifier(MultiMap.caseInsensitiveMultiMap().addAll(digest.headers())) + .verify(path) + .compose(verified -> Future.succeededFuture(path)) + .recover(cause -> Future.failedFuture( + new DigestMismatchException(path, fileInfo.fileUrl, cause))); + }) + .onSuccess(filePath -> LOGGER.debug("{} Verified file {}", logPrefix, fileInfo.fileUrl)) + .onFailure(cause -> LOGGER.error("{} Failed to verify file {}", logPrefix, fileInfo.fileUrl, cause)); + } + + private Future<Digest> getSourceFileDigest(InstanceFileInfo fileInfo) + { + return Future.fromCompletionStage(sidecarClient.liveMigrationFileDigestAsync(new SidecarInstanceImpl(source, port), + fileInfo.fileUrl, + request.digestAlgorithm())) + .compose(this::toDigest); + } + + private Future<Digest> toDigest(DigestResponse digestResponse) + { + String digestAlgorithm = digestResponse.digestAlgorithm; + if (digestAlgorithm.equalsIgnoreCase(MD5Digest.MD5_ALGORITHM)) + { + return Future.succeededFuture(new 
MD5Digest(digestResponse.digest)); + } + else if (digestAlgorithm.equalsIgnoreCase(XXHash32Digest.XXHASH_32_ALGORITHM)) + { + return Future.succeededFuture(new XXHash32Digest(digestResponse.digest)); + } Review Comment: I felt that neither Digest nor DigestResponse is closely related to DigestAlgorithm, so I did not place this logic in DigestAlgorithmFactory. I moved it to DigestResponse instead. ########## server/src/main/java/org/apache/cassandra/sidecar/livemigration/LiveMigrationTaskManager.java: ########## @@ -0,0 +1,154 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.sidecar.livemigration; + +import java.util.Collections; +import java.util.List; +import java.util.concurrent.ConcurrentHashMap; + +import com.google.common.annotations.VisibleForTesting; + +import com.google.inject.Inject; +import com.google.inject.Singleton; +import org.apache.cassandra.sidecar.cluster.InstancesMetadata; +import org.apache.cassandra.sidecar.cluster.instance.InstanceMetadata; +import org.apache.cassandra.sidecar.exceptions.LiveMigrationExceptions; +import org.jetbrains.annotations.NotNull; + +/** + * Centralized singleton manager for all live migration tasks across Cassandra instances.
+ * Enforces the constraint that only one {@link LiveMigrationTask} (of any type) can be active per instance at a time, + * preventing concurrent data copy and files verification operations that could lead to resource conflicts or data integrity issues. + * This singleton is shared across {@link DataCopyTaskManager} and {@link FilesVerificationTaskManager} to ensure + * proper coordination and mutual exclusion of tasks per instance. + */ +@Singleton +public class LiveMigrationTaskManager +{ + @VisibleForTesting + final ConcurrentHashMap<Integer, LiveMigrationTask<?>> currentTasks = new ConcurrentHashMap<>(); + + private final InstancesMetadata instancesMetadata; + + @Inject + public LiveMigrationTaskManager(InstancesMetadata instancesMetadata) + { + this.instancesMetadata = instancesMetadata; + } + + /** + * Attempts to submit a new task for the specified instance. + * Only one task (of any type) can be active per instance at a time. + * + * @param instanceId the instance ID + * @param newTask the task to submit + * @return true if the task was accepted, false if another task is already in progress + */ + public boolean submitTask(int instanceId, LiveMigrationTask<?> newTask) + { + return currentTasks.compute(instanceId, (integer, taskInMap) -> { + if (taskInMap == null) + { + return newTask; + } + + if (!taskInMap.isCompleted()) + { + // Reject new task if existing task is still in progress + return taskInMap; + } + else + { + // Accept new task if existing task has completed + return newTask; + } + }) == newTask; + } + + /** + * Returns all live migration tasks for given currentHost. + * This includes both active and completed tasks that haven't been replaced. 
+ * + * @param currentHost the host where sidecar is running + * @return list containing at most one task (empty if no task has ever been submitted for this host) + */ + @SuppressWarnings("ConstantValue") + public List<LiveMigrationTask<?>> getAllTasks(@NotNull String currentHost) + { + InstanceMetadata localInstance = instancesMetadata.instanceFromHost(currentHost); + if (localInstance == null) + { + throw new IllegalStateException("No instance found for host: " + currentHost); + } + if (!currentTasks.containsKey(localInstance.id())) + { + return Collections.emptyList(); + } + return Collections.singletonList(currentTasks.get(localInstance.id())); Review Comment: Fetching task only once and returning the value. ########## server/src/main/java/org/apache/cassandra/sidecar/livemigration/FilesVerificationTaskManager.java: ########## @@ -0,0 +1,197 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.sidecar.livemigration; + +import java.util.List; +import java.util.Objects; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.datastax.driver.core.utils.UUIDs; +import com.google.inject.Inject; +import com.google.inject.Singleton; +import io.vertx.core.Future; +import org.apache.cassandra.sidecar.cluster.instance.InstanceMetadata; +import org.apache.cassandra.sidecar.common.request.LiveMigrationFilesVerificationRequest; +import org.apache.cassandra.sidecar.common.response.LiveMigrationFilesVerificationResponse; +import org.apache.cassandra.sidecar.config.SidecarConfiguration; +import org.apache.cassandra.sidecar.exceptions.LiveMigrationExceptions.LiveMigrationInvalidRequestException; +import org.apache.cassandra.sidecar.exceptions.LiveMigrationExceptions.LiveMigrationTaskInProgressException; +import org.apache.cassandra.sidecar.exceptions.LiveMigrationExceptions.LiveMigrationTaskNotFoundException; +import org.jetbrains.annotations.NotNull; + +import static org.apache.cassandra.sidecar.livemigration.LiveMigrationFilesVerificationTask.FILES_VERIFICATION_TASK_TYPE; + +/** + * Manages the lifecycle of file digest verification tasks during live migration operations. + * This manager ensures that only one {@link LiveMigrationTask} can be active per instance at a time, + * preventing concurrent migration operations that could impact system resources. + * Tasks are created using the {@link LiveMigrationFilesVerificationTaskFactory} and + * executed asynchronously to validate file integrity between source and destination nodes. 
+ */ +@Singleton +public class FilesVerificationTaskManager +{ + private static final Logger LOGGER = LoggerFactory.getLogger(FilesVerificationTaskManager.class); + + private final LiveMigrationTaskManager taskManager; + private final LiveMigrationFilesVerificationTaskFactory taskFactory; + private final SidecarConfiguration sidecarConfiguration; + + @Inject + public FilesVerificationTaskManager(LiveMigrationTaskManager taskManager, + SidecarConfiguration sidecarConfiguration, + LiveMigrationFilesVerificationTaskFactory taskFactory) + { + this.taskManager = taskManager; + this.sidecarConfiguration = sidecarConfiguration; + this.taskFactory = taskFactory; + } + + /** + * Creates and submits a new file digest verification task for the specified instance. + * Only one task (of any type) can be active per instance at a time. If a task is already running, + * this method returns a failed future with {@link LiveMigrationTaskInProgressException}. + * + * @param request the file verification request containing digest information + * @param source the source identifier for the verification request + * @param localInstanceMetadata metadata of the local Cassandra instance + * @return a future that succeeds if the task is created and started, or fails with + * {@link LiveMigrationTaskInProgressException} if another task is already in progress + */ + public Future<LiveMigrationTask<LiveMigrationFilesVerificationResponse>> createTask(LiveMigrationFilesVerificationRequest request, + String source, + InstanceMetadata localInstanceMetadata) + { + int maxPossibleConcurrency = Objects.requireNonNull(sidecarConfiguration.liveMigrationConfiguration()) + .maxConcurrentFileRequests(); + if (request.maxConcurrency() > maxPossibleConcurrency) + { + return Future.failedFuture( + new LiveMigrationInvalidRequestException("max concurrency can not be more than " + maxPossibleConcurrency)); + } + + return createVerifier(request, source, localInstanceMetadata) + .compose(newTask -> { + boolean 
accepted = taskManager.submitTask(localInstanceMetadata.id(), newTask); + + if (accepted) + { + newTask.start(); + LOGGER.info("Accepted new files digest verification task for instance={} taskId={}", + localInstanceMetadata.id(), newTask.id()); + return Future.succeededFuture(newTask); + } + else + { + return Future.failedFuture(new LiveMigrationTaskInProgressException( + "Another files digests verification is in progress for instance=" + localInstanceMetadata.id())); Review Comment: done ########## server/src/main/java/org/apache/cassandra/sidecar/livemigration/FilesVerificationTaskManager.java: ########## @@ -0,0 +1,197 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.sidecar.livemigration; + +import java.util.List; +import java.util.Objects; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.datastax.driver.core.utils.UUIDs; +import com.google.inject.Inject; +import com.google.inject.Singleton; +import io.vertx.core.Future; +import org.apache.cassandra.sidecar.cluster.instance.InstanceMetadata; +import org.apache.cassandra.sidecar.common.request.LiveMigrationFilesVerificationRequest; +import org.apache.cassandra.sidecar.common.response.LiveMigrationFilesVerificationResponse; +import org.apache.cassandra.sidecar.config.SidecarConfiguration; +import org.apache.cassandra.sidecar.exceptions.LiveMigrationExceptions.LiveMigrationInvalidRequestException; +import org.apache.cassandra.sidecar.exceptions.LiveMigrationExceptions.LiveMigrationTaskInProgressException; +import org.apache.cassandra.sidecar.exceptions.LiveMigrationExceptions.LiveMigrationTaskNotFoundException; +import org.jetbrains.annotations.NotNull; + +import static org.apache.cassandra.sidecar.livemigration.LiveMigrationFilesVerificationTask.FILES_VERIFICATION_TASK_TYPE; + +/** + * Manages the lifecycle of file digest verification tasks during live migration operations. + * This manager ensures that only one {@link LiveMigrationTask} can be active per instance at a time, + * preventing concurrent migration operations that could impact system resources. + * Tasks are created using the {@link LiveMigrationFilesVerificationTaskFactory} and + * executed asynchronously to validate file integrity between source and destination nodes. + */ +@Singleton +public class FilesVerificationTaskManager Review Comment: Started with inheriting LiveMigrationTaskManager, but had to switch to association. Since FilesVerificationTaskManager and DataCopyTaskManager are different types, Guice creates a separate instance for each, meaning each would get its own currentTasks map (instance to task map in LiveMigrationTaskManager). 
This breaks the invariant that only one task of any type can be active per instance at a time (line 44). I didn't want to rely on a static mutable map just to make inheritance work, so I used association instead. ########## server/src/main/java/org/apache/cassandra/sidecar/livemigration/FilesVerificationTaskManager.java: ########## @@ -0,0 +1,197 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.cassandra.sidecar.livemigration; + +import java.util.List; +import java.util.Objects; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.datastax.driver.core.utils.UUIDs; +import com.google.inject.Inject; +import com.google.inject.Singleton; +import io.vertx.core.Future; +import org.apache.cassandra.sidecar.cluster.instance.InstanceMetadata; +import org.apache.cassandra.sidecar.common.request.LiveMigrationFilesVerificationRequest; +import org.apache.cassandra.sidecar.common.response.LiveMigrationFilesVerificationResponse; +import org.apache.cassandra.sidecar.config.SidecarConfiguration; +import org.apache.cassandra.sidecar.exceptions.LiveMigrationExceptions.LiveMigrationInvalidRequestException; +import org.apache.cassandra.sidecar.exceptions.LiveMigrationExceptions.LiveMigrationTaskInProgressException; +import org.apache.cassandra.sidecar.exceptions.LiveMigrationExceptions.LiveMigrationTaskNotFoundException; +import org.jetbrains.annotations.NotNull; + +import static org.apache.cassandra.sidecar.livemigration.LiveMigrationFilesVerificationTask.FILES_VERIFICATION_TASK_TYPE; + +/** + * Manages the lifecycle of file digest verification tasks during live migration operations. + * This manager ensures that only one {@link LiveMigrationTask} can be active per instance at a time, + * preventing concurrent migration operations that could impact system resources. + * Tasks are created using the {@link LiveMigrationFilesVerificationTaskFactory} and + * executed asynchronously to validate file integrity between source and destination nodes. 
+ */ +@Singleton +public class FilesVerificationTaskManager +{ + private static final Logger LOGGER = LoggerFactory.getLogger(FilesVerificationTaskManager.class); + + private final LiveMigrationTaskManager taskManager; + private final LiveMigrationFilesVerificationTaskFactory taskFactory; + private final SidecarConfiguration sidecarConfiguration; + + @Inject + public FilesVerificationTaskManager(LiveMigrationTaskManager taskManager, + SidecarConfiguration sidecarConfiguration, + LiveMigrationFilesVerificationTaskFactory taskFactory) + { + this.taskManager = taskManager; + this.sidecarConfiguration = sidecarConfiguration; + this.taskFactory = taskFactory; + } + + /** + * Creates and submits a new file digest verification task for the specified instance. + * Only one task (of any type) can be active per instance at a time. If a task is already running, + * this method returns a failed future with {@link LiveMigrationTaskInProgressException}. + * + * @param request the file verification request containing digest information + * @param source the source identifier for the verification request + * @param localInstanceMetadata metadata of the local Cassandra instance + * @return a future that succeeds if the task is created and started, or fails with + * {@link LiveMigrationTaskInProgressException} if another task is already in progress + */ + public Future<LiveMigrationTask<LiveMigrationFilesVerificationResponse>> createTask(LiveMigrationFilesVerificationRequest request, + String source, + InstanceMetadata localInstanceMetadata) + { + int maxPossibleConcurrency = Objects.requireNonNull(sidecarConfiguration.liveMigrationConfiguration()) + .maxConcurrentFileRequests(); + if (request.maxConcurrency() > maxPossibleConcurrency) + { + return Future.failedFuture( + new LiveMigrationInvalidRequestException("max concurrency can not be more than " + maxPossibleConcurrency)); + } + + return createVerifier(request, source, localInstanceMetadata) + .compose(newTask -> { + boolean 
accepted = taskManager.submitTask(localInstanceMetadata.id(), newTask); + + if (accepted) + { + newTask.start(); + LOGGER.info("Accepted new files digest verification task for instance={} taskId={}", + localInstanceMetadata.id(), newTask.id()); + return Future.succeededFuture(newTask); + } + else + { + return Future.failedFuture(new LiveMigrationTaskInProgressException( + "Another files digests verification is in progress for instance=" + localInstanceMetadata.id())); + } + }); + } + + private Future<LiveMigrationTask<LiveMigrationFilesVerificationResponse>> createVerifier(LiveMigrationFilesVerificationRequest request, + String source, + InstanceMetadata localInstanceMetadata) + { + String timeUuid = UUIDs.timeBased().toString(); + return Future.succeededFuture(taskFactory.create(timeUuid, + source, + sidecarConfiguration.serviceConfiguration().port(), + request, + localInstanceMetadata)); + } Review Comment: make sense, updated it ########## server/src/main/java/org/apache/cassandra/sidecar/livemigration/LiveMigrationFilesVerificationTask.java: ########## @@ -0,0 +1,643 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.sidecar.livemigration; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.concurrent.Callable; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; +import java.util.stream.Collectors; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import io.vertx.core.Future; +import io.vertx.core.MultiMap; +import io.vertx.core.Promise; +import io.vertx.core.Vertx; +import org.apache.cassandra.sidecar.client.SidecarClient; +import org.apache.cassandra.sidecar.client.SidecarInstanceImpl; +import org.apache.cassandra.sidecar.cluster.instance.InstanceMetadata; +import org.apache.cassandra.sidecar.common.DataObjectBuilder; +import org.apache.cassandra.sidecar.common.request.LiveMigrationFilesVerificationRequest; +import org.apache.cassandra.sidecar.common.request.data.Digest; +import org.apache.cassandra.sidecar.common.request.data.MD5Digest; +import org.apache.cassandra.sidecar.common.request.data.XXHash32Digest; +import org.apache.cassandra.sidecar.common.response.DigestResponse; +import org.apache.cassandra.sidecar.common.response.InstanceFileInfo; +import org.apache.cassandra.sidecar.common.response.InstanceFilesListResponse; +import org.apache.cassandra.sidecar.common.response.LiveMigrationFilesVerificationResponse; +import org.apache.cassandra.sidecar.concurrent.AsyncConcurrentTaskExecutor; +import org.apache.cassandra.sidecar.concurrent.ExecutorPools; +import org.apache.cassandra.sidecar.config.LiveMigrationConfiguration; +import org.apache.cassandra.sidecar.exceptions.LiveMigrationExceptions.DigestMismatchException; +import org.apache.cassandra.sidecar.exceptions.LiveMigrationExceptions.FileVerificationFailureException; +import org.apache.cassandra.sidecar.utils.DigestVerifierFactory; +import org.jetbrains.annotations.NotNull; +import 
org.jetbrains.annotations.VisibleForTesting; + +import static org.apache.cassandra.sidecar.common.response.InstanceFileInfo.FileType.DIRECTORY; +import static org.apache.cassandra.sidecar.livemigration.LiveMigrationInstanceMetadataUtil.localPath; + +/** + * Verifies file integrity between source and destination instances during live migration + * by comparing file lists and validating file digests using configurable digest algorithms. + * + * <p>The verification process consists of three stages: + * <ol> + * <li>Fetch file lists from both source and destination instances concurrently</li> + * <li>Compare file metadata (size, type, modification time) - fails fast on mismatches</li> + * <li>Verify cryptographic digests (MD5 or XXHash32) with configurable concurrency</li> + * </ol> + * + * <p>The task supports cancellation, and tracks detailed metrics including + * metadata matches/mismatches, digest verification results, and failure counts. + */ +public class LiveMigrationFilesVerificationTask implements LiveMigrationTask<LiveMigrationFilesVerificationResponse> +{ + public static final String FILES_VERIFICATION_TASK_TYPE = "files-verification-task"; + private static final Logger LOGGER = LoggerFactory.getLogger(LiveMigrationFilesVerificationTask.class); + private final Vertx vertx; + private final String id; + private final String source; + private final int port; + // Instance metadata of current instance + private final InstanceMetadata instanceMetadata; + private final ExecutorPools executorPools; + private final LiveMigrationFilesVerificationRequest request; + private final SidecarClient sidecarClient; + private final LiveMigrationConfiguration liveMigrationConfiguration; + private final DigestVerifierFactory digestVerifierFactory; + private final AtomicReference<State> state; + private final Promise<Void> completionPromise = Promise.promise(); + private final AtomicInteger filesNotFoundAtSource = new AtomicInteger(0); + private final AtomicInteger 
filesNotFoundAtDestination = new AtomicInteger(0); + private final AtomicInteger metadataMatched = new AtomicInteger(0); + private final AtomicInteger metadataMismatches = new AtomicInteger(0); + private final AtomicInteger digestMismatches = new AtomicInteger(0); + private final AtomicInteger digestVerificationFailures = new AtomicInteger(0); + private final AtomicInteger filesMatched = new AtomicInteger(0); + private final String logPrefix; + private final AtomicBoolean cancelled = new AtomicBoolean(false); + private volatile AsyncConcurrentTaskExecutor<String> asyncConcurrentTaskExecutor; + + @VisibleForTesting + protected LiveMigrationFilesVerificationTask(Builder builder) + { + this.vertx = builder.vertx; + this.id = builder.id; + this.source = builder.source; + this.port = builder.port; + this.instanceMetadata = builder.instanceMetadata; + this.executorPools = builder.executorPools; + this.request = builder.request; + this.sidecarClient = builder.sidecarClient; + this.liveMigrationConfiguration = builder.liveMigrationConfiguration; + this.digestVerifierFactory = builder.digestVerifierFactory; + this.logPrefix = "fileVerifyTaskId=" + id; + this.state = new AtomicReference<>(State.NOT_STARTED); + } + + public static Builder builder() + { + return new Builder(); + } + + public Future<Void> validate() + { + return abortIfCancelled(new Object()) + // Get files info from both source and destination + .compose(v -> Future.join(listFilesInLocal(), fetchSourceFileList())) + + // Compare the files information before calculating digests + .compose(this::abortIfCancelled) + .compose(cf -> compareFilesMeta(cf.resultAt(0), cf.resultAt(1))) + + // Create digest comparison tasks + .compose(this::abortIfCancelled) + .compose(this::getDigestComparisonTasks) + + // Compare digests + .compose(this::abortIfCancelled) + .compose(this::compareDigests) + + .andThen(ar -> { + if (ar.failed()) + { + LOGGER.error("{} Files verification task failed during validation", logPrefix, 
ar.cause()); + // Fail the task if it is in IN_PROGRESS state. + state.compareAndSet(State.IN_PROGRESS, State.FAILED); + } + else + { + State currentState = state.compareAndExchange(State.IN_PROGRESS, State.COMPLETED); + if (currentState == State.CANCELLED) + { + LOGGER.debug("{} Task was cancelled before completion", logPrefix); + } + else if (currentState != State.IN_PROGRESS) + { + LOGGER.warn("{} Unexpected state transition failure. State was {}", logPrefix, currentState); + } + else + { + LOGGER.info("{} Files verification task completed successfully", logPrefix); + } + } + }); + } + + private Future<List<InstanceFileInfo>> listFilesInLocal() + { + return executorPools.internal() + .executeBlocking(() -> new CassandraInstanceFilesImpl(instanceMetadata, + liveMigrationConfiguration) + .files()); + } + + private Future<InstanceFilesListResponse> fetchSourceFileList() + { + return Future.fromCompletionStage( + sidecarClient.liveMigrationListInstanceFilesAsync(new SidecarInstanceImpl(source, port))) + .onFailure(cause -> LOGGER.error("{} Failed to obtain list of files from source {}:{}", + logPrefix, source, port, cause)); + } + + private @NotNull Future<List<InstanceFileInfo>> compareFilesMeta(List<InstanceFileInfo> localFiles, + InstanceFilesListResponse sourceFiles) + { + Map<String, InstanceFileInfo> filesAtLocal = + localFiles.stream() + .collect(Collectors.toMap(fileInfo -> fileInfo.fileUrl, fileInfo -> fileInfo)); + + Map<String, InstanceFileInfo> filesAtSource = + sourceFiles.getFiles().stream() + .collect(Collectors.toMap(fileInfo -> fileInfo.fileUrl, fileInfo -> fileInfo)); + + for (Map.Entry<String, InstanceFileInfo> localFileEntry : filesAtLocal.entrySet()) + { + String fileUrl = localFileEntry.getKey(); + InstanceFileInfo localFile = localFileEntry.getValue(); + if (filesAtSource.containsKey(fileUrl)) + { + InstanceFileInfo sourceFile = filesAtSource.get(fileUrl); + // compare files metadata + if (localFile.equals(sourceFile) || + (localFile.fileType 
== DIRECTORY && sourceFile.fileType == DIRECTORY)) + { + // File info matches if either of them are directories or + // their size, last modified time etc... matches + LOGGER.debug("{} {} file metadata matched with source", logPrefix, fileUrl); + metadataMatched.incrementAndGet(); + } + else + { + // Files metadata did not match + metadataMismatches.incrementAndGet(); + LOGGER.error("{} {} did not match with source. Local file info={}, source file info={}", + logPrefix, fileUrl, localFile, sourceFile); + } + + // done with processing current local file, remove it + filesAtSource.remove(fileUrl); + } + else + { + // File is not available at source, report it + LOGGER.error("{} File {} exists at destination but not found at source", logPrefix, fileUrl); + filesNotFoundAtSource.incrementAndGet(); + } + } + + for (Map.Entry<String, InstanceFileInfo> sourceFileEntry : filesAtSource.entrySet()) + { + // Remaining files are not present at destination, report error for them + LOGGER.error("{} File {} exists at source but not found at destination", logPrefix, sourceFileEntry.getKey()); + filesNotFoundAtDestination.incrementAndGet(); + } + + if (filesNotFoundAtDestination.get() > 0 + || filesNotFoundAtSource.get() > 0 + || metadataMismatches.get() > 0) + { + FileVerificationFailureException exception = + new FileVerificationFailureException("Files list did not match between source and destination"); + + return Future.failedFuture(exception); + } + + return Future.succeededFuture(localFiles); + } + + private @NotNull Future<List<Future<String>>> getDigestComparisonTasks(List<InstanceFileInfo> localFiles) + { + // now verify digests of each local file with source file + List<Callable<Future<String>>> tasks = new ArrayList<>(localFiles.size()); + localFiles.stream() + .filter(fileInfo -> fileInfo.fileType != DIRECTORY) + .forEach(fileInfo -> tasks.add(() -> verifyDigest(fileInfo))); + asyncConcurrentTaskExecutor = new AsyncConcurrentTaskExecutor<>(vertx, tasks, 
request.maxConcurrency()); + List<Future<String>> verifyTaskResults = asyncConcurrentTaskExecutor.start(); + return Future.join(verifyTaskResults) + .transform(ar -> Future.succeededFuture(verifyTaskResults)); + } + + private Future<Void> compareDigests(List<Future<String>> futures) + { + boolean verificationFailed = false; + for (Future<String> future : futures) + { + if (future.failed()) + { + verificationFailed = true; + Throwable cause = future.cause(); + if (cause instanceof DigestMismatchException) + { + DigestMismatchException ex = (DigestMismatchException) cause; + LOGGER.error("{} File digests did not match for {}.", logPrefix, ex.fileUrl(), cause); + digestMismatches.incrementAndGet(); + } + else + { + LOGGER.error("{} Failed to verify digest for file. Check previous logs for file details", + logPrefix, cause); + digestVerificationFailures.incrementAndGet(); + } + continue; + } + + // Digest verification succeeded + filesMatched.incrementAndGet(); + } + + if (verificationFailed) + { + return Future.failedFuture( + new FileVerificationFailureException("File digests did not match between source and destination")); + } + else + { + return Future.succeededFuture(); + } + } + + @VisibleForTesting + Future<String> verifyDigest(InstanceFileInfo fileInfo) + { + return getSourceFileDigest(fileInfo) + .compose(digest -> { + String path = localPath(fileInfo.fileUrl, instanceMetadata).toAbsolutePath().toString(); + return digestVerifierFactory.verifier(MultiMap.caseInsensitiveMultiMap().addAll(digest.headers())) + .verify(path) + .compose(verified -> Future.succeededFuture(path)) + .recover(cause -> Future.failedFuture( + new DigestMismatchException(path, fileInfo.fileUrl, cause))); + }) + .onSuccess(filePath -> LOGGER.debug("{} Verified file {}", logPrefix, fileInfo.fileUrl)) + .onFailure(cause -> LOGGER.error("{} Failed to verify file {}", logPrefix, fileInfo.fileUrl, cause)); + } + + private Future<Digest> getSourceFileDigest(InstanceFileInfo fileInfo) + { + 
return Future.fromCompletionStage(sidecarClient.liveMigrationFileDigestAsync(new SidecarInstanceImpl(source, port), + fileInfo.fileUrl, + request.digestAlgorithm())) + .compose(this::toDigest); + } Review Comment: After giving this more thought, I feel that the 429 status code (TOO_MANY_REQUESTS) is not appropriate and that 503 (SERVICE_UNAVAILABLE) fits better. maxConcurrentFileRequests (used by `LiveMigrationConcurrencyLimitHandler`) is a server-side concurrency cap shared across all clients, so an individual client that hits this limit hasn't done anything wrong - the server is simply busy with other requests. 503 describes exactly this situation. SidecarClientProvider uses `ExponentialBackoffRetryPolicy` as the default retry policy, which already performs retries. If we suspect that the default retry policy could be overridden, I can explicitly instantiate an ExponentialBackoffRetryPolicy and use it here. What do you think? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]

