nvharikrishna commented on code in PR #309:
URL: https://github.com/apache/cassandra-sidecar/pull/309#discussion_r2875781019


##########
server/src/main/java/org/apache/cassandra/sidecar/handlers/livemigration/LiveMigrationCreateFilesVerificationTaskHandler.java:
##########
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.handlers.livemigration;
+
+import java.util.Set;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.inject.Inject;
+import io.netty.handler.codec.http.HttpResponseStatus;
+import io.vertx.core.http.HttpServerRequest;
+import io.vertx.core.json.DecodeException;
+import io.vertx.core.json.Json;
+import io.vertx.core.net.SocketAddress;
+import io.vertx.ext.auth.authorization.Authorization;
+import io.vertx.ext.web.RoutingContext;
+import org.apache.cassandra.sidecar.cluster.instance.InstanceMetadata;
+import 
org.apache.cassandra.sidecar.common.request.LiveMigrationFilesVerificationRequest;
+import 
org.apache.cassandra.sidecar.common.response.LiveMigrationTaskCreationResponse;
+import org.apache.cassandra.sidecar.concurrent.ExecutorPools;
+import org.apache.cassandra.sidecar.config.LiveMigrationConfiguration;
+import org.apache.cassandra.sidecar.config.SidecarConfiguration;
+import 
org.apache.cassandra.sidecar.exceptions.LiveMigrationExceptions.LiveMigrationTaskInProgressException;
+import 
org.apache.cassandra.sidecar.exceptions.NoSuchCassandraInstanceException;
+import org.apache.cassandra.sidecar.handlers.AbstractHandler;
+import org.apache.cassandra.sidecar.handlers.AccessProtected;
+import org.apache.cassandra.sidecar.livemigration.FilesVerificationTaskManager;
+import org.apache.cassandra.sidecar.utils.CassandraInputValidator;
+import org.apache.cassandra.sidecar.utils.DigestAlgorithmFactory;
+import org.apache.cassandra.sidecar.utils.InstanceMetadataFetcher;
+import org.jetbrains.annotations.NotNull;
+
+import static io.netty.handler.codec.http.HttpResponseStatus.ACCEPTED;
+import static io.netty.handler.codec.http.HttpResponseStatus.FORBIDDEN;
+import static 
io.netty.handler.codec.http.HttpResponseStatus.SERVICE_UNAVAILABLE;
+import static 
org.apache.cassandra.sidecar.acl.authorization.BasicPermissions.DATA_COPY;
+import static 
org.apache.cassandra.sidecar.common.ApiEndpointsV1.LIVE_MIGRATION_FILES_VERIFICATION_TASKS_ROUTE;
+import static 
org.apache.cassandra.sidecar.utils.HttpExceptions.wrapHttpException;
+
+/**
+ * HTTP handler for creating file digest verification tasks during live 
migration.
+ * Manages concurrent verification tasks per instance and orchestrates the 
verification process.
+ */
+public class LiveMigrationCreateFilesVerificationTaskHandler extends 
AbstractHandler<LiveMigrationFilesVerificationRequest> implements 
AccessProtected
+{
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(LiveMigrationCreateFilesVerificationTaskHandler.class);
+
+    private final FilesVerificationTaskManager filesVerificationTaskManager;
+    private final LiveMigrationMap liveMigrationMap;
+    private final LiveMigrationConfiguration liveMigrationConfiguration;
+
+    /**
+     * Constructs a handler with the provided {@code metadataFetcher}
+     *
+     * @param metadataFetcher the interface to retrieve instance metadata
+     * @param executorPools   the executor pools for blocking executions
+     * @param validator       a validator instance to validate 
Cassandra-specific input
+     */
+    @Inject
+    protected 
LiveMigrationCreateFilesVerificationTaskHandler(InstanceMetadataFetcher 
metadataFetcher,
+                                                              ExecutorPools 
executorPools,
+                                                              
CassandraInputValidator validator,
+                                                              LiveMigrationMap 
liveMigrationMap,
+                                                              
FilesVerificationTaskManager filesVerificationTaskManager,
+                                                              
SidecarConfiguration sidecarConfiguration)
+    {
+        super(metadataFetcher, executorPools, validator);
+        this.liveMigrationMap = liveMigrationMap;
+        this.filesVerificationTaskManager = filesVerificationTaskManager;
+        this.liveMigrationConfiguration = 
sidecarConfiguration.liveMigrationConfiguration();
+    }
+
+    @Override
+    protected LiveMigrationFilesVerificationRequest 
extractParamsOrThrow(RoutingContext context)
+    {
+        try
+        {
+            LiveMigrationFilesVerificationRequest request =
+            Json.decodeValue(context.body().buffer(), 
LiveMigrationFilesVerificationRequest.class);
+
+            if (request.maxConcurrency() > 
liveMigrationConfiguration.maxConcurrentFileRequests())
+            {
+                throw new IllegalArgumentException("Invalid maxConcurrency " + 
request.maxConcurrency() +
+                                                   ". It cannot be greater 
than " +
+                                                   
liveMigrationConfiguration.maxConcurrentFileRequests());
+            }

Review Comment:
   De-duplicated the checks



##########
server/src/main/java/org/apache/cassandra/sidecar/handlers/livemigration/LiveMigrationCreateFilesVerificationTaskHandler.java:
##########
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.handlers.livemigration;
+
+import java.util.Set;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.inject.Inject;
+import io.netty.handler.codec.http.HttpResponseStatus;
+import io.vertx.core.http.HttpServerRequest;
+import io.vertx.core.json.DecodeException;
+import io.vertx.core.json.Json;
+import io.vertx.core.net.SocketAddress;
+import io.vertx.ext.auth.authorization.Authorization;
+import io.vertx.ext.web.RoutingContext;
+import org.apache.cassandra.sidecar.cluster.instance.InstanceMetadata;
+import 
org.apache.cassandra.sidecar.common.request.LiveMigrationFilesVerificationRequest;
+import 
org.apache.cassandra.sidecar.common.response.LiveMigrationTaskCreationResponse;
+import org.apache.cassandra.sidecar.concurrent.ExecutorPools;
+import org.apache.cassandra.sidecar.config.LiveMigrationConfiguration;
+import org.apache.cassandra.sidecar.config.SidecarConfiguration;
+import 
org.apache.cassandra.sidecar.exceptions.LiveMigrationExceptions.LiveMigrationTaskInProgressException;
+import 
org.apache.cassandra.sidecar.exceptions.NoSuchCassandraInstanceException;
+import org.apache.cassandra.sidecar.handlers.AbstractHandler;
+import org.apache.cassandra.sidecar.handlers.AccessProtected;
+import org.apache.cassandra.sidecar.livemigration.FilesVerificationTaskManager;
+import org.apache.cassandra.sidecar.utils.CassandraInputValidator;
+import org.apache.cassandra.sidecar.utils.DigestAlgorithmFactory;
+import org.apache.cassandra.sidecar.utils.InstanceMetadataFetcher;
+import org.jetbrains.annotations.NotNull;
+
+import static io.netty.handler.codec.http.HttpResponseStatus.ACCEPTED;
+import static io.netty.handler.codec.http.HttpResponseStatus.FORBIDDEN;
+import static 
io.netty.handler.codec.http.HttpResponseStatus.SERVICE_UNAVAILABLE;
+import static 
org.apache.cassandra.sidecar.acl.authorization.BasicPermissions.DATA_COPY;
+import static 
org.apache.cassandra.sidecar.common.ApiEndpointsV1.LIVE_MIGRATION_FILES_VERIFICATION_TASKS_ROUTE;
+import static 
org.apache.cassandra.sidecar.utils.HttpExceptions.wrapHttpException;
+
+/**
+ * HTTP handler for creating file digest verification tasks during live 
migration.
+ * Manages concurrent verification tasks per instance and orchestrates the 
verification process.
+ */
+public class LiveMigrationCreateFilesVerificationTaskHandler extends 
AbstractHandler<LiveMigrationFilesVerificationRequest> implements 
AccessProtected
+{
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(LiveMigrationCreateFilesVerificationTaskHandler.class);
+
+    private final FilesVerificationTaskManager filesVerificationTaskManager;
+    private final LiveMigrationMap liveMigrationMap;
+    private final LiveMigrationConfiguration liveMigrationConfiguration;
+
+    /**
+     * Constructs a handler with the provided {@code metadataFetcher}
+     *
+     * @param metadataFetcher the interface to retrieve instance metadata
+     * @param executorPools   the executor pools for blocking executions
+     * @param validator       a validator instance to validate 
Cassandra-specific input
+     */
+    @Inject
+    protected 
LiveMigrationCreateFilesVerificationTaskHandler(InstanceMetadataFetcher 
metadataFetcher,
+                                                              ExecutorPools 
executorPools,
+                                                              
CassandraInputValidator validator,
+                                                              LiveMigrationMap 
liveMigrationMap,
+                                                              
FilesVerificationTaskManager filesVerificationTaskManager,
+                                                              
SidecarConfiguration sidecarConfiguration)
+    {
+        super(metadataFetcher, executorPools, validator);
+        this.liveMigrationMap = liveMigrationMap;
+        this.filesVerificationTaskManager = filesVerificationTaskManager;
+        this.liveMigrationConfiguration = 
sidecarConfiguration.liveMigrationConfiguration();
+    }
+
+    @Override
+    protected LiveMigrationFilesVerificationRequest 
extractParamsOrThrow(RoutingContext context)
+    {
+        try
+        {
+            LiveMigrationFilesVerificationRequest request =
+            Json.decodeValue(context.body().buffer(), 
LiveMigrationFilesVerificationRequest.class);
+
+            if (request.maxConcurrency() > 
liveMigrationConfiguration.maxConcurrentFileRequests())
+            {
+                throw new IllegalArgumentException("Invalid maxConcurrency " + 
request.maxConcurrency() +
+                                                   ". It cannot be greater 
than " +
+                                                   
liveMigrationConfiguration.maxConcurrentFileRequests());
+            }
+
+            // Validate digest algorithm without creating an instance
+            
DigestAlgorithmFactory.validateAlgorithmName(request.digestAlgorithm());
+
+            return request;
+        }
+        catch (DecodeException decodeException)
+        {
+            throw wrapHttpException(HttpResponseStatus.BAD_REQUEST,
+                                    "Failed to parse request body, please 
ensure that the request is valid.",
+                                    decodeException);
+        }
+        catch (IllegalArgumentException e)
+        {
+            throw wrapHttpException(HttpResponseStatus.BAD_REQUEST, 
e.getMessage(), e);
+        }
+    }
+
+    @Override
+    protected void handleInternal(RoutingContext context,
+                                  HttpServerRequest httpRequest,
+                                  @NotNull String host,
+                                  SocketAddress remoteAddress,
+                                  LiveMigrationFilesVerificationRequest 
request)
+    {
+        LOGGER.debug("Received files verification request for host {} with 
maxConcurrency {}",
+                     host, request.maxConcurrency());
+        InstanceMetadata localInstanceMetadata;
+        try
+        {
+            localInstanceMetadata = metadataFetcher.instance(host);
+        }
+        catch (NoSuchCassandraInstanceException e)
+        {
+            LOGGER.error("Failed to fetch instance metadata for host={}", 
host);
+            context.fail(wrapHttpException(SERVICE_UNAVAILABLE, e));
+            return;
+        }
+
+        liveMigrationMap.getSource(host)
+                        .compose(source -> 
filesVerificationTaskManager.createTask(request, source, localInstanceMetadata))
+
+                        .onSuccess(task -> {
+                            LOGGER.info("Created files verification task {} 
for host {}", task.id(), host);
+                            context.response().setStatusCode(ACCEPTED.code());
+                            String statusUrl = 
LIVE_MIGRATION_FILES_VERIFICATION_TASKS_ROUTE + "/" + task.id();
+                            context.json(new 
LiveMigrationTaskCreationResponse(task.id(), statusUrl));
+                        })
+
+                        .onFailure(throwable -> {
+                            if (throwable instanceof 
LiveMigrationTaskInProgressException)
+                            {
+                                LOGGER.error("Cannot start a new files 
verification task for host {} " +
+                                             "while another live migration 
task is in progress.", host);
+                                context.fail(wrapHttpException(FORBIDDEN, 
throwable.getMessage(), throwable));

Review Comment:
   CONFLICT makes more sense. Updated it.



##########
server/src/main/java/org/apache/cassandra/sidecar/livemigration/LiveMigrationFilesVerificationTask.java:
##########
@@ -0,0 +1,643 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.livemigration;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.Callable;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Collectors;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import io.vertx.core.Future;
+import io.vertx.core.MultiMap;
+import io.vertx.core.Promise;
+import io.vertx.core.Vertx;
+import org.apache.cassandra.sidecar.client.SidecarClient;
+import org.apache.cassandra.sidecar.client.SidecarInstanceImpl;
+import org.apache.cassandra.sidecar.cluster.instance.InstanceMetadata;
+import org.apache.cassandra.sidecar.common.DataObjectBuilder;
+import 
org.apache.cassandra.sidecar.common.request.LiveMigrationFilesVerificationRequest;
+import org.apache.cassandra.sidecar.common.request.data.Digest;
+import org.apache.cassandra.sidecar.common.request.data.MD5Digest;
+import org.apache.cassandra.sidecar.common.request.data.XXHash32Digest;
+import org.apache.cassandra.sidecar.common.response.DigestResponse;
+import org.apache.cassandra.sidecar.common.response.InstanceFileInfo;
+import org.apache.cassandra.sidecar.common.response.InstanceFilesListResponse;
+import 
org.apache.cassandra.sidecar.common.response.LiveMigrationFilesVerificationResponse;
+import org.apache.cassandra.sidecar.concurrent.AsyncConcurrentTaskExecutor;
+import org.apache.cassandra.sidecar.concurrent.ExecutorPools;
+import org.apache.cassandra.sidecar.config.LiveMigrationConfiguration;
+import 
org.apache.cassandra.sidecar.exceptions.LiveMigrationExceptions.DigestMismatchException;
+import 
org.apache.cassandra.sidecar.exceptions.LiveMigrationExceptions.FileVerificationFailureException;
+import org.apache.cassandra.sidecar.utils.DigestVerifierFactory;
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.VisibleForTesting;
+
+import static 
org.apache.cassandra.sidecar.common.response.InstanceFileInfo.FileType.DIRECTORY;
+import static 
org.apache.cassandra.sidecar.livemigration.LiveMigrationInstanceMetadataUtil.localPath;
+
+/**
+ * Verifies file integrity between source and destination instances during 
live migration
+ * by comparing file lists and validating file digests using configurable 
digest algorithms.
+ *
+ * <p>The verification process consists of three stages:
+ * <ol>
+ *   <li>Fetch file lists from both source and destination instances 
concurrently</li>
+ *   <li>Compare file metadata (size, type, modification time) - fails fast on 
mismatches</li>

Review Comment:
   I intended to say that the task won’t proceed to verifying file digests and 
will stop here if there are metadata mismatches. However, I understand your 
point. I’ve updated the text to avoid any confusion.



##########
server/src/main/java/org/apache/cassandra/sidecar/livemigration/LiveMigrationFilesVerificationTask.java:
##########
@@ -0,0 +1,643 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.livemigration;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.Callable;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Collectors;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import io.vertx.core.Future;
+import io.vertx.core.MultiMap;
+import io.vertx.core.Promise;
+import io.vertx.core.Vertx;
+import org.apache.cassandra.sidecar.client.SidecarClient;
+import org.apache.cassandra.sidecar.client.SidecarInstanceImpl;
+import org.apache.cassandra.sidecar.cluster.instance.InstanceMetadata;
+import org.apache.cassandra.sidecar.common.DataObjectBuilder;
+import 
org.apache.cassandra.sidecar.common.request.LiveMigrationFilesVerificationRequest;
+import org.apache.cassandra.sidecar.common.request.data.Digest;
+import org.apache.cassandra.sidecar.common.request.data.MD5Digest;
+import org.apache.cassandra.sidecar.common.request.data.XXHash32Digest;
+import org.apache.cassandra.sidecar.common.response.DigestResponse;
+import org.apache.cassandra.sidecar.common.response.InstanceFileInfo;
+import org.apache.cassandra.sidecar.common.response.InstanceFilesListResponse;
+import 
org.apache.cassandra.sidecar.common.response.LiveMigrationFilesVerificationResponse;
+import org.apache.cassandra.sidecar.concurrent.AsyncConcurrentTaskExecutor;
+import org.apache.cassandra.sidecar.concurrent.ExecutorPools;
+import org.apache.cassandra.sidecar.config.LiveMigrationConfiguration;
+import 
org.apache.cassandra.sidecar.exceptions.LiveMigrationExceptions.DigestMismatchException;
+import 
org.apache.cassandra.sidecar.exceptions.LiveMigrationExceptions.FileVerificationFailureException;
+import org.apache.cassandra.sidecar.utils.DigestVerifierFactory;
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.VisibleForTesting;
+
+import static 
org.apache.cassandra.sidecar.common.response.InstanceFileInfo.FileType.DIRECTORY;
+import static 
org.apache.cassandra.sidecar.livemigration.LiveMigrationInstanceMetadataUtil.localPath;
+
+/**
+ * Verifies file integrity between source and destination instances during 
live migration
+ * by comparing file lists and validating file digests using configurable 
digest algorithms.
+ *
+ * <p>The verification process consists of three stages:
+ * <ol>
+ *   <li>Fetch file lists from both source and destination instances 
concurrently</li>
+ *   <li>Compare file metadata (size, type, modification time) - fails fast on 
mismatches</li>
+ *   <li>Verify cryptographic digests (MD5 or XXHash32) with configurable 
concurrency</li>
+ * </ol>
+ *
+ * <p>The task supports cancellation, and tracks detailed metrics including
+ * metadata matches/mismatches, digest verification results, and failure 
counts.
+ */
+public class LiveMigrationFilesVerificationTask implements 
LiveMigrationTask<LiveMigrationFilesVerificationResponse>
+{
+    public static final String FILES_VERIFICATION_TASK_TYPE = 
"files-verification-task";
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(LiveMigrationFilesVerificationTask.class);
+    private final Vertx vertx;
+    private final String id;
+    private final String source;
+    private final int port;
+    // Instance metadata of current instance
+    private final InstanceMetadata instanceMetadata;
+    private final ExecutorPools executorPools;
+    private final LiveMigrationFilesVerificationRequest request;
+    private final SidecarClient sidecarClient;
+    private final LiveMigrationConfiguration liveMigrationConfiguration;
+    private final DigestVerifierFactory digestVerifierFactory;
+    private final AtomicReference<State> state;
+    private final Promise<Void> completionPromise = Promise.promise();
+    private final AtomicInteger filesNotFoundAtSource = new AtomicInteger(0);
+    private final AtomicInteger filesNotFoundAtDestination = new 
AtomicInteger(0);
+    private final AtomicInteger metadataMatched = new AtomicInteger(0);
+    private final AtomicInteger metadataMismatches = new AtomicInteger(0);
+    private final AtomicInteger digestMismatches = new AtomicInteger(0);
+    private final AtomicInteger digestVerificationFailures = new 
AtomicInteger(0);
+    private final AtomicInteger filesMatched = new AtomicInteger(0);
+    private final String logPrefix;
+    private final AtomicBoolean cancelled = new AtomicBoolean(false);
+    private volatile AsyncConcurrentTaskExecutor<String> 
asyncConcurrentTaskExecutor;
+
+    @VisibleForTesting
+    protected LiveMigrationFilesVerificationTask(Builder builder)
+    {
+        this.vertx = builder.vertx;
+        this.id = builder.id;
+        this.source = builder.source;
+        this.port = builder.port;
+        this.instanceMetadata = builder.instanceMetadata;
+        this.executorPools = builder.executorPools;
+        this.request = builder.request;
+        this.sidecarClient = builder.sidecarClient;
+        this.liveMigrationConfiguration = builder.liveMigrationConfiguration;
+        this.digestVerifierFactory = builder.digestVerifierFactory;
+        this.logPrefix = "fileVerifyTaskId=" + id;
+        this.state = new AtomicReference<>(State.NOT_STARTED);
+    }
+
+    public static Builder builder()
+    {
+        return new Builder();
+    }
+
+    public Future<Void> validate()
+    {
+        return abortIfCancelled(new Object())
+               // Get files info from both source and destination
+               .compose(v -> Future.join(listFilesInLocal(), 
fetchSourceFileList()))
+
+               // Compare the files information before calculating digests
+               .compose(this::abortIfCancelled)
+               .compose(cf -> compareFilesMeta(cf.resultAt(0), cf.resultAt(1)))
+
+               // Create digest comparison tasks
+               .compose(this::abortIfCancelled)
+               .compose(this::getDigestComparisonTasks)
+
+               // Compare digests
+               .compose(this::abortIfCancelled)
+               .compose(this::compareDigests)
+
+               .andThen(ar -> {
+                   if (ar.failed())
+                   {
+                       LOGGER.error("{} Files verification task failed during 
validation", logPrefix, ar.cause());
+                       // Fail the task if it is in IN_PROGRESS state.
+                       state.compareAndSet(State.IN_PROGRESS, State.FAILED);
+                   }
+                   else
+                   {
+                       State currentState = 
state.compareAndExchange(State.IN_PROGRESS, State.COMPLETED);
+                       if (currentState == State.CANCELLED)
+                       {
+                           LOGGER.debug("{} Task was cancelled before 
completion", logPrefix);
+                       }
+                       else if (currentState != State.IN_PROGRESS)
+                       {
+                           LOGGER.warn("{} Unexpected state transition 
failure. State was {}", logPrefix, currentState);
+                       }
+                       else
+                       {
+                           LOGGER.info("{} Files verification task completed 
successfully", logPrefix);
+                       }
+                   }
+               });
+    }
+
+    private Future<List<InstanceFileInfo>> listFilesInLocal()
+    {
+        return executorPools.internal()
+                            .executeBlocking(() -> new 
CassandraInstanceFilesImpl(instanceMetadata,
+                                                                               
   liveMigrationConfiguration)
+                                                   .files());
+    }
+
+    private Future<InstanceFilesListResponse> fetchSourceFileList()
+    {
+        return Future.fromCompletionStage(
+                     sidecarClient.liveMigrationListInstanceFilesAsync(new 
SidecarInstanceImpl(source, port)))
+                     .onFailure(cause -> LOGGER.error("{} Failed to obtain 
list of files from source {}:{}",
+                                                      logPrefix, source, port, 
cause));
+    }
+
+    private @NotNull Future<List<InstanceFileInfo>> 
compareFilesMeta(List<InstanceFileInfo> localFiles,

Review Comment:
   done



##########
server/src/main/java/org/apache/cassandra/sidecar/livemigration/LiveMigrationFilesVerificationTask.java:
##########
@@ -0,0 +1,643 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.livemigration;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.Callable;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Collectors;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import io.vertx.core.Future;
+import io.vertx.core.MultiMap;
+import io.vertx.core.Promise;
+import io.vertx.core.Vertx;
+import org.apache.cassandra.sidecar.client.SidecarClient;
+import org.apache.cassandra.sidecar.client.SidecarInstanceImpl;
+import org.apache.cassandra.sidecar.cluster.instance.InstanceMetadata;
+import org.apache.cassandra.sidecar.common.DataObjectBuilder;
+import 
org.apache.cassandra.sidecar.common.request.LiveMigrationFilesVerificationRequest;
+import org.apache.cassandra.sidecar.common.request.data.Digest;
+import org.apache.cassandra.sidecar.common.request.data.MD5Digest;
+import org.apache.cassandra.sidecar.common.request.data.XXHash32Digest;
+import org.apache.cassandra.sidecar.common.response.DigestResponse;
+import org.apache.cassandra.sidecar.common.response.InstanceFileInfo;
+import org.apache.cassandra.sidecar.common.response.InstanceFilesListResponse;
+import 
org.apache.cassandra.sidecar.common.response.LiveMigrationFilesVerificationResponse;
+import org.apache.cassandra.sidecar.concurrent.AsyncConcurrentTaskExecutor;
+import org.apache.cassandra.sidecar.concurrent.ExecutorPools;
+import org.apache.cassandra.sidecar.config.LiveMigrationConfiguration;
+import 
org.apache.cassandra.sidecar.exceptions.LiveMigrationExceptions.DigestMismatchException;
+import 
org.apache.cassandra.sidecar.exceptions.LiveMigrationExceptions.FileVerificationFailureException;
+import org.apache.cassandra.sidecar.utils.DigestVerifierFactory;
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.VisibleForTesting;
+
+import static 
org.apache.cassandra.sidecar.common.response.InstanceFileInfo.FileType.DIRECTORY;
+import static 
org.apache.cassandra.sidecar.livemigration.LiveMigrationInstanceMetadataUtil.localPath;
+
+/**
+ * Verifies file integrity between source and destination instances during 
live migration
+ * by comparing file lists and validating file digests using configurable 
digest algorithms.
+ *
+ * <p>The verification process consists of three stages:
+ * <ol>
+ *   <li>Fetch file lists from both source and destination instances 
concurrently</li>
+ *   <li>Compare file metadata (size, type, modification time) - fails fast on 
mismatches</li>
+ *   <li>Verify cryptographic digests (MD5 or XXHash32) with configurable 
concurrency</li>
+ * </ol>
+ *
+ * <p>The task supports cancellation, and tracks detailed metrics including
+ * metadata matches/mismatches, digest verification results, and failure 
counts.
+ */
+public class LiveMigrationFilesVerificationTask implements 
LiveMigrationTask<LiveMigrationFilesVerificationResponse>
+{
+    public static final String FILES_VERIFICATION_TASK_TYPE = 
"files-verification-task";
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(LiveMigrationFilesVerificationTask.class);
+    private final Vertx vertx;
+    private final String id;
+    private final String source;
+    private final int port;
+    // Instance metadata of current instance
+    private final InstanceMetadata instanceMetadata;
+    private final ExecutorPools executorPools;
+    private final LiveMigrationFilesVerificationRequest request;
+    private final SidecarClient sidecarClient;
+    private final LiveMigrationConfiguration liveMigrationConfiguration;
+    private final DigestVerifierFactory digestVerifierFactory;
+    private final AtomicReference<State> state;
+    private final Promise<Void> completionPromise = Promise.promise();
+    private final AtomicInteger filesNotFoundAtSource = new AtomicInteger(0);
+    private final AtomicInteger filesNotFoundAtDestination = new 
AtomicInteger(0);
+    private final AtomicInteger metadataMatched = new AtomicInteger(0);
+    private final AtomicInteger metadataMismatches = new AtomicInteger(0);
+    private final AtomicInteger digestMismatches = new AtomicInteger(0);
+    private final AtomicInteger digestVerificationFailures = new 
AtomicInteger(0);
+    private final AtomicInteger filesMatched = new AtomicInteger(0);
+    private final String logPrefix;
+    private final AtomicBoolean cancelled = new AtomicBoolean(false);
+    private volatile AsyncConcurrentTaskExecutor<String> 
asyncConcurrentTaskExecutor;
+
+    @VisibleForTesting
+    protected LiveMigrationFilesVerificationTask(Builder builder)
+    {
+        this.vertx = builder.vertx;
+        this.id = builder.id;
+        this.source = builder.source;
+        this.port = builder.port;
+        this.instanceMetadata = builder.instanceMetadata;
+        this.executorPools = builder.executorPools;
+        this.request = builder.request;
+        this.sidecarClient = builder.sidecarClient;
+        this.liveMigrationConfiguration = builder.liveMigrationConfiguration;
+        this.digestVerifierFactory = builder.digestVerifierFactory;
+        this.logPrefix = "fileVerifyTaskId=" + id;
+        this.state = new AtomicReference<>(State.NOT_STARTED);
+    }
+
+    public static Builder builder()
+    {
+        return new Builder();
+    }
+
+    public Future<Void> validate()
+    {
+        return abortIfCancelled(new Object())
+               // Get files info from both source and destination
+               .compose(v -> Future.join(listFilesInLocal(), 
fetchSourceFileList()))
+
+               // Compare the files information before calculating digests
+               .compose(this::abortIfCancelled)
+               .compose(cf -> compareFilesMeta(cf.resultAt(0), cf.resultAt(1)))
+
+               // Create digest comparison tasks
+               .compose(this::abortIfCancelled)
+               .compose(this::getDigestComparisonTasks)
+
+               // Compare digests
+               .compose(this::abortIfCancelled)
+               .compose(this::compareDigests)
+
+               .andThen(ar -> {
+                   if (ar.failed())
+                   {
+                       LOGGER.error("{} Files verification task failed during 
validation", logPrefix, ar.cause());
+                       // Fail the task if it is in IN_PROGRESS state.
+                       state.compareAndSet(State.IN_PROGRESS, State.FAILED);
+                   }
+                   else
+                   {
+                       State currentState = 
state.compareAndExchange(State.IN_PROGRESS, State.COMPLETED);
+                       if (currentState == State.CANCELLED)
+                       {
+                           LOGGER.debug("{} Task was cancelled before 
completion", logPrefix);
+                       }
+                       else if (currentState != State.IN_PROGRESS)
+                       {
+                           LOGGER.warn("{} Unexpected state transition 
failure. State was {}", logPrefix, currentState);
+                       }
+                       else
+                       {
+                           LOGGER.info("{} Files verification task completed 
successfully", logPrefix);
+                       }
+                   }
+               });
+    }
+
+    private Future<List<InstanceFileInfo>> listFilesInLocal()
+    {
+        return executorPools.internal()
+                            .executeBlocking(() -> new 
CassandraInstanceFilesImpl(instanceMetadata,
+                                                                               
   liveMigrationConfiguration)
+                                                   .files());
+    }
+
+    private Future<InstanceFilesListResponse> fetchSourceFileList()
+    {
+        return Future.fromCompletionStage(
+                     sidecarClient.liveMigrationListInstanceFilesAsync(new 
SidecarInstanceImpl(source, port)))
+                     .onFailure(cause -> LOGGER.error("{} Failed to obtain 
list of files from source {}:{}",
+                                                      logPrefix, source, port, 
cause));
+    }
+
+    private @NotNull Future<List<InstanceFileInfo>> 
compareFilesMeta(List<InstanceFileInfo> localFiles,
+                                                                     
InstanceFilesListResponse sourceFiles)
+    {
+        Map<String, InstanceFileInfo> filesAtLocal =
+        localFiles.stream()
+                  .collect(Collectors.toMap(fileInfo -> fileInfo.fileUrl, 
fileInfo -> fileInfo));
+
+        Map<String, InstanceFileInfo> filesAtSource =
+        sourceFiles.getFiles().stream()
+                   .collect(Collectors.toMap(fileInfo -> fileInfo.fileUrl, 
fileInfo -> fileInfo));
+
+        for (Map.Entry<String, InstanceFileInfo> localFileEntry : 
filesAtLocal.entrySet())
+        {
+            String fileUrl = localFileEntry.getKey();
+            InstanceFileInfo localFile = localFileEntry.getValue();
+            if (filesAtSource.containsKey(fileUrl))
+            {
+                InstanceFileInfo sourceFile = filesAtSource.get(fileUrl);
+                // compare files metadata
+                if (localFile.equals(sourceFile) ||
+                    (localFile.fileType == DIRECTORY && sourceFile.fileType == 
DIRECTORY))
+                {
+                    // File info matches if either of them are directories or
+                    // their size, last modified time etc... matches
+                    LOGGER.debug("{} {} file metadata matched with source", 
logPrefix, fileUrl);
+                    metadataMatched.incrementAndGet();
+                }
+                else
+                {
+                    // Files metadata did not match
+                    metadataMismatches.incrementAndGet();
+                    LOGGER.error("{} {} did not match with source. Local file 
info={}, source file info={}",
+                                 logPrefix, fileUrl, localFile, sourceFile);
+                }
+
+                // done with processing current local file, remove it
+                filesAtSource.remove(fileUrl);
+            }
+            else
+            {
+                // File is not available at source, report it
+                LOGGER.error("{} File {} exists at destination but not found 
at source", logPrefix, fileUrl);
+                filesNotFoundAtSource.incrementAndGet();
+            }
+        }
+
+        for (Map.Entry<String, InstanceFileInfo> sourceFileEntry : 
filesAtSource.entrySet())
+        {
+            // Remaining files are not present at destination, report error 
for them
+            LOGGER.error("{} File {} exists at source but not found at 
destination", logPrefix, sourceFileEntry.getKey());
+            filesNotFoundAtDestination.incrementAndGet();
+        }
+
+        if (filesNotFoundAtDestination.get() > 0
+            || filesNotFoundAtSource.get() > 0
+            || metadataMismatches.get() > 0)
+        {
+            FileVerificationFailureException exception =
+            new FileVerificationFailureException("Files list did not match 
between source and destination");
+
+            return Future.failedFuture(exception);
+        }
+
+        return Future.succeededFuture(localFiles);
+    }
+
+    private @NotNull Future<List<Future<String>>> 
getDigestComparisonTasks(List<InstanceFileInfo> localFiles)
+    {
+        // now verify digests of each local file with source file
+        List<Callable<Future<String>>> tasks = new 
ArrayList<>(localFiles.size());
+        localFiles.stream()
+                  .filter(fileInfo -> fileInfo.fileType != DIRECTORY)
+                  .forEach(fileInfo -> tasks.add(() -> 
verifyDigest(fileInfo)));
+        asyncConcurrentTaskExecutor = new AsyncConcurrentTaskExecutor<>(vertx, 
tasks, request.maxConcurrency());
+        List<Future<String>> verifyTaskResults = 
asyncConcurrentTaskExecutor.start();
+        return Future.join(verifyTaskResults)
+                     .transform(ar -> 
Future.succeededFuture(verifyTaskResults));
+    }
+
+    private Future<Void> compareDigests(List<Future<String>> futures)
+    {
+        boolean verificationFailed = false;
+        for (Future<String> future : futures)
+        {
+            if (future.failed())
+            {
+                verificationFailed = true;
+                Throwable cause = future.cause();
+                if (cause instanceof DigestMismatchException)
+                {
+                    DigestMismatchException ex = (DigestMismatchException) 
cause;
+                    LOGGER.error("{} File digests did not match for {}.", 
logPrefix, ex.fileUrl(), cause);
+                    digestMismatches.incrementAndGet();
+                }
+                else
+                {
+                    LOGGER.error("{} Failed to verify digest for file. Check 
previous logs for file details",
+                                 logPrefix, cause);
+                    digestVerificationFailures.incrementAndGet();
+                }
+                continue;
+            }
+
+            // Digest verification succeeded
+            filesMatched.incrementAndGet();
+        }
+
+        if (verificationFailed)
+        {
+            return Future.failedFuture(
+            new FileVerificationFailureException("File digests did not match 
between source and destination"));
+        }
+        else
+        {
+            return Future.succeededFuture();
+        }
+    }
+
+    @VisibleForTesting
+    Future<String> verifyDigest(InstanceFileInfo fileInfo)
+    {
+        return getSourceFileDigest(fileInfo)
+               .compose(digest -> {
+                   String path = localPath(fileInfo.fileUrl, 
instanceMetadata).toAbsolutePath().toString();
+                   return 
digestVerifierFactory.verifier(MultiMap.caseInsensitiveMultiMap().addAll(digest.headers()))
+                                               .verify(path)
+                                               .compose(verified -> 
Future.succeededFuture(path))
+                                               .recover(cause -> 
Future.failedFuture(
+                                               new 
DigestMismatchException(path, fileInfo.fileUrl, cause)));
+               })
+               .onSuccess(filePath -> LOGGER.debug("{} Verified file {}", 
logPrefix, fileInfo.fileUrl))
+               .onFailure(cause -> LOGGER.error("{} Failed to verify file {}", 
logPrefix, fileInfo.fileUrl, cause));
+    }
+
+    private Future<Digest> getSourceFileDigest(InstanceFileInfo fileInfo)
+    {
+        return 
Future.fromCompletionStage(sidecarClient.liveMigrationFileDigestAsync(new 
SidecarInstanceImpl(source, port),
+                                                                               
      fileInfo.fileUrl,
+                                                                               
      request.digestAlgorithm()))
+                     .compose(this::toDigest);
+    }
+
+    private Future<Digest> toDigest(DigestResponse digestResponse)
+    {
+        String digestAlgorithm = digestResponse.digestAlgorithm;
+        if (digestAlgorithm.equalsIgnoreCase(MD5Digest.MD5_ALGORITHM))
+        {
+            return Future.succeededFuture(new 
MD5Digest(digestResponse.digest));
+        }
+        else if 
(digestAlgorithm.equalsIgnoreCase(XXHash32Digest.XXHASH_32_ALGORITHM))
+        {
+            return Future.succeededFuture(new 
XXHash32Digest(digestResponse.digest));
+        }

Review Comment:
   Felt both Digest and DigestResponse are not very relative to 
DigestAlgorithm. So, did not placed it DigestAlgorithmFactory. Moved it to 
DigestResponse.



##########
server/src/main/java/org/apache/cassandra/sidecar/livemigration/LiveMigrationTaskManager.java:
##########
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.livemigration;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+
+import com.google.common.annotations.VisibleForTesting;
+
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+import org.apache.cassandra.sidecar.cluster.InstancesMetadata;
+import org.apache.cassandra.sidecar.cluster.instance.InstanceMetadata;
+import org.apache.cassandra.sidecar.exceptions.LiveMigrationExceptions;
+import org.jetbrains.annotations.NotNull;
+
+/**
+ * Centralized singleton manager for all live migration tasks across Cassandra 
instances.
+ * Enforces the constraint that only one {@link LiveMigrationTask} (of any 
type) can be active per instance at a time,
+ * preventing concurrent data copy and files verification operations that 
could lead to resource conflicts or data integrity issues.
+ * This singleton is shared across {@link DataCopyTaskManager} and {@link 
FilesVerificationTaskManager} to ensure
+ * proper coordination and mutual exclusion of tasks per instance.
+ */
+@Singleton
+public class LiveMigrationTaskManager
+{
+    @VisibleForTesting
+    final ConcurrentHashMap<Integer, LiveMigrationTask<?>> currentTasks = new 
ConcurrentHashMap<>();
+
+    private final InstancesMetadata instancesMetadata;
+
+    @Inject
+    public LiveMigrationTaskManager(InstancesMetadata instancesMetadata)
+    {
+        this.instancesMetadata = instancesMetadata;
+    }
+
+    /**
+     * Attempts to submit a new task for the specified instance.
+     * Only one task (of any type) can be active per instance at a time.
+     *
+     * @param instanceId the instance ID
+     * @param newTask    the task to submit
+     * @return true if the task was accepted, false if another task is already 
in progress
+     */
+    public boolean submitTask(int instanceId, LiveMigrationTask<?> newTask)
+    {
+        return currentTasks.compute(instanceId, (integer, taskInMap) -> {
+            if (taskInMap == null)
+            {
+                return newTask;
+            }
+
+            if (!taskInMap.isCompleted())
+            {
+                // Reject new task if existing task is still in progress
+                return taskInMap;
+            }
+            else
+            {
+                // Accept new task if existing task has completed
+                return newTask;
+            }
+        }) == newTask;
+    }
+
+    /**
+     * Returns all live migration tasks for given currentHost.
+     * This includes both active and completed tasks that haven't been 
replaced.
+     *
+     * @param currentHost the host where sidecar is running
+     * @return list containing at most one task (empty if no task has ever 
been submitted for this host)
+     */
+    @SuppressWarnings("ConstantValue")
+    public List<LiveMigrationTask<?>> getAllTasks(@NotNull String currentHost)
+    {
+        InstanceMetadata localInstance = 
instancesMetadata.instanceFromHost(currentHost);
+        if (localInstance == null)
+        {
+            throw new IllegalStateException("No instance found for host: " + 
currentHost);
+        }
+        if (!currentTasks.containsKey(localInstance.id()))
+        {
+            return Collections.emptyList();
+        }
+        return Collections.singletonList(currentTasks.get(localInstance.id()));

Review Comment:
   Fetching task only once and returning the value.



##########
server/src/main/java/org/apache/cassandra/sidecar/livemigration/FilesVerificationTaskManager.java:
##########
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.livemigration;
+
+import java.util.List;
+import java.util.Objects;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.datastax.driver.core.utils.UUIDs;
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+import io.vertx.core.Future;
+import org.apache.cassandra.sidecar.cluster.instance.InstanceMetadata;
+import 
org.apache.cassandra.sidecar.common.request.LiveMigrationFilesVerificationRequest;
+import 
org.apache.cassandra.sidecar.common.response.LiveMigrationFilesVerificationResponse;
+import org.apache.cassandra.sidecar.config.SidecarConfiguration;
+import 
org.apache.cassandra.sidecar.exceptions.LiveMigrationExceptions.LiveMigrationInvalidRequestException;
+import 
org.apache.cassandra.sidecar.exceptions.LiveMigrationExceptions.LiveMigrationTaskInProgressException;
+import 
org.apache.cassandra.sidecar.exceptions.LiveMigrationExceptions.LiveMigrationTaskNotFoundException;
+import org.jetbrains.annotations.NotNull;
+
+import static 
org.apache.cassandra.sidecar.livemigration.LiveMigrationFilesVerificationTask.FILES_VERIFICATION_TASK_TYPE;
+
+/**
+ * Manages the lifecycle of file digest verification tasks during live 
migration operations.
+ * This manager ensures that only one {@link LiveMigrationTask} can be active 
per instance at a time,
+ * preventing concurrent migration operations that could impact system 
resources.
+ * Tasks are created using the {@link 
LiveMigrationFilesVerificationTaskFactory} and
+ * executed asynchronously to validate file integrity between source and 
destination nodes.
+ */
+@Singleton
+public class FilesVerificationTaskManager
+{
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(FilesVerificationTaskManager.class);
+
+    private final LiveMigrationTaskManager taskManager;
+    private final LiveMigrationFilesVerificationTaskFactory taskFactory;
+    private final SidecarConfiguration sidecarConfiguration;
+
+    @Inject
+    public FilesVerificationTaskManager(LiveMigrationTaskManager taskManager,
+                                        SidecarConfiguration 
sidecarConfiguration,
+                                        
LiveMigrationFilesVerificationTaskFactory taskFactory)
+    {
+        this.taskManager = taskManager;
+        this.sidecarConfiguration = sidecarConfiguration;
+        this.taskFactory = taskFactory;
+    }
+
+    /**
+     * Creates and submits a new file digest verification task for the 
specified instance.
+     * Only one task (of any type) can be active per instance at a time. If a 
task is already running,
+     * this method returns a failed future with {@link 
LiveMigrationTaskInProgressException}.
+     *
+     * @param request               the file verification request containing 
digest information
+     * @param source                the source identifier for the verification 
request
+     * @param localInstanceMetadata metadata of the local Cassandra instance
+     * @return a future that succeeds if the task is created and started, or 
fails with
+     * {@link LiveMigrationTaskInProgressException} if another task is already 
in progress
+     */
+    public Future<LiveMigrationTask<LiveMigrationFilesVerificationResponse>> 
createTask(LiveMigrationFilesVerificationRequest request,
+                                                                               
         String source,
+                                                                               
         InstanceMetadata localInstanceMetadata)
+    {
+        int maxPossibleConcurrency = 
Objects.requireNonNull(sidecarConfiguration.liveMigrationConfiguration())
+                                            .maxConcurrentFileRequests();
+        if (request.maxConcurrency() > maxPossibleConcurrency)
+        {
+            return Future.failedFuture(
+            new LiveMigrationInvalidRequestException("max concurrency can not 
be more than " + maxPossibleConcurrency));
+        }
+
+        return createVerifier(request, source, localInstanceMetadata)
+               .compose(newTask -> {
+                   boolean accepted = 
taskManager.submitTask(localInstanceMetadata.id(), newTask);
+
+                   if (accepted)
+                   {
+                       newTask.start();
+                       LOGGER.info("Accepted new files digest verification 
task for instance={} taskId={}",
+                                   localInstanceMetadata.id(), newTask.id());
+                       return Future.succeededFuture(newTask);
+                   }
+                   else
+                   {
+                       return Future.failedFuture(new 
LiveMigrationTaskInProgressException(
+                       "Another files digests verification is in progress for 
instance=" + localInstanceMetadata.id()));

Review Comment:
   done



##########
server/src/main/java/org/apache/cassandra/sidecar/livemigration/FilesVerificationTaskManager.java:
##########
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.livemigration;
+
+import java.util.List;
+import java.util.Objects;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.datastax.driver.core.utils.UUIDs;
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+import io.vertx.core.Future;
+import org.apache.cassandra.sidecar.cluster.instance.InstanceMetadata;
+import 
org.apache.cassandra.sidecar.common.request.LiveMigrationFilesVerificationRequest;
+import 
org.apache.cassandra.sidecar.common.response.LiveMigrationFilesVerificationResponse;
+import org.apache.cassandra.sidecar.config.SidecarConfiguration;
+import 
org.apache.cassandra.sidecar.exceptions.LiveMigrationExceptions.LiveMigrationInvalidRequestException;
+import 
org.apache.cassandra.sidecar.exceptions.LiveMigrationExceptions.LiveMigrationTaskInProgressException;
+import 
org.apache.cassandra.sidecar.exceptions.LiveMigrationExceptions.LiveMigrationTaskNotFoundException;
+import org.jetbrains.annotations.NotNull;
+
+import static 
org.apache.cassandra.sidecar.livemigration.LiveMigrationFilesVerificationTask.FILES_VERIFICATION_TASK_TYPE;
+
+/**
+ * Manages the lifecycle of file digest verification tasks during live 
migration operations.
+ * This manager ensures that only one {@link LiveMigrationTask} can be active 
per instance at a time,
+ * preventing concurrent migration operations that could impact system 
resources.
+ * Tasks are created using the {@link 
LiveMigrationFilesVerificationTaskFactory} and
+ * executed asynchronously to validate file integrity between source and 
destination nodes.
+ */
+@Singleton
+public class FilesVerificationTaskManager

Review Comment:
   Started with inheriting LiveMigrationTaskManager, but had to switch to 
association. Since FilesVerificationTaskManager and DataCopyTaskManager are 
different types, Guice creates a separate instance for each, meaning each would 
get its own currentTasks map (instance to task map in 
LiveMigrationTaskManager). This breaks the invariant that only one task of any 
type can be active per instance at a time (line 44).  Don't want to use a 
static mutable map for inheritance. So, used association.



##########
server/src/main/java/org/apache/cassandra/sidecar/livemigration/FilesVerificationTaskManager.java:
##########
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.livemigration;
+
+import java.util.List;
+import java.util.Objects;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.datastax.driver.core.utils.UUIDs;
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+import io.vertx.core.Future;
+import org.apache.cassandra.sidecar.cluster.instance.InstanceMetadata;
+import 
org.apache.cassandra.sidecar.common.request.LiveMigrationFilesVerificationRequest;
+import 
org.apache.cassandra.sidecar.common.response.LiveMigrationFilesVerificationResponse;
+import org.apache.cassandra.sidecar.config.SidecarConfiguration;
+import 
org.apache.cassandra.sidecar.exceptions.LiveMigrationExceptions.LiveMigrationInvalidRequestException;
+import 
org.apache.cassandra.sidecar.exceptions.LiveMigrationExceptions.LiveMigrationTaskInProgressException;
+import 
org.apache.cassandra.sidecar.exceptions.LiveMigrationExceptions.LiveMigrationTaskNotFoundException;
+import org.jetbrains.annotations.NotNull;
+
+import static 
org.apache.cassandra.sidecar.livemigration.LiveMigrationFilesVerificationTask.FILES_VERIFICATION_TASK_TYPE;
+
+/**
+ * Manages the lifecycle of file digest verification tasks during live 
migration operations.
+ * This manager ensures that only one {@link LiveMigrationTask} can be active 
per instance at a time,
+ * preventing concurrent migration operations that could impact system 
resources.
+ * Tasks are created using the {@link 
LiveMigrationFilesVerificationTaskFactory} and
+ * executed asynchronously to validate file integrity between source and 
destination nodes.
+ */
+@Singleton
+public class FilesVerificationTaskManager
+{
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(FilesVerificationTaskManager.class);
+
+    private final LiveMigrationTaskManager taskManager;
+    private final LiveMigrationFilesVerificationTaskFactory taskFactory;
+    private final SidecarConfiguration sidecarConfiguration;
+
+    @Inject
+    public FilesVerificationTaskManager(LiveMigrationTaskManager taskManager,
+                                        SidecarConfiguration 
sidecarConfiguration,
+                                        
LiveMigrationFilesVerificationTaskFactory taskFactory)
+    {
+        this.taskManager = taskManager;
+        this.sidecarConfiguration = sidecarConfiguration;
+        this.taskFactory = taskFactory;
+    }
+
+    /**
+     * Creates and submits a new file digest verification task for the 
specified instance.
+     * Only one task (of any type) can be active per instance at a time. If a 
task is already running,
+     * this method returns a failed future with {@link 
LiveMigrationTaskInProgressException}.
+     *
+     * @param request               the file verification request containing 
digest information
+     * @param source                the source identifier for the verification 
request
+     * @param localInstanceMetadata metadata of the local Cassandra instance
+     * @return a future that succeeds if the task is created and started, or 
fails with
+     * {@link LiveMigrationTaskInProgressException} if another task is already 
in progress
+     */
+    public Future<LiveMigrationTask<LiveMigrationFilesVerificationResponse>> 
createTask(LiveMigrationFilesVerificationRequest request,
+                                                                               
         String source,
+                                                                               
         InstanceMetadata localInstanceMetadata)
+    {
+        int maxPossibleConcurrency = 
Objects.requireNonNull(sidecarConfiguration.liveMigrationConfiguration())
+                                            .maxConcurrentFileRequests();
+        if (request.maxConcurrency() > maxPossibleConcurrency)
+        {
+            return Future.failedFuture(
+            new LiveMigrationInvalidRequestException("max concurrency can not 
be more than " + maxPossibleConcurrency));
+        }
+
+        return createVerifier(request, source, localInstanceMetadata)
+               .compose(newTask -> {
+                   boolean accepted = 
taskManager.submitTask(localInstanceMetadata.id(), newTask);
+
+                   if (accepted)
+                   {
+                       newTask.start();
+                       LOGGER.info("Accepted new files digest verification 
task for instance={} taskId={}",
+                                   localInstanceMetadata.id(), newTask.id());
+                       return Future.succeededFuture(newTask);
+                   }
+                   else
+                   {
+                       return Future.failedFuture(new 
LiveMigrationTaskInProgressException(
+                       "Another files digests verification is in progress for 
instance=" + localInstanceMetadata.id()));
+                   }
+               });
+    }
+
+    private Future<LiveMigrationTask<LiveMigrationFilesVerificationResponse>> 
createVerifier(LiveMigrationFilesVerificationRequest request,
+                                                                               
              String source,
+                                                                               
              InstanceMetadata localInstanceMetadata)
+    {
+        String timeUuid = UUIDs.timeBased().toString();
+        return Future.succeededFuture(taskFactory.create(timeUuid,
+                                                         source,
+                                                         
sidecarConfiguration.serviceConfiguration().port(),
+                                                         request,
+                                                         
localInstanceMetadata));
+    }

Review Comment:
   make sense, updated it



##########
server/src/main/java/org/apache/cassandra/sidecar/livemigration/LiveMigrationFilesVerificationTask.java:
##########
@@ -0,0 +1,643 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.livemigration;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.Callable;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Collectors;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import io.vertx.core.Future;
+import io.vertx.core.MultiMap;
+import io.vertx.core.Promise;
+import io.vertx.core.Vertx;
+import org.apache.cassandra.sidecar.client.SidecarClient;
+import org.apache.cassandra.sidecar.client.SidecarInstanceImpl;
+import org.apache.cassandra.sidecar.cluster.instance.InstanceMetadata;
+import org.apache.cassandra.sidecar.common.DataObjectBuilder;
+import 
org.apache.cassandra.sidecar.common.request.LiveMigrationFilesVerificationRequest;
+import org.apache.cassandra.sidecar.common.request.data.Digest;
+import org.apache.cassandra.sidecar.common.request.data.MD5Digest;
+import org.apache.cassandra.sidecar.common.request.data.XXHash32Digest;
+import org.apache.cassandra.sidecar.common.response.DigestResponse;
+import org.apache.cassandra.sidecar.common.response.InstanceFileInfo;
+import org.apache.cassandra.sidecar.common.response.InstanceFilesListResponse;
+import 
org.apache.cassandra.sidecar.common.response.LiveMigrationFilesVerificationResponse;
+import org.apache.cassandra.sidecar.concurrent.AsyncConcurrentTaskExecutor;
+import org.apache.cassandra.sidecar.concurrent.ExecutorPools;
+import org.apache.cassandra.sidecar.config.LiveMigrationConfiguration;
+import 
org.apache.cassandra.sidecar.exceptions.LiveMigrationExceptions.DigestMismatchException;
+import 
org.apache.cassandra.sidecar.exceptions.LiveMigrationExceptions.FileVerificationFailureException;
+import org.apache.cassandra.sidecar.utils.DigestVerifierFactory;
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.VisibleForTesting;
+
+import static 
org.apache.cassandra.sidecar.common.response.InstanceFileInfo.FileType.DIRECTORY;
+import static 
org.apache.cassandra.sidecar.livemigration.LiveMigrationInstanceMetadataUtil.localPath;
+
+/**
+ * Verifies file integrity between source and destination instances during 
live migration
+ * by comparing file lists and validating file digests using configurable 
digest algorithms.
+ *
+ * <p>The verification process consists of three stages:
+ * <ol>
+ *   <li>Fetch file lists from both source and destination instances 
concurrently</li>
+ *   <li>Compare file metadata (size, type, modification time) - fails fast on 
mismatches</li>
+ *   <li>Verify cryptographic digests (MD5 or XXHash32) with configurable 
concurrency</li>
+ * </ol>
+ *
+ * <p>The task supports cancellation, and tracks detailed metrics including
+ * metadata matches/mismatches, digest verification results, and failure 
counts.
+ */
+public class LiveMigrationFilesVerificationTask implements 
LiveMigrationTask<LiveMigrationFilesVerificationResponse>
+{
+    public static final String FILES_VERIFICATION_TASK_TYPE = 
"files-verification-task";
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(LiveMigrationFilesVerificationTask.class);
+    private final Vertx vertx;
+    private final String id;
+    private final String source;
+    private final int port;
+    // Instance metadata of current instance
+    private final InstanceMetadata instanceMetadata;
+    private final ExecutorPools executorPools;
+    private final LiveMigrationFilesVerificationRequest request;
+    private final SidecarClient sidecarClient;
+    private final LiveMigrationConfiguration liveMigrationConfiguration;
+    private final DigestVerifierFactory digestVerifierFactory;
+    private final AtomicReference<State> state;
+    private final Promise<Void> completionPromise = Promise.promise();
+    private final AtomicInteger filesNotFoundAtSource = new AtomicInteger(0);
+    private final AtomicInteger filesNotFoundAtDestination = new 
AtomicInteger(0);
+    private final AtomicInteger metadataMatched = new AtomicInteger(0);
+    private final AtomicInteger metadataMismatches = new AtomicInteger(0);
+    private final AtomicInteger digestMismatches = new AtomicInteger(0);
+    private final AtomicInteger digestVerificationFailures = new 
AtomicInteger(0);
+    private final AtomicInteger filesMatched = new AtomicInteger(0);
+    private final String logPrefix;
+    private final AtomicBoolean cancelled = new AtomicBoolean(false);
+    private volatile AsyncConcurrentTaskExecutor<String> 
asyncConcurrentTaskExecutor;
+
+    @VisibleForTesting
+    protected LiveMigrationFilesVerificationTask(Builder builder)
+    {
+        this.vertx = builder.vertx;
+        this.id = builder.id;
+        this.source = builder.source;
+        this.port = builder.port;
+        this.instanceMetadata = builder.instanceMetadata;
+        this.executorPools = builder.executorPools;
+        this.request = builder.request;
+        this.sidecarClient = builder.sidecarClient;
+        this.liveMigrationConfiguration = builder.liveMigrationConfiguration;
+        this.digestVerifierFactory = builder.digestVerifierFactory;
+        this.logPrefix = "fileVerifyTaskId=" + id;
+        this.state = new AtomicReference<>(State.NOT_STARTED);
+    }
+
+    public static Builder builder()
+    {
+        return new Builder();
+    }
+
+    public Future<Void> validate()
+    {
+        return abortIfCancelled(new Object())
+               // Get files info from both source and destination
+               .compose(v -> Future.join(listFilesInLocal(), 
fetchSourceFileList()))
+
+               // Compare the files information before calculating digests
+               .compose(this::abortIfCancelled)
+               .compose(cf -> compareFilesMeta(cf.resultAt(0), cf.resultAt(1)))
+
+               // Create digest comparison tasks
+               .compose(this::abortIfCancelled)
+               .compose(this::getDigestComparisonTasks)
+
+               // Compare digests
+               .compose(this::abortIfCancelled)
+               .compose(this::compareDigests)
+
+               .andThen(ar -> {
+                   if (ar.failed())
+                   {
+                       LOGGER.error("{} Files verification task failed during 
validation", logPrefix, ar.cause());
+                       // Fail the task if it is in IN_PROGRESS state.
+                       state.compareAndSet(State.IN_PROGRESS, State.FAILED);
+                   }
+                   else
+                   {
+                       State currentState = 
state.compareAndExchange(State.IN_PROGRESS, State.COMPLETED);
+                       if (currentState == State.CANCELLED)
+                       {
+                           LOGGER.debug("{} Task was cancelled before 
completion", logPrefix);
+                       }
+                       else if (currentState != State.IN_PROGRESS)
+                       {
+                           LOGGER.warn("{} Unexpected state transition 
failure. State was {}", logPrefix, currentState);
+                       }
+                       else
+                       {
+                           LOGGER.info("{} Files verification task completed 
successfully", logPrefix);
+                       }
+                   }
+               });
+    }
+
+    private Future<List<InstanceFileInfo>> listFilesInLocal()
+    {
+        return executorPools.internal()
+                            .executeBlocking(() -> new 
CassandraInstanceFilesImpl(instanceMetadata,
+                                                                               
   liveMigrationConfiguration)
+                                                   .files());
+    }
+
+    private Future<InstanceFilesListResponse> fetchSourceFileList()
+    {
+        return Future.fromCompletionStage(
+                     sidecarClient.liveMigrationListInstanceFilesAsync(new 
SidecarInstanceImpl(source, port)))
+                     .onFailure(cause -> LOGGER.error("{} Failed to obtain 
list of files from source {}:{}",
+                                                      logPrefix, source, port, 
cause));
+    }
+
+    private @NotNull Future<List<InstanceFileInfo>> 
compareFilesMeta(List<InstanceFileInfo> localFiles,
+                                                                     
InstanceFilesListResponse sourceFiles)
+    {
+        Map<String, InstanceFileInfo> filesAtLocal =
+        localFiles.stream()
+                  .collect(Collectors.toMap(fileInfo -> fileInfo.fileUrl, 
fileInfo -> fileInfo));
+
+        Map<String, InstanceFileInfo> filesAtSource =
+        sourceFiles.getFiles().stream()
+                   .collect(Collectors.toMap(fileInfo -> fileInfo.fileUrl, 
fileInfo -> fileInfo));
+
+        for (Map.Entry<String, InstanceFileInfo> localFileEntry : 
filesAtLocal.entrySet())
+        {
+            String fileUrl = localFileEntry.getKey();
+            InstanceFileInfo localFile = localFileEntry.getValue();
+            if (filesAtSource.containsKey(fileUrl))
+            {
+                InstanceFileInfo sourceFile = filesAtSource.get(fileUrl);
+                // compare files metadata
+                if (localFile.equals(sourceFile) ||
+                    (localFile.fileType == DIRECTORY && sourceFile.fileType == 
DIRECTORY))
+                {
+                    // File info matches if either of them are directories or
+                    // their size, last modified time etc... matches
+                    LOGGER.debug("{} {} file metadata matched with source", 
logPrefix, fileUrl);
+                    metadataMatched.incrementAndGet();
+                }
+                else
+                {
+                    // Files metadata did not match
+                    metadataMismatches.incrementAndGet();
+                    LOGGER.error("{} {} did not match with source. Local file 
info={}, source file info={}",
+                                 logPrefix, fileUrl, localFile, sourceFile);
+                }
+
+                // done with processing current local file, remove it
+                filesAtSource.remove(fileUrl);
+            }
+            else
+            {
+                // File is not available at source, report it
+                LOGGER.error("{} File {} exists at destination but not found 
at source", logPrefix, fileUrl);
+                filesNotFoundAtSource.incrementAndGet();
+            }
+        }
+
+        for (Map.Entry<String, InstanceFileInfo> sourceFileEntry : 
filesAtSource.entrySet())
+        {
+            // Remaining files are not present at destination, report error 
for them
+            LOGGER.error("{} File {} exists at source but not found at 
destination", logPrefix, sourceFileEntry.getKey());
+            filesNotFoundAtDestination.incrementAndGet();
+        }
+
+        if (filesNotFoundAtDestination.get() > 0
+            || filesNotFoundAtSource.get() > 0
+            || metadataMismatches.get() > 0)
+        {
+            FileVerificationFailureException exception =
+            new FileVerificationFailureException("Files list did not match 
between source and destination");
+
+            return Future.failedFuture(exception);
+        }
+
+        return Future.succeededFuture(localFiles);
+    }
+
+    private @NotNull Future<List<Future<String>>> 
getDigestComparisonTasks(List<InstanceFileInfo> localFiles)
+    {
+        // now verify digests of each local file with source file
+        List<Callable<Future<String>>> tasks = new 
ArrayList<>(localFiles.size());
+        localFiles.stream()
+                  .filter(fileInfo -> fileInfo.fileType != DIRECTORY)
+                  .forEach(fileInfo -> tasks.add(() -> 
verifyDigest(fileInfo)));
+        asyncConcurrentTaskExecutor = new AsyncConcurrentTaskExecutor<>(vertx, 
tasks, request.maxConcurrency());
+        List<Future<String>> verifyTaskResults = 
asyncConcurrentTaskExecutor.start();
+        return Future.join(verifyTaskResults)
+                     .transform(ar -> 
Future.succeededFuture(verifyTaskResults));
+    }
+
+    private Future<Void> compareDigests(List<Future<String>> futures)
+    {
+        boolean verificationFailed = false;
+        for (Future<String> future : futures)
+        {
+            if (future.failed())
+            {
+                verificationFailed = true;
+                Throwable cause = future.cause();
+                if (cause instanceof DigestMismatchException)
+                {
+                    DigestMismatchException ex = (DigestMismatchException) 
cause;
+                    LOGGER.error("{} File digests did not match for {}.", 
logPrefix, ex.fileUrl(), cause);
+                    digestMismatches.incrementAndGet();
+                }
+                else
+                {
+                    LOGGER.error("{} Failed to verify digest for file. Check 
previous logs for file details",
+                                 logPrefix, cause);
+                    digestVerificationFailures.incrementAndGet();
+                }
+                continue;
+            }
+
+            // Digest verification succeeded
+            filesMatched.incrementAndGet();
+        }
+
+        if (verificationFailed)
+        {
+            return Future.failedFuture(
+            new FileVerificationFailureException("File digests did not match 
between source and destination"));
+        }
+        else
+        {
+            return Future.succeededFuture();
+        }
+    }
+
+    @VisibleForTesting
+    Future<String> verifyDigest(InstanceFileInfo fileInfo)
+    {
+        return getSourceFileDigest(fileInfo)
+               .compose(digest -> {
+                   String path = localPath(fileInfo.fileUrl, 
instanceMetadata).toAbsolutePath().toString();
+                   return 
digestVerifierFactory.verifier(MultiMap.caseInsensitiveMultiMap().addAll(digest.headers()))
+                                               .verify(path)
+                                               .compose(verified -> 
Future.succeededFuture(path))
+                                               .recover(cause -> 
Future.failedFuture(
+                                               new 
DigestMismatchException(path, fileInfo.fileUrl, cause)));
+               })
+               .onSuccess(filePath -> LOGGER.debug("{} Verified file {}", 
logPrefix, fileInfo.fileUrl))
+               .onFailure(cause -> LOGGER.error("{} Failed to verify file {}", 
logPrefix, fileInfo.fileUrl, cause));
+    }
+
+    private Future<Digest> getSourceFileDigest(InstanceFileInfo fileInfo)
+    {
+        return 
Future.fromCompletionStage(sidecarClient.liveMigrationFileDigestAsync(new 
SidecarInstanceImpl(source, port),
+                                                                               
      fileInfo.fileUrl,
+                                                                               
      request.digestAlgorithm()))
+                     .compose(this::toDigest);
+    }

Review Comment:
   After giving some more thought, I feel the 429 status code 
(TOO_MANY_REQUESTS) is not appropriate and the 503 (SERVICE UNAVAILABLE) is 
more appropriate as maxConcurrentFileRequests (used by 
`LiveMigrationConcurrencyLimitHandler`) is a server-side concurrency cap shared 
across all clients, and an individual client hitting this limit hasn't done 
anything wrong - the server is simply busy with other requests. 503 describes 
this situation. 
   
   SidecarclientProvider is using `ExponentialBackoffRetryPolicy` as the 
default retry policy, which does retries. If we suspect that the default retry 
policy can be overridden, then I can explicitly initiate an instance of 
ExponentialBackoffRetryPolicy and use it.
   
   What do you think?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to