frankgh commented on code in PR #252:
URL: https://github.com/apache/cassandra-sidecar/pull/252#discussion_r2292176436


##########
client-common/src/test/java/org/apache/cassandra/sidecar/common/request/LiveMigrationDataCopyRequestTest.java:
##########


Review Comment:
   Indentation in this file is a bit off. I think there's an [effort to 
standardize](https://github.com/apache/cassandra-sidecar/pull/235) on a style 
and make it automatic, but I think for now we should try to stay close to the 
conventions we have in the project



##########
server/src/main/java/org/apache/cassandra/sidecar/handlers/livemigration/LiveMigrationDirType.java:
##########
@@ -28,7 +28,7 @@ public enum LiveMigrationDirType
 {
     CDC_RAW_DIR("cdc_raw"),
     COMMIT_LOG_DIR("commitlog"),
-    DATA_FIlE_DIR("data"),
+    DATA_FILE_DIR("data"),

Review Comment:
   wow, it took me 10 seconds to realize the diff



##########
server/src/main/java/org/apache/cassandra/sidecar/livemigration/OperationStatus.java:
##########
@@ -0,0 +1,430 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.livemigration;
+
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+
+import com.google.common.annotations.VisibleForTesting;
+
+import org.jetbrains.annotations.NotNull;
+
+/**
+ * Represents the state and progress of a live migration data copy task using 
a state machine pattern.
+ *
+ * <h2>State Machine</h2>
+ * The operation can follow these state transitions:
+ * <pre>
+ * STARTING -&gt; CLEANING -&gt; PREPARING -&gt; DOWNLOADING -&gt; 
DOWNLOAD_COMPLETE
+ *    |          |           |            |
+ *    |          |           |            |
+ *    |          |           |            |
+ *    |          |           |            +-&gt; CANCELLED / FAILED
+ *    |          |           +-&gt; SUCCESS
+ *    |          |
+ *    |          +-&gt; CANCELLED / FAILED
+ *    +-&gt; CANCELLED / FAILED
+ *
+ * Note: CANCELLED and FAILED states can be reached from any non-terminal 
state.
+ *       DOWNLOAD_COMPLETE is a terminal state (no further transitions).
+ *       SUCCESS can only be reached from PREPARING state.
+ *       Special case: CANCELLED -&gt; FAILED transition is tolerated (returns 
CANCELLED).
+ *       This handles scenarios where failure occurs after cancellation.
+ * </pre>
+ *
+ * <h2>Thread Safety</h2>
+ * This class is designed for concurrent access:
+ * <ul>
+ *   <li>The {@code State} and size fields are immutable once set</li>
+ *   <li>Progress counters ({@code filesDownloaded}, {@code downloadFailures}, 
{@code bytesDownloaded})
+ *       are thread-safe using {@code AtomicInteger} and {@code 
AtomicLong}</li>
+ *   <li>State transitions create new immutable instances rather than 
modifying existing ones</li>
+ *   <li>Multiple threads can safely read progress and update counters 
simultaneously</li>
+ * </ul>
+ *
+ * <h2>Usage</h2>
+ * State transitions are managed through factory methods that return new 
instances.
+ * Progress tracking is handled via atomic fields that can be safely updated 
from multiple threads.
+ */
+public class OperationStatus
+{
+    // State of the operation
+    private final State state;
+
+    // Total size of data available at source (immutable once set)
+    private final long totalSize;
+
+    // Total number of files available at source (immutable once set)
+    private final int totalFiles;
+
+    // Size of data to be copied (immutable once set)
+    private final long bytesToDownload;
+
+    // Number of files to download from source (immutable once set)
+    private final int filesToDownload;
+
+    // Number of files downloaded from source (thread-safe atomic counter)
+    private final AtomicInteger filesDownloaded;
+
+    // Number of download failures (thread-safe atomic counter)
+    private final AtomicInteger downloadFailures;
+
+    // Size of data downloaded from source in bytes (thread-safe atomic 
counter)
+    private final AtomicLong bytesDownloaded;
+
+    private OperationStatus(@NotNull State state,
+                            long totalSize,
+                            int totalFiles,
+                            long bytesToDownload,
+                            int filesToDownload,
+                            @NotNull AtomicInteger filesDownloaded,
+                            @NotNull AtomicInteger downloadFailures,
+                            @NotNull AtomicLong bytesDownloaded)
+    {
+        this.state = state;
+        this.totalSize = totalSize;
+        this.totalFiles = totalFiles;
+        this.bytesToDownload = bytesToDownload;
+        this.filesToDownload = filesToDownload;
+        this.filesDownloaded = filesDownloaded;
+        this.downloadFailures = downloadFailures;
+        this.bytesDownloaded = bytesDownloaded;
+    }
+
+    public static OperationStatus getStartingState()
+    {
+        return new OperationStatus(State.STARTING,
+                                   -1,
+                                   -1,
+                                   -1,
+                                   -1,
+                                   new AtomicInteger(0),
+                                   new AtomicInteger(0),
+                                   new AtomicLong(0));
+    }
+
+    /**
+     * Transitions to the CLEANING state with updated file metadata.
+     *
+     * @param totalSize  total size of files at the source
+     * @param totalFiles total number of files at the source
+     * @return new OperationStatus instance in CLEANING state
+     * @throws IllegalStateTransitionException if current state cannot 
transition to CLEANING
+     */
+    @VisibleForTesting
+    public OperationStatus getCleaningState(long totalSize, int totalFiles)
+    {
+        return new OperationStatus(this.state.toCleaning(),
+                                   totalSize,
+                                   totalFiles,
+                                   this.bytesToDownload,
+                                   this.filesToDownload,
+                                   this.filesDownloaded,
+                                   this.downloadFailures,
+                                   this.bytesDownloaded);
+    }
+
+    /**
+     * Transitions to the PREPARING state.
+     *
+     * @return new OperationStatus instance in PREPARING state
+     * @throws IllegalStateTransitionException if current state cannot 
transition to PREPARING
+     */
+    @VisibleForTesting
+    public OperationStatus getPreparingState()
+    {
+        return new OperationStatus(this.state.toPreparing(),
+                                   this.totalSize,
+                                   this.totalFiles,
+                                   this.bytesToDownload,
+                                   this.filesToDownload,
+                                   this.filesDownloaded,
+                                   this.downloadFailures,
+                                   this.bytesDownloaded);
+    }
+
+    /**
+     * Transitions to the DOWNLOADING state with updated download metadata.
+     * Resets progress counters for the new download phase.
+     *
+     * @param bytesToDownload total size of data to be downloaded in bytes
+     * @param filesToDownload number of files to be downloaded
+     * @return new OperationStatus instance in DOWNLOADING state
+     * @throws IllegalStateTransitionException if current state cannot 
transition to DOWNLOADING
+     */
+    OperationStatus getDownloadingState(final long bytesToDownload, final int 
filesToDownload)
+    {
+        return new OperationStatus(this.state.toDownloading(),
+                                   this.totalSize,
+                                   this.totalFiles,
+                                   bytesToDownload,
+                                   filesToDownload,
+                                   new AtomicInteger(),
+                                   new AtomicInteger(),
+                                   new AtomicLong());
+    }
+
+    /**
+     * Transitions to the DOWNLOAD_COMPLETE state.
+     *
+     * @return new OperationStatus instance in DOWNLOAD_COMPLETE state
+     * @throws IllegalStateTransitionException if current state cannot 
transition to DOWNLOAD_COMPLETE
+     */
+    OperationStatus getDownloadCompleteState()
+    {
+        return new OperationStatus(this.state.toDownloadComplete(),
+                                   this.totalSize,
+                                   this.totalFiles,
+                                   this.bytesToDownload,
+                                   this.filesToDownload,
+                                   this.filesDownloaded,
+                                   this.downloadFailures,
+                                   this.bytesDownloaded);
+    }
+
+    /**
+     * Transitions to the SUCCESS state, indicating successful completion.
+     *
+     * @return new OperationStatus instance in SUCCESS state
+     * @throws IllegalStateTransitionException if current state cannot 
transition to SUCCESS
+     */
+    @VisibleForTesting
+    public OperationStatus getSuccessState()
+    {
+        return new OperationStatus(this.state.toSuccess(),
+                                   this.totalSize,
+                                   this.totalFiles,
+                                   this.bytesToDownload,
+                                   this.filesToDownload,
+                                   this.filesDownloaded,
+                                   this.downloadFailures,
+                                   this.bytesDownloaded);
+    }
+
+    /**
+     * Transitions to the FAILED state, indicating operation failure.
+     *
+     * @return new OperationStatus instance in FAILED state
+     * @throws IllegalStateTransitionException if current state cannot 
transition to FAILED
+     */
+    OperationStatus tryFailureState()
+    {
+        return new OperationStatus(this.state.toFailed(),
+                                   this.totalSize,
+                                   this.totalFiles,
+                                   this.bytesToDownload,
+                                   this.filesToDownload,
+                                   this.filesDownloaded,
+                                   this.downloadFailures,
+                                   this.bytesDownloaded);
+    }
+
+    /**
+     * Cancels the task if not completed.
+     *
+     * @return Returns same state if completed, otherwise returns cancelled 
state.
+     */
+    public OperationStatus cancel()
+    {
+        return new OperationStatus(this.state.toCancelled(),
+                                   this.totalSize,
+                                   this.totalFiles,
+                                   this.bytesToDownload,
+                                   this.filesToDownload,
+                                   this.filesDownloaded,
+                                   this.downloadFailures,
+                                   this.bytesDownloaded);
+    }
+
+    public State getState()
+    {
+        return state;
+    }
+
+    public State state()
+    {
+        return state;
+    }
+
+    public long totalSize()
+    {
+        return totalSize;
+    }
+
+    public long bytesToDownload()
+    {
+        return bytesToDownload;
+    }
+
+    public int filesToDownload()
+    {
+        return filesToDownload;
+    }
+
+    /**
+     * Returns the thread-safe atomic counter for files downloaded.
+     * This counter can be safely read and updated from multiple threads.
+     *
+     * @return atomic counter for files downloaded
+     */
+    public AtomicInteger filesDownloaded()
+    {
+        return filesDownloaded;
+    }
+
+    /**
+     * Returns the thread-safe atomic counter for bytes downloaded.
+     * This counter can be safely read and updated from multiple threads.
+     *
+     * @return atomic counter for bytes downloaded
+     */
+    public AtomicLong bytesDownloaded()
+    {
+        return bytesDownloaded;
+    }
+
+    public int totalFiles()
+    {
+        return totalFiles;
+    }
+
+    /**
+     * Returns the thread-safe atomic counter for download failures.
+     * This counter can be safely read and updated from multiple threads.
+     *
+     * @return atomic counter for download failures
+     */
+    public AtomicInteger downloadFailures()
+    {
+        return downloadFailures;
+    }
+
+    @Override
+    public String toString()
+    {
+        return "OperationStatus{" +
+               "bytesDownloaded=" + bytesDownloaded +
+               ", filesDownloaded=" + filesDownloaded +
+               ", filesToDownload=" + filesToDownload +
+               ", totalSize=" + totalSize +
+               ", downloadSize=" + bytesToDownload +
+               ", state=" + state +
+               '}';
+    }
+
+    /**
+     * Represents the various states of a live migration data copy operation.
+     *
+     * <h3>State Descriptions:</h3>
+     * <ul>
+     *   <li><b>STARTING</b> - Initial state when the operation begins</li>
+     *   <li><b>CLEANING</b> - Removing unnecessary files from the 
destination</li>
+     *   <li><b>PREPARING</b> - Analyzing files to determine what needs to be 
downloaded</li>
+     *   <li><b>DOWNLOADING</b> - Actively downloading files from the 
source</li>
+     *   <li><b>DOWNLOAD_COMPLETE</b> - All files have been downloaded 
(terminal state)</li>
+     *   <li><b>SUCCESS</b> - Operation completed successfully (terminal 
state)</li>
+     *   <li><b>FAILED</b> - Operation failed due to an error (terminal 
state)</li>
+     *   <li><b>CANCELLED</b> - Operation was cancelled by user request 
(terminal state)</li>
+     * </ul>
+     * <p>
+     * Invalid transitions throw {@link IllegalStateTransitionException}.
+     */
+    public enum State

Review Comment:
   I think it would be valuable to have unit tests for this specific enum that 
focus on testing the transitions



##########
server/src/main/java/org/apache/cassandra/sidecar/livemigration/OperationStatus.java:
##########
@@ -0,0 +1,430 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.livemigration;
+
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+
+import com.google.common.annotations.VisibleForTesting;
+
+import org.jetbrains.annotations.NotNull;
+
+/**
+ * Represents the state and progress of a live migration data copy task using 
a state machine pattern.
+ *
+ * <h2>State Machine</h2>
+ * The operation can follow these state transitions:
+ * <pre>
+ * STARTING -&gt; CLEANING -&gt; PREPARING -&gt; DOWNLOADING -&gt; 
DOWNLOAD_COMPLETE
+ *    |          |           |            |
+ *    |          |           |            |
+ *    |          |           |            |
+ *    |          |           |            +-&gt; CANCELLED / FAILED
+ *    |          |           +-&gt; SUCCESS
+ *    |          |
+ *    |          +-&gt; CANCELLED / FAILED
+ *    +-&gt; CANCELLED / FAILED
+ *
+ * Note: CANCELLED and FAILED states can be reached from any non-terminal 
state.
+ *       DOWNLOAD_COMPLETE is a terminal state (no further transitions).
+ *       SUCCESS can only be reached from PREPARING state.
+ *       Special case: CANCELLED -&gt; FAILED transition is tolerated (returns 
CANCELLED).
+ *       This handles scenarios where failure occurs after cancellation.
+ * </pre>
+ *
+ * <h2>Thread Safety</h2>
+ * This class is designed for concurrent access:
+ * <ul>
+ *   <li>The {@code State} and size fields are immutable once set</li>
+ *   <li>Progress counters ({@code filesDownloaded}, {@code downloadFailures}, 
{@code bytesDownloaded})
+ *       are thread-safe using {@code AtomicInteger} and {@code 
AtomicLong}</li>
+ *   <li>State transitions create new immutable instances rather than 
modifying existing ones</li>
+ *   <li>Multiple threads can safely read progress and update counters 
simultaneously</li>
+ * </ul>
+ *
+ * <h2>Usage</h2>
+ * State transitions are managed through factory methods that return new 
instances.
+ * Progress tracking is handled via atomic fields that can be safely updated 
from multiple threads.
+ */
+public class OperationStatus
+{
+    // State of the operation
+    private final State state;
+
+    // Total size of data available at source (immutable once set)
+    private final long totalSize;
+
+    // Total number of files available at source (immutable once set)
+    private final int totalFiles;
+
+    // Size of data to be copied (immutable once set)
+    private final long bytesToDownload;
+
+    // Number of files to download from source (immutable once set)
+    private final int filesToDownload;
+
+    // Number of files downloaded from source (thread-safe atomic counter)
+    private final AtomicInteger filesDownloaded;
+
+    // Number of download failures (thread-safe atomic counter)
+    private final AtomicInteger downloadFailures;
+
+    // Size of data downloaded from source in bytes (thread-safe atomic 
counter)
+    private final AtomicLong bytesDownloaded;
+
+    private OperationStatus(@NotNull State state,
+                            long totalSize,
+                            int totalFiles,
+                            long bytesToDownload,
+                            int filesToDownload,
+                            @NotNull AtomicInteger filesDownloaded,
+                            @NotNull AtomicInteger downloadFailures,
+                            @NotNull AtomicLong bytesDownloaded)
+    {
+        this.state = state;
+        this.totalSize = totalSize;
+        this.totalFiles = totalFiles;
+        this.bytesToDownload = bytesToDownload;
+        this.filesToDownload = filesToDownload;
+        this.filesDownloaded = filesDownloaded;
+        this.downloadFailures = downloadFailures;
+        this.bytesDownloaded = bytesDownloaded;
+    }
+
+    public static OperationStatus getStartingState()
+    {
+        return new OperationStatus(State.STARTING,
+                                   -1,
+                                   -1,
+                                   -1,
+                                   -1,
+                                   new AtomicInteger(0),
+                                   new AtomicInteger(0),
+                                   new AtomicLong(0));
+    }
+
+    /**
+     * Transitions to the CLEANING state with updated file metadata.
+     *
+     * @param totalSize  total size of files at the source
+     * @param totalFiles total number of files at the source
+     * @return new OperationStatus instance in CLEANING state
+     * @throws IllegalStateTransitionException if current state cannot 
transition to CLEANING
+     */
+    @VisibleForTesting
+    public OperationStatus getCleaningState(long totalSize, int totalFiles)

Review Comment:
   should this be renamed to indicate that a transition is occurring? for 
example `toCleaningState` ?



##########
server/src/main/java/org/apache/cassandra/sidecar/livemigration/LiveMigrationTaskFactoryImpl.java:
##########
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.livemigration;
+
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+import io.vertx.core.Vertx;
+import org.apache.cassandra.sidecar.cluster.instance.InstanceMetadata;
+import 
org.apache.cassandra.sidecar.common.request.LiveMigrationDataCopyRequest;
+import org.apache.cassandra.sidecar.concurrent.ExecutorPools;
+import org.apache.cassandra.sidecar.config.LiveMigrationConfiguration;
+import org.apache.cassandra.sidecar.config.SidecarConfiguration;
+import org.apache.cassandra.sidecar.utils.SidecarClientProvider;
+
+/**
+ * Factory implementation which produces {@link LiveMigrationTask} instances.
+ */
+@Singleton
+public class LiveMigrationTaskFactoryImpl implements LiveMigrationTaskFactory
+{
+
+    private final Vertx vertx;
+    private final SidecarClientProvider sidecarClientProvider;
+    private final LiveMigrationConfiguration liveMigrationConfiguration;
+    private final ExecutorPools executorPools;
+
+    @Inject
+    public LiveMigrationTaskFactoryImpl(Vertx vertx,
+                                        ExecutorPools executorPools,
+                                        SidecarClientProvider 
sidecarClientProvider,
+                                        SidecarConfiguration 
sidecarConfiguration)
+    {
+        this.vertx = vertx;
+        this.executorPools = executorPools;
+        this.sidecarClientProvider = sidecarClientProvider;
+        this.liveMigrationConfiguration = 
sidecarConfiguration.liveMigrationConfiguration();
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public LiveMigrationTask create(final LiveMigrationDataCopyRequest request,
+                                    final String source,
+                                    final int port,
+                                    final InstanceMetadata instanceMetadata)

Review Comment:
   NIT
   ```suggestion
       public LiveMigrationTask create(LiveMigrationDataCopyRequest request,
                                       String source,
                                       int port,
                                       InstanceMetadata instanceMetadata)
   ```



##########
server/src/main/java/org/apache/cassandra/sidecar/livemigration/OperationStatus.java:
##########
@@ -0,0 +1,430 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.livemigration;
+
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+
+import com.google.common.annotations.VisibleForTesting;
+
+import org.jetbrains.annotations.NotNull;
+
+/**
+ * Represents the state and progress of a live migration data copy task using 
a state machine pattern.
+ *
+ * <h2>State Machine</h2>
+ * The operation can follow these state transitions:
+ * <pre>
+ * STARTING -&gt; CLEANING -&gt; PREPARING -&gt; DOWNLOADING -&gt; 
DOWNLOAD_COMPLETE
+ *    |          |           |            |
+ *    |          |           |            |
+ *    |          |           |            |
+ *    |          |           |            +-&gt; CANCELLED / FAILED
+ *    |          |           +-&gt; SUCCESS
+ *    |          |
+ *    |          +-&gt; CANCELLED / FAILED
+ *    +-&gt; CANCELLED / FAILED
+ *
+ * Note: CANCELLED and FAILED states can be reached from any non-terminal 
state.
+ *       DOWNLOAD_COMPLETE is a terminal state (no further transitions).
+ *       SUCCESS can only be reached from PREPARING state.
+ *       Special case: CANCELLED -&gt; FAILED transition is tolerated (returns 
CANCELLED).
+ *       This handles scenarios where failure occurs after cancellation.
+ * </pre>
+ *
+ * <h2>Thread Safety</h2>
+ * This class is designed for concurrent access:
+ * <ul>
+ *   <li>The {@code State} and size fields are immutable once set</li>
+ *   <li>Progress counters ({@code filesDownloaded}, {@code downloadFailures}, 
{@code bytesDownloaded})
+ *       are thread-safe using {@code AtomicInteger} and {@code 
AtomicLong}</li>
+ *   <li>State transitions create new immutable instances rather than 
modifying existing ones</li>
+ *   <li>Multiple threads can safely read progress and update counters 
simultaneously</li>
+ * </ul>
+ *
+ * <h2>Usage</h2>
+ * State transitions are managed through factory methods that return new 
instances.
+ * Progress tracking is handled via atomic fields that can be safely updated 
from multiple threads.
+ */
+public class OperationStatus
+{
+    // State of the operation
+    private final State state;
+
+    // Total size of data available at source (immutable once set)
+    private final long totalSize;
+
+    // Total number of files available at source (immutable once set)
+    private final int totalFiles;
+
+    // Size of data to be copied (immutable once set)
+    private final long bytesToDownload;
+
+    // Number of files to download from source (immutable once set)
+    private final int filesToDownload;
+
+    // Number of files downloaded from source (thread-safe atomic counter)
+    private final AtomicInteger filesDownloaded;
+
+    // Number of download failures (thread-safe atomic counter)
+    private final AtomicInteger downloadFailures;
+
+    // Size of data downloaded from source in bytes (thread-safe atomic 
counter)
+    private final AtomicLong bytesDownloaded;
+
+    private OperationStatus(@NotNull State state,
+                            long totalSize,
+                            int totalFiles,
+                            long bytesToDownload,
+                            int filesToDownload,
+                            @NotNull AtomicInteger filesDownloaded,
+                            @NotNull AtomicInteger downloadFailures,
+                            @NotNull AtomicLong bytesDownloaded)
+    {
+        this.state = state;
+        this.totalSize = totalSize;
+        this.totalFiles = totalFiles;
+        this.bytesToDownload = bytesToDownload;
+        this.filesToDownload = filesToDownload;
+        this.filesDownloaded = filesDownloaded;
+        this.downloadFailures = downloadFailures;
+        this.bytesDownloaded = bytesDownloaded;
+    }
+
+    public static OperationStatus getStartingState()
+    {
+        return new OperationStatus(State.STARTING,
+                                   -1,
+                                   -1,
+                                   -1,
+                                   -1,
+                                   new AtomicInteger(0),
+                                   new AtomicInteger(0),
+                                   new AtomicLong(0));
+    }
+
+    /**
+     * Transitions to the CLEANING state with updated file metadata.
+     *
+     * @param totalSize  total size of files at the source
+     * @param totalFiles total number of files at the source
+     * @return new OperationStatus instance in CLEANING state
+     * @throws IllegalStateTransitionException if current state cannot 
transition to CLEANING
+     */
+    @VisibleForTesting
+    public OperationStatus getCleaningState(long totalSize, int totalFiles)
+    {
+        return new OperationStatus(this.state.toCleaning(),
+                                   totalSize,
+                                   totalFiles,
+                                   this.bytesToDownload,
+                                   this.filesToDownload,
+                                   this.filesDownloaded,
+                                   this.downloadFailures,
+                                   this.bytesDownloaded);
+    }
+
+    /**
+     * Transitions to the PREPARING state.
+     *
+     * @return new OperationStatus instance in PREPARING state
+     * @throws IllegalStateTransitionException if current state cannot 
transition to PREPARING
+     */
+    @VisibleForTesting
+    public OperationStatus getPreparingState()

Review Comment:
   ```suggestion
       public OperationStatus toPreparingState()
   ```



##########
server/src/main/java/org/apache/cassandra/sidecar/concurrent/AsyncConcurrentTaskExecutor.java:
##########
@@ -0,0 +1,285 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.concurrent;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
+
+import com.google.common.annotations.VisibleForTesting;
+
+import io.vertx.core.Future;
+import io.vertx.core.Promise;
+import io.vertx.core.Vertx;
+import org.jetbrains.annotations.NotNull;
+
+/**
+ * A task executor that manages concurrent execution of asynchronous tasks 
with configurable concurrency limit.
+ *
+ * <h2>Overview</h2>
+ * <p>This executor ensures that only a specified number of tasks (defined by 
{@code maxConcurrency})
+ * execute simultaneously. When a task completes, the next pending task is 
automatically
+ * triggered, maintaining optimal resource utilization while respecting 
concurrency constraints.</p>
+ *
+ * <h2>Architecture</h2>
+ * <p>The executor uses a promise-based triggering mechanism:</p>
+ * <ul>
+ *   <li>Each task is associated with a {@link Promise} that acts as a trigger 
signal</li>
+ *   <li>Tasks only start execution when their corresponding Promise completes 
successfully</li>
+ *   <li>Initially, {@code maxConcurrency} number of promises are completed to 
start the execution flow</li>
+ *   <li>As tasks complete, the next pending task is automatically 
triggered</li>
+ * </ul>
+ *
+ * <h2>Concurrency Control</h2>
+ * <p>The executor maintains a global task index using {@link AtomicInteger} 
to ensure thread-safe
+ * task scheduling. Tasks are processed in the order they were submitted, with 
automatic
+ * flow control to maintain the desired concurrency level.</p>
+ *
+ * <h2>Error Handling</h2>
+ * <p>When a task completes (successfully or with a non-{@link 
CancellationException} error),
+ * the executor automatically triggers the next pending task. Tasks that are 
cancelled do not
+ * trigger subsequent tasks to prevent further executions.</p>
+ *
+ * <h2>Usage Example</h2>
+ * <pre>{@code
+ * List<Callable<Future<String>>> tasks = Arrays.asList(
+ *     () -> downloadFile("file1.txt"),
+ *     () -> downloadFile("file2.txt"),
+ *     () -> downloadFile("file3.txt")
+ * );
+ *
+ * AsyncConcurrentTaskExecutor<String> executor =
+ *     new AsyncConcurrentTaskExecutor<>(vertx, tasks, 2); // max 2 tasks can 
execute concurrently
+ *
+ * List<Future<String>> results = executor.start();
+ * }</pre>
+ *
+ * @param <T> the type of result produced by the tasks
+ */
+public class AsyncConcurrentTaskExecutor<T>
+{
+    protected static final String TASK_CANCEL_MESSAGE = "task cancelled";
+    private final AtomicInteger index = new AtomicInteger(0);
+    private final int maxConcurrency;
+    private final Vertx vertx;
+    private final List<Promise<T>> taskPromises;
+    private final List<Callable<Future<T>>> tasks;
+
+    /**
+     * Creates a new AsyncConcurrentTaskExecutor.
+     *
+     * @param vertx          the Vert.x instance for context execution
+     * @param tasks          the list of tasks to execute (must not be empty)
+     * @param maxConcurrency the maximum number of tasks to execute 
concurrently (must be &gt; 0)
+     * @throws IllegalArgumentException if maxConcurrency &lt;= 0 or tasks is 
empty
+     */
+    public AsyncConcurrentTaskExecutor(@NotNull Vertx vertx,
+                                       @NotNull List<Callable<Future<T>>> 
tasks,
+                                       int maxConcurrency)
+    {
+        Objects.requireNonNull(vertx, "vertx must not be null.");
+        Objects.requireNonNull(tasks, "tasks must not be null.");
+        if (maxConcurrency < 1)
+        {
+            throw (new IllegalArgumentException("maxConcurrency must be > 0"));
+        }
+
+        this.tasks = Collections.unmodifiableList(tasks);
+        this.maxConcurrency = maxConcurrency;
+        this.vertx = vertx;
+        taskPromises = tasks.stream()
+                            .map(item -> Promise.<T>promise())
+                            .collect(Collectors.toUnmodifiableList());
+    }
+
+    /**
+     * Returns the list of task promises for testing purposes.
+     *
+     * @return unmodifiable list of task promises
+     */
+    @VisibleForTesting
+    List<Promise<T>> getTaskPromises()
+    {
+        return taskPromises;
+    }
+
+    /**
+     * Starts the execution of tasks with the configured concurrency limit.
+     *
+     * <p>This method immediately starts up to {@code maxConcurrency} tasks 
and returns
+     * futures for all tasks. As running tasks complete, pending tasks are 
automatically
+     * triggered to maintain the concurrency level.</p>
+     *
+     * <h3>Execution Flow Diagram:</h3>
+     * <pre>
+     *            index
+     *             +
+     *             |
+     *             v
+     * +----+----+-+--+----+------------+----+
+     * | p0 | p1 | p2 | p3 |    ...     |pn-1|
+     * +-+--+-+--+-+--+-+--+------------+-+--+
+     *   |    |    |    |                 |
+     *   |    |    +    +                 +
+     *   |    |
+     *   |    |    +    +                 +
+     *   |    |    |    |                 |
+     *   +    +    +    +                 +
+     *   t0   t1   t2   t3                tn-1
+     *  (r)  (r)
+     * (r) = running
+     * </pre>
+     *
+     * @return list of futures representing all tasks (same order as input)
+     */
+    public List<Future<T>> start()
+    {
+        List<Future<T>> taskFutures = new ArrayList<>(tasks.size());
+        for (int i = 0; i < tasks.size(); i++)
+        {
+            Promise<T> p = taskPromises.get(i);
+            taskFutures.add(waitAndDo(p.future(), tasks.get(i)));
+        }
+
+        // Start maxConcurrency no.of tasks
+        for (int i = 0; i < maxConcurrency; i++)
+        {
+            triggerNextTask();
+        }
+        return taskFutures;
+    }
+
+    /**
+     * Associates a task with its trigger promise and handles task execution 
and completion.
+     *
+     * <p>This method creates a future chain where the task only executes when 
the trigger
+     * future completes. Upon task completion, it automatically triggers the 
next pending task.</p>
+     *
+     * @param future the trigger future that must complete before task 
execution
+     * @param task   the task to execute when triggered
+     * @return future representing the task's execution result
+     */
+    private Future<T> waitAndDo(Future<T> future, Callable<Future<T>> task)
+    {
+        return future.compose(ar -> {
+                         try
+                         {
+                             return task.call();
+                         }
+                         catch (Exception e)
+                         {
+                             return Future.failedFuture(e);
+                         }
+                     })
+                     .onComplete(ar -> {
+                         if (ar.succeeded() || !(ar.cause() instanceof 
CancellationException))

Review Comment:
   does it make sense to log here if the task did not succeed?



##########
server/src/main/java/org/apache/cassandra/sidecar/livemigration/OperationStatus.java:
##########
@@ -0,0 +1,430 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.livemigration;
+
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+
+import com.google.common.annotations.VisibleForTesting;
+
+import org.jetbrains.annotations.NotNull;
+
+/**
+ * Represents the state and progress of a live migration data copy task using 
a state machine pattern.
+ *
+ * <h2>State Machine</h2>
+ * The operation can follow these state transitions:
+ * <pre>
+ * STARTING -&gt; CLEANING -&gt; PREPARING -&gt; DOWNLOADING -&gt; 
DOWNLOAD_COMPLETE
+ *    |          |           |            |
+ *    |          |           |            |
+ *    |          |           |            |
+ *    |          |           |            +-&gt; CANCELLED / FAILED
+ *    |          |           +-&gt; SUCCESS
+ *    |          |
+ *    |          +-&gt; CANCELLED / FAILED
+ *    +-&gt; CANCELLED / FAILED
+ *
+ * Note: CANCELLED and FAILED states can be reached from any non-terminal 
state.
+ *       DOWNLOAD_COMPLETE is a terminal state (no further transitions).
+ *       SUCCESS can only be reached from PREPARING state.
+ *       Special case: CANCELLED -&gt; FAILED transition is tolerated (returns 
CANCELLED).
+ *       This handles scenarios where failure occurs after cancellation.
+ * </pre>
+ *
+ * <h2>Thread Safety</h2>
+ * This class is designed for concurrent access:
+ * <ul>
+ *   <li>The {@code State} and size fields are immutable once set</li>
+ *   <li>Progress counters ({@code filesDownloaded}, {@code downloadFailures}, 
{@code bytesDownloaded})
+ *       are thread-safe using {@code AtomicInteger} and {@code 
AtomicLong}</li>
+ *   <li>State transitions create new immutable instances rather than 
modifying existing ones</li>
+ *   <li>Multiple threads can safely read progress and update counters 
simultaneously</li>
+ * </ul>
+ *
+ * <h2>Usage</h2>
+ * State transitions are managed through factory methods that return new 
instances.
+ * Progress tracking is handled via atomic fields that can be safely updated 
from multiple threads.
+ */
+public class OperationStatus
+{
+    // State of the operation
+    private final State state;
+
+    // Total size of data available at source (immutable once set)
+    private final long totalSize;
+
+    // Total number of files available at source (immutable once set)
+    private final int totalFiles;
+
+    // Size of data to be copied (immutable once set)
+    private final long bytesToDownload;
+
+    // Number of files to download from source (immutable once set)
+    private final int filesToDownload;
+
+    // Number of files downloaded from source (thread-safe atomic counter)
+    private final AtomicInteger filesDownloaded;
+
+    // Number of download failures (thread-safe atomic counter)
+    private final AtomicInteger downloadFailures;
+
+    // Size of data downloaded from source in bytes (thread-safe atomic 
counter)
+    private final AtomicLong bytesDownloaded;
+
+    private OperationStatus(@NotNull State state,
+                            long totalSize,
+                            int totalFiles,
+                            long bytesToDownload,
+                            int filesToDownload,
+                            @NotNull AtomicInteger filesDownloaded,
+                            @NotNull AtomicInteger downloadFailures,
+                            @NotNull AtomicLong bytesDownloaded)
+    {
+        this.state = state;
+        this.totalSize = totalSize;
+        this.totalFiles = totalFiles;
+        this.bytesToDownload = bytesToDownload;
+        this.filesToDownload = filesToDownload;
+        this.filesDownloaded = filesDownloaded;
+        this.downloadFailures = downloadFailures;
+        this.bytesDownloaded = bytesDownloaded;
+    }
+
+    public static OperationStatus getStartingState()
+    {
+        return new OperationStatus(State.STARTING,
+                                   -1,
+                                   -1,
+                                   -1,
+                                   -1,
+                                   new AtomicInteger(0),
+                                   new AtomicInteger(0),
+                                   new AtomicLong(0));
+    }
+
+    /**
+     * Transitions to the CLEANING state with updated file metadata.
+     *
+     * @param totalSize  total size of files at the source
+     * @param totalFiles total number of files at the source
+     * @return new OperationStatus instance in CLEANING state
+     * @throws IllegalStateTransitionException if current state cannot 
transition to CLEANING
+     */
+    @VisibleForTesting
+    public OperationStatus getCleaningState(long totalSize, int totalFiles)
+    {
+        return new OperationStatus(this.state.toCleaning(),
+                                   totalSize,
+                                   totalFiles,
+                                   this.bytesToDownload,
+                                   this.filesToDownload,
+                                   this.filesDownloaded,
+                                   this.downloadFailures,
+                                   this.bytesDownloaded);
+    }
+
+    /**
+     * Transitions to the PREPARING state.
+     *
+     * @return new OperationStatus instance in PREPARING state
+     * @throws IllegalStateTransitionException if current state cannot 
transition to PREPARING
+     */
+    @VisibleForTesting
+    public OperationStatus getPreparingState()
+    {
+        return new OperationStatus(this.state.toPreparing(),
+                                   this.totalSize,
+                                   this.totalFiles,
+                                   this.bytesToDownload,
+                                   this.filesToDownload,
+                                   this.filesDownloaded,
+                                   this.downloadFailures,
+                                   this.bytesDownloaded);
+    }
+
+    /**
+     * Transitions to the DOWNLOADING state with updated download metadata.
+     * Resets progress counters for the new download phase.
+     *
+     * @param bytesToDownload total size of data to be downloaded in bytes
+     * @param filesToDownload number of files to be downloaded
+     * @return new OperationStatus instance in DOWNLOADING state
+     * @throws IllegalStateTransitionException if current state cannot 
transition to DOWNLOADING
+     */
+    OperationStatus getDownloadingState(final long bytesToDownload, final int 
filesToDownload)
+    {
+        return new OperationStatus(this.state.toDownloading(),
+                                   this.totalSize,
+                                   this.totalFiles,
+                                   bytesToDownload,
+                                   filesToDownload,
+                                   new AtomicInteger(),
+                                   new AtomicInteger(),
+                                   new AtomicLong());
+    }
+
+    /**
+     * Transitions to the DOWNLOAD_COMPLETE state.
+     *
+     * @return new OperationStatus instance in DOWNLOAD_COMPLETE state
+     * @throws IllegalStateTransitionException if current state cannot 
transition to DOWNLOAD_COMPLETE
+     */
+    OperationStatus getDownloadCompleteState()
+    {
+        return new OperationStatus(this.state.toDownloadComplete(),
+                                   this.totalSize,
+                                   this.totalFiles,
+                                   this.bytesToDownload,
+                                   this.filesToDownload,
+                                   this.filesDownloaded,
+                                   this.downloadFailures,
+                                   this.bytesDownloaded);
+    }
+
+    /**
+     * Transitions to the SUCCESS state, indicating successful completion.
+     *
+     * @return new OperationStatus instance in SUCCESS state
+     * @throws IllegalStateTransitionException if current state cannot 
transition to SUCCESS
+     */
+    @VisibleForTesting
+    public OperationStatus getSuccessState()

Review Comment:
   ```suggestion
       public OperationStatus toSuccessState()
   ```



##########
server/src/main/java/org/apache/cassandra/sidecar/livemigration/OperationStatus.java:
##########
@@ -0,0 +1,430 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.livemigration;
+
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+
+import com.google.common.annotations.VisibleForTesting;
+
+import org.jetbrains.annotations.NotNull;
+
+/**
+ * Represents the state and progress of a live migration data copy task using 
a state machine pattern.
+ *
+ * <h2>State Machine</h2>
+ * The operation can follow these state transitions:
+ * <pre>
+ * STARTING -&gt; CLEANING -&gt; PREPARING -&gt; DOWNLOADING -&gt; 
DOWNLOAD_COMPLETE
+ *    |          |           |            |
+ *    |          |           |            |
+ *    |          |           |            |
+ *    |          |           |            +-&gt; CANCELLED / FAILED
+ *    |          |           +-&gt; SUCCESS
+ *    |          |
+ *    |          +-&gt; CANCELLED / FAILED
+ *    +-&gt; CANCELLED / FAILED
+ *
+ * Note: CANCELLED and FAILED states can be reached from any non-terminal 
state.
+ *       DOWNLOAD_COMPLETE is a terminal state (no further transitions).
+ *       SUCCESS can only be reached from PREPARING state.
+ *       Special case: CANCELLED -&gt; FAILED transition is tolerated (returns 
CANCELLED).
+ *       This handles scenarios where failure occurs after cancellation.
+ * </pre>
+ *
+ * <h2>Thread Safety</h2>
+ * This class is designed for concurrent access:
+ * <ul>
+ *   <li>The {@code State} and size fields are immutable once set</li>
+ *   <li>Progress counters ({@code filesDownloaded}, {@code downloadFailures}, 
{@code bytesDownloaded})
+ *       are thread-safe using {@code AtomicInteger} and {@code 
AtomicLong}</li>
+ *   <li>State transitions create new immutable instances rather than 
modifying existing ones</li>
+ *   <li>Multiple threads can safely read progress and update counters 
simultaneously</li>
+ * </ul>
+ *
+ * <h2>Usage</h2>
+ * State transitions are managed through factory methods that return new 
instances.
+ * Progress tracking is handled via atomic fields that can be safely updated 
from multiple threads.
+ */
+public class OperationStatus
+{
+    // State of the operation
+    private final State state;
+
+    // Total size of data available at source (immutable once set)
+    private final long totalSize;
+
+    // Total number of files available at source (immutable once set)
+    private final int totalFiles;
+
+    // Size of data to be copied (immutable once set)
+    private final long bytesToDownload;
+
+    // Number of files to download from source (immutable once set)
+    private final int filesToDownload;
+
+    // Number of files downloaded from source (thread-safe atomic counter)
+    private final AtomicInteger filesDownloaded;
+
+    // Number of download failures (thread-safe atomic counter)
+    private final AtomicInteger downloadFailures;
+
+    // Size of data downloaded from source in bytes (thread-safe atomic 
counter)
+    private final AtomicLong bytesDownloaded;
+
+    private OperationStatus(@NotNull State state,
+                            long totalSize,
+                            int totalFiles,
+                            long bytesToDownload,
+                            int filesToDownload,
+                            @NotNull AtomicInteger filesDownloaded,
+                            @NotNull AtomicInteger downloadFailures,
+                            @NotNull AtomicLong bytesDownloaded)
+    {
+        this.state = state;
+        this.totalSize = totalSize;
+        this.totalFiles = totalFiles;
+        this.bytesToDownload = bytesToDownload;
+        this.filesToDownload = filesToDownload;
+        this.filesDownloaded = filesDownloaded;
+        this.downloadFailures = downloadFailures;
+        this.bytesDownloaded = bytesDownloaded;
+    }
+
+    public static OperationStatus getStartingState()
+    {
+        return new OperationStatus(State.STARTING,
+                                   -1,
+                                   -1,
+                                   -1,
+                                   -1,
+                                   new AtomicInteger(0),
+                                   new AtomicInteger(0),
+                                   new AtomicLong(0));
+    }
+
+    /**
+     * Transitions to the CLEANING state with updated file metadata.
+     *
+     * @param totalSize  total size of files at the source
+     * @param totalFiles total number of files at the source
+     * @return new OperationStatus instance in CLEANING state
+     * @throws IllegalStateTransitionException if current state cannot 
transition to CLEANING
+     */
+    @VisibleForTesting
+    public OperationStatus getCleaningState(long totalSize, int totalFiles)
+    {
+        return new OperationStatus(this.state.toCleaning(),
+                                   totalSize,
+                                   totalFiles,
+                                   this.bytesToDownload,
+                                   this.filesToDownload,
+                                   this.filesDownloaded,
+                                   this.downloadFailures,
+                                   this.bytesDownloaded);
+    }
+
+    /**
+     * Transitions to the PREPARING state.
+     *
+     * @return new OperationStatus instance in PREPARING state
+     * @throws IllegalStateTransitionException if current state cannot 
transition to PREPARING
+     */
+    @VisibleForTesting
+    public OperationStatus getPreparingState()
+    {
+        return new OperationStatus(this.state.toPreparing(),
+                                   this.totalSize,
+                                   this.totalFiles,
+                                   this.bytesToDownload,
+                                   this.filesToDownload,
+                                   this.filesDownloaded,
+                                   this.downloadFailures,
+                                   this.bytesDownloaded);
+    }
+
+    /**
+     * Transitions to the DOWNLOADING state with updated download metadata.
+     * Resets progress counters for the new download phase.
+     *
+     * @param bytesToDownload total size of data to be downloaded in bytes
+     * @param filesToDownload number of files to be downloaded
+     * @return new OperationStatus instance in DOWNLOADING state
+     * @throws IllegalStateTransitionException if current state cannot 
transition to DOWNLOADING
+     */
+    OperationStatus getDownloadingState(final long bytesToDownload, final int 
filesToDownload)
+    {
+        return new OperationStatus(this.state.toDownloading(),
+                                   this.totalSize,
+                                   this.totalFiles,
+                                   bytesToDownload,
+                                   filesToDownload,
+                                   new AtomicInteger(),
+                                   new AtomicInteger(),
+                                   new AtomicLong());
+    }
+
+    /**
+     * Transitions to the DOWNLOAD_COMPLETE state.
+     *
+     * @return new OperationStatus instance in DOWNLOAD_COMPLETE state
+     * @throws IllegalStateTransitionException if current state cannot 
transition to DOWNLOAD_COMPLETE
+     */
+    OperationStatus getDownloadCompleteState()

Review Comment:
   ```suggestion
       OperationStatus toDownloadCompleteState()
   ```



##########
server/src/main/java/org/apache/cassandra/sidecar/livemigration/OperationStatus.java:
##########
@@ -0,0 +1,430 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.livemigration;
+
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+
+import com.google.common.annotations.VisibleForTesting;
+
+import org.jetbrains.annotations.NotNull;
+
+/**
+ * Represents the state and progress of a live migration data copy task using 
a state machine pattern.
+ *
+ * <h2>State Machine</h2>
+ * The operation can follow these state transitions:
+ * <pre>
+ * STARTING -&gt; CLEANING -&gt; PREPARING -&gt; DOWNLOADING -&gt; 
DOWNLOAD_COMPLETE
+ *    |          |           |            |
+ *    |          |           |            |
+ *    |          |           |            |
+ *    |          |           |            +-&gt; CANCELLED / FAILED
+ *    |          |           +-&gt; SUCCESS
+ *    |          |
+ *    |          +-&gt; CANCELLED / FAILED
+ *    +-&gt; CANCELLED / FAILED
+ *
+ * Note: CANCELLED and FAILED states can be reached from any non-terminal 
state.
+ *       DOWNLOAD_COMPLETE is a terminal state (no further transitions).
+ *       SUCCESS can only be reached from PREPARING state.
+ *       Special case: CANCELLED -&gt; FAILED transition is tolerated (returns 
CANCELLED).
+ *       This handles scenarios where failure occurs after cancellation.
+ * </pre>
+ *
+ * <h2>Thread Safety</h2>
+ * This class is designed for concurrent access:
+ * <ul>
+ *   <li>The {@code State} and size fields are immutable once set</li>
+ *   <li>Progress counters ({@code filesDownloaded}, {@code downloadFailures}, 
{@code bytesDownloaded})
+ *       are thread-safe using {@code AtomicInteger} and {@code 
AtomicLong}</li>
+ *   <li>State transitions create new immutable instances rather than 
modifying existing ones</li>
+ *   <li>Multiple threads can safely read progress and update counters 
simultaneously</li>
+ * </ul>
+ *
+ * <h2>Usage</h2>
+ * State transitions are managed through factory methods that return new 
instances.
+ * Progress tracking is handled via atomic fields that can be safely updated 
from multiple threads.
+ */
+public class OperationStatus
+{
+    // State of the operation
+    private final State state;
+
+    // Total size of data available at source (immutable once set)
+    private final long totalSize;
+
+    // Total number of files available at source (immutable once set)
+    private final int totalFiles;
+
+    // Size of data to be copied (immutable once set)
+    private final long bytesToDownload;
+
+    // Number of files to download from source (immutable once set)
+    private final int filesToDownload;
+
+    // Number of files downloaded from source (thread-safe atomic counter)
+    private final AtomicInteger filesDownloaded;
+
+    // Number of download failures (thread-safe atomic counter)
+    private final AtomicInteger downloadFailures;
+
+    // Size of data downloaded from source in bytes (thread-safe atomic 
counter)
+    private final AtomicLong bytesDownloaded;
+
+    private OperationStatus(@NotNull State state,
+                            long totalSize,
+                            int totalFiles,
+                            long bytesToDownload,
+                            int filesToDownload,
+                            @NotNull AtomicInteger filesDownloaded,
+                            @NotNull AtomicInteger downloadFailures,
+                            @NotNull AtomicLong bytesDownloaded)
+    {
+        this.state = state;
+        this.totalSize = totalSize;
+        this.totalFiles = totalFiles;
+        this.bytesToDownload = bytesToDownload;
+        this.filesToDownload = filesToDownload;
+        this.filesDownloaded = filesDownloaded;
+        this.downloadFailures = downloadFailures;
+        this.bytesDownloaded = bytesDownloaded;
+    }
+
+    public static OperationStatus getStartingState()
+    {
+        return new OperationStatus(State.STARTING,
+                                   -1,
+                                   -1,
+                                   -1,
+                                   -1,
+                                   new AtomicInteger(0),
+                                   new AtomicInteger(0),
+                                   new AtomicLong(0));
+    }
+
+    /**
+     * Transitions to the CLEANING state with updated file metadata.
+     *
+     * @param totalSize  total size of files at the source
+     * @param totalFiles total number of files at the source
+     * @return new OperationStatus instance in CLEANING state
+     * @throws IllegalStateTransitionException if current state cannot 
transition to CLEANING
+     */
+    @VisibleForTesting
+    public OperationStatus getCleaningState(long totalSize, int totalFiles)
+    {
+        return new OperationStatus(this.state.toCleaning(),
+                                   totalSize,
+                                   totalFiles,
+                                   this.bytesToDownload,
+                                   this.filesToDownload,
+                                   this.filesDownloaded,
+                                   this.downloadFailures,
+                                   this.bytesDownloaded);
+    }
+
+    /**
+     * Transitions to the PREPARING state.
+     *
+     * @return new OperationStatus instance in PREPARING state
+     * @throws IllegalStateTransitionException if current state cannot 
transition to PREPARING
+     */
+    @VisibleForTesting
+    public OperationStatus getPreparingState()
+    {
+        return new OperationStatus(this.state.toPreparing(),
+                                   this.totalSize,
+                                   this.totalFiles,
+                                   this.bytesToDownload,
+                                   this.filesToDownload,
+                                   this.filesDownloaded,
+                                   this.downloadFailures,
+                                   this.bytesDownloaded);
+    }
+
+    /**
+     * Transitions to the DOWNLOADING state with updated download metadata.
+     * Resets progress counters for the new download phase.
+     *
+     * @param bytesToDownload total size of data to be downloaded in bytes
+     * @param filesToDownload number of files to be downloaded
+     * @return new OperationStatus instance in DOWNLOADING state
+     * @throws IllegalStateTransitionException if current state cannot 
transition to DOWNLOADING
+     */
+    OperationStatus getDownloadingState(final long bytesToDownload, final int 
filesToDownload)

Review Comment:
   ```suggestion
       OperationStatus toDownloadingState(long bytesToDownload, int 
filesToDownload)
   ```



##########
server/src/main/java/org/apache/cassandra/sidecar/livemigration/OperationStatus.java:
##########
@@ -0,0 +1,430 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.livemigration;
+
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+
+import com.google.common.annotations.VisibleForTesting;
+
+import org.jetbrains.annotations.NotNull;
+
+/**
+ * Represents the state and progress of a live migration data copy task using 
a state machine pattern.
+ *
+ * <h2>State Machine</h2>
+ * The operation can follow these state transitions:
+ * <pre>
+ * STARTING -&gt; CLEANING -&gt; PREPARING -&gt; DOWNLOADING -&gt; 
DOWNLOAD_COMPLETE
+ *    |          |           |            |
+ *    |          |           |            |
+ *    |          |           |            |
+ *    |          |           |            +-&gt; CANCELLED / FAILED
+ *    |          |           +-&gt; SUCCESS
+ *    |          |
+ *    |          +-&gt; CANCELLED / FAILED
+ *    +-&gt; CANCELLED / FAILED
+ *
+ * Note: CANCELLED and FAILED states can be reached from any non-terminal 
state.
+ *       DOWNLOAD_COMPLETE is a terminal state (no further transitions).
+ *       SUCCESS can only be reached from PREPARING state.
+ *       Special case: CANCELLED -&gt; FAILED transition is tolerated (returns 
CANCELLED).
+ *       This handles scenarios where failure occurs after cancellation.
+ * </pre>
+ *
+ * <h2>Thread Safety</h2>
+ * This class is designed for concurrent access:
+ * <ul>
+ *   <li>The {@code State} and size fields are immutable once set</li>
+ *   <li>Progress counters ({@code filesDownloaded}, {@code downloadFailures}, 
{@code bytesDownloaded})
+ *       are thread-safe using {@code AtomicInteger} and {@code 
AtomicLong}</li>
+ *   <li>State transitions create new immutable instances rather than 
modifying existing ones</li>
+ *   <li>Multiple threads can safely read progress and update counters 
simultaneously</li>
+ * </ul>
+ *
+ * <h2>Usage</h2>
+ * State transitions are managed through factory methods that return new 
instances.
+ * Progress tracking is handled via atomic fields that can be safely updated 
from multiple threads.
+ */
+public class OperationStatus
+{
+    // State of the operation
+    private final State state;
+
+    // Total size of data available at source (immutable once set)
+    private final long totalSize;
+
+    // Total number of files available at source (immutable once set)
+    private final int totalFiles;
+
+    // Size of data to be copied (immutable once set)
+    private final long bytesToDownload;
+
+    // Number of files to download from source (immutable once set)
+    private final int filesToDownload;
+
+    // Number of files downloaded from source (thread-safe atomic counter)
+    private final AtomicInteger filesDownloaded;
+
+    // Number of download failures (thread-safe atomic counter)
+    private final AtomicInteger downloadFailures;
+
+    // Size of data downloaded from source in bytes (thread-safe atomic 
counter)
+    private final AtomicLong bytesDownloaded;
+
+    private OperationStatus(@NotNull State state,
+                            long totalSize,
+                            int totalFiles,
+                            long bytesToDownload,
+                            int filesToDownload,
+                            @NotNull AtomicInteger filesDownloaded,
+                            @NotNull AtomicInteger downloadFailures,
+                            @NotNull AtomicLong bytesDownloaded)
+    {
+        this.state = state;
+        this.totalSize = totalSize;
+        this.totalFiles = totalFiles;
+        this.bytesToDownload = bytesToDownload;
+        this.filesToDownload = filesToDownload;
+        this.filesDownloaded = filesDownloaded;
+        this.downloadFailures = downloadFailures;
+        this.bytesDownloaded = bytesDownloaded;
+    }
+
+    public static OperationStatus getStartingState()
+    {
+        return new OperationStatus(State.STARTING,
+                                   -1,
+                                   -1,
+                                   -1,
+                                   -1,
+                                   new AtomicInteger(0),
+                                   new AtomicInteger(0),
+                                   new AtomicLong(0));
+    }
+
+    /**
+     * Transitions to the CLEANING state with updated file metadata.
+     *
+     * @param totalSize  total size of files at the source
+     * @param totalFiles total number of files at the source
+     * @return new OperationStatus instance in CLEANING state
+     * @throws IllegalStateTransitionException if current state cannot 
transition to CLEANING
+     */
+    @VisibleForTesting
+    public OperationStatus getCleaningState(long totalSize, int totalFiles)
+    {
+        return new OperationStatus(this.state.toCleaning(),
+                                   totalSize,
+                                   totalFiles,
+                                   this.bytesToDownload,
+                                   this.filesToDownload,
+                                   this.filesDownloaded,
+                                   this.downloadFailures,
+                                   this.bytesDownloaded);
+    }
+
+    /**
+     * Transitions to the PREPARING state.
+     *
+     * @return new OperationStatus instance in PREPARING state
+     * @throws IllegalStateTransitionException if current state cannot 
transition to PREPARING
+     */
+    @VisibleForTesting
+    public OperationStatus getPreparingState()
+    {
+        return new OperationStatus(this.state.toPreparing(),
+                                   this.totalSize,
+                                   this.totalFiles,
+                                   this.bytesToDownload,
+                                   this.filesToDownload,
+                                   this.filesDownloaded,
+                                   this.downloadFailures,
+                                   this.bytesDownloaded);
+    }
+
+    /**
+     * Transitions to the DOWNLOADING state with updated download metadata.
+     * Resets progress counters for the new download phase.
+     *
+     * @param bytesToDownload total size of data to be downloaded in bytes
+     * @param filesToDownload number of files to be downloaded
+     * @return new OperationStatus instance in DOWNLOADING state
+     * @throws IllegalStateTransitionException if current state cannot 
transition to DOWNLOADING
+     */
+    OperationStatus getDownloadingState(final long bytesToDownload, final int 
filesToDownload)
+    {
+        return new OperationStatus(this.state.toDownloading(),
+                                   this.totalSize,
+                                   this.totalFiles,
+                                   bytesToDownload,
+                                   filesToDownload,
+                                   new AtomicInteger(),
+                                   new AtomicInteger(),
+                                   new AtomicLong());
+    }
+
+    /**
+     * Transitions to the DOWNLOAD_COMPLETE state.
+     *
+     * @return new OperationStatus instance in DOWNLOAD_COMPLETE state
+     * @throws IllegalStateTransitionException if current state cannot 
transition to DOWNLOAD_COMPLETE
+     */
+    OperationStatus getDownloadCompleteState()
+    {
+        return new OperationStatus(this.state.toDownloadComplete(),
+                                   this.totalSize,
+                                   this.totalFiles,
+                                   this.bytesToDownload,
+                                   this.filesToDownload,
+                                   this.filesDownloaded,
+                                   this.downloadFailures,
+                                   this.bytesDownloaded);
+    }
+
+    /**
+     * Transitions to the SUCCESS state, indicating successful completion.
+     *
+     * @return new OperationStatus instance in SUCCESS state
+     * @throws IllegalStateTransitionException if current state cannot 
transition to SUCCESS
+     */
+    @VisibleForTesting
+    public OperationStatus getSuccessState()
+    {
+        return new OperationStatus(this.state.toSuccess(),
+                                   this.totalSize,
+                                   this.totalFiles,
+                                   this.bytesToDownload,
+                                   this.filesToDownload,
+                                   this.filesDownloaded,
+                                   this.downloadFailures,
+                                   this.bytesDownloaded);
+    }
+
+    /**
+     * Transitions to the FAILED state, indicating operation failure.
+     *
+     * @return new OperationStatus instance in FAILED state
+     * @throws IllegalStateTransitionException if current state cannot 
transition to FAILED
+     */
+    OperationStatus tryFailureState()
+    {
+        return new OperationStatus(this.state.toFailed(),
+                                   this.totalSize,
+                                   this.totalFiles,
+                                   this.bytesToDownload,
+                                   this.filesToDownload,
+                                   this.filesDownloaded,
+                                   this.downloadFailures,
+                                   this.bytesDownloaded);
+    }
+
+    /**
+     * Cancels the task if not completed.
+     *
+     * @return Returns same state if completed, otherwise returns cancelled 
state.
+     */
+    public OperationStatus cancel()
+    {
+        return new OperationStatus(this.state.toCancelled(),
+                                   this.totalSize,
+                                   this.totalFiles,
+                                   this.bytesToDownload,
+                                   this.filesToDownload,
+                                   this.filesDownloaded,
+                                   this.downloadFailures,
+                                   this.bytesDownloaded);
+    }
+
+    public State getState()
+    {
+        return state;
+    }
+
+    public State state()
+    {
+        return state;
+    }
+
+    public long totalSize()
+    {
+        return totalSize;
+    }
+
+    public long bytesToDownload()
+    {
+        return bytesToDownload;
+    }
+
+    public int filesToDownload()
+    {
+        return filesToDownload;
+    }
+
+    /**
+     * Returns the thread-safe atomic counter for files downloaded.
+     * This counter can be safely read and updated from multiple threads.
+     *
+     * @return atomic counter for files downloaded
+     */
+    public AtomicInteger filesDownloaded()
+    {
+        return filesDownloaded;
+    }
+
+    /**
+     * Returns the thread-safe atomic counter for bytes downloaded.
+     * This counter can be safely read and updated from multiple threads.
+     *
+     * @return atomic counter for bytes downloaded
+     */
+    public AtomicLong bytesDownloaded()

Review Comment:
   In general -1 on exposing these values directly and instead providing 
business oriented methods that allow you to internally modify these values



##########
server/src/main/java/org/apache/cassandra/sidecar/modules/LiveMigrationModule.java:
##########
@@ -51,11 +58,113 @@ public class LiveMigrationModule extends AbstractModule
     protected void configure()
     {
         
bind(LiveMigrationMap.class).to(LiveMigrationMapSidecarConfigImpl.class);
+        
bind(LiveMigrationTaskFactory.class).to(LiveMigrationTaskFactoryImpl.class);
     }
 
+    @GET
+    @Path(ApiEndpointsV1.LIVE_MIGRATION_DATA_COPY_TASKS_ROUTE)
+    @Operation(summary = "Create data copy task",
+               description = "Creates a new data copy task for live migration")
+    @APIResponse(description = "Data copy task created successfully",
+                 responseCode = "200",
+                 content = @Content(mediaType = "application/json",
+                 schema = @Schema(type = SchemaType.OBJECT)))
+    @APIResponse(responseCode = "403",
+                 description = "Live migration not enabled or node not 
configured as destination",
+                 content = @Content(mediaType = "application/json",
+                 schema = @Schema(type = SchemaType.OBJECT)))
+    @ProvidesIntoMap
+    
@KeyClassMapKey(VertxRouteMapKeys.LiveMigrationCreateDataCopyTaskRouteKey.class)
+    public VertxRoute createDataCopyTaskRoute(RouteBuilder.Factory factory,
+                                              
LiveMigrationApiEnableDisableHandler liveMigrationApiEnableDisableHandler,
+                                              
LiveMigrationCreateDataCopyTaskHandler liveMigrationCreateDataCopyTaskHandler)
+    {
+        return factory.builderForRoute()
+                      .setBodyHandler(true)
+                      
.handler(liveMigrationApiEnableDisableHandler::isDestination)
+                      .handler(liveMigrationCreateDataCopyTaskHandler)
+                      .build();
+    }
+
+    @PATCH
+    @Operation(summary = "Cancel data copy task",
+    description = "Cancels an existing data copy task for live migration")
+    @APIResponse(description = "Data copy task cancelled successfully",
+                 responseCode = "200",
+                 content = @Content(mediaType = "application/json",
+                 schema = @Schema(type = SchemaType.OBJECT)))
+    @APIResponse(responseCode = "403",
+                 description = "Live migration not enabled or node not 
configured as destination",
+                 content = @Content(mediaType = "application/json",
+                 schema = @Schema(type = SchemaType.OBJECT)))
+    @APIResponse(responseCode = "404",
+                 description = "Data copy task not found",
+                 content = @Content(mediaType = "application/json",
+                 schema = @Schema(type = SchemaType.OBJECT)))
+    @ProvidesIntoMap
+    
@KeyClassMapKey(VertxRouteMapKeys.LiveMigrationCancelDataCopyTaskRouteKey.class)
+    public VertxRoute cancelDataCopyTaskRoute(RouteBuilder.Factory factory,
+                                              
LiveMigrationApiEnableDisableHandler liveMigrationApiEnableDisableHandler,
+                                              
LiveMigrationCancelDataCopyTaskHandler liveMigrationCancelDataCopyTaskHandler)
+    {
+        return factory.builderForRoute()
+                      
.handler(liveMigrationApiEnableDisableHandler::isDestination)
+                      .handler(liveMigrationCancelDataCopyTaskHandler)
+                      .build();
+    }
+
+    @GET
+    @Operation(summary = "Get data copy task",
+               description = "Retrieves the status and details of a specific 
data copy task by task ID")
+    @APIResponse(description = "Data copy task retrieved successfully",
+                 responseCode = "200",
+                 content = @Content(mediaType = "application/json",
+                 schema = @Schema(type = SchemaType.OBJECT)))

Review Comment:
   ```suggestion
                    schema = @Schema(implementation = 
LiveMigrationTaskResponse.class)))
   ```



##########
server/src/main/java/org/apache/cassandra/sidecar/modules/LiveMigrationModule.java:
##########
@@ -68,10 +177,10 @@ protected void configure()
                  schema = @Schema(type = SchemaType.OBJECT)))
     @ProvidesIntoMap
     
@KeyClassMapKey(VertxRouteMapKeys.LiveMigrationFileStreamHandlerRouteKey.class)
-    VertxRoute downloadFileRoute(RouteBuilder.Factory factory,
-                                 LiveMigrationApiEnableDisableHandler 
liveMigrationApiEnableDisableHandler,
-                                 LiveMigrationFileStreamHandler 
liveMigrationFileStreamHandler,
-                                 FileStreamHandler fileStreamHandler)
+    public VertxRoute downloadFileRoute(RouteBuilder.Factory factory,

Review Comment:
   I think these shouldn't be public. Any reason we are changing the visibility 
here?



##########
server/src/main/java/org/apache/cassandra/sidecar/modules/LiveMigrationModule.java:
##########
@@ -51,11 +58,113 @@ public class LiveMigrationModule extends AbstractModule
     protected void configure()
     {
         
bind(LiveMigrationMap.class).to(LiveMigrationMapSidecarConfigImpl.class);
+        
bind(LiveMigrationTaskFactory.class).to(LiveMigrationTaskFactoryImpl.class);
     }
 
+    @GET
+    @Path(ApiEndpointsV1.LIVE_MIGRATION_DATA_COPY_TASKS_ROUTE)
+    @Operation(summary = "Create data copy task",
+               description = "Creates a new data copy task for live migration")
+    @APIResponse(description = "Data copy task created successfully",
+                 responseCode = "200",
+                 content = @Content(mediaType = "application/json",
+                 schema = @Schema(type = SchemaType.OBJECT)))
+    @APIResponse(responseCode = "403",
+                 description = "Live migration not enabled or node not 
configured as destination",
+                 content = @Content(mediaType = "application/json",
+                 schema = @Schema(type = SchemaType.OBJECT)))
+    @ProvidesIntoMap
+    
@KeyClassMapKey(VertxRouteMapKeys.LiveMigrationCreateDataCopyTaskRouteKey.class)
+    public VertxRoute createDataCopyTaskRoute(RouteBuilder.Factory factory,
+                                              
LiveMigrationApiEnableDisableHandler liveMigrationApiEnableDisableHandler,
+                                              
LiveMigrationCreateDataCopyTaskHandler liveMigrationCreateDataCopyTaskHandler)
+    {
+        return factory.builderForRoute()
+                      .setBodyHandler(true)
+                      
.handler(liveMigrationApiEnableDisableHandler::isDestination)
+                      .handler(liveMigrationCreateDataCopyTaskHandler)
+                      .build();
+    }
+
+    @PATCH
+    @Operation(summary = "Cancel data copy task",
+    description = "Cancels an existing data copy task for live migration")
+    @APIResponse(description = "Data copy task cancelled successfully",
+                 responseCode = "200",
+                 content = @Content(mediaType = "application/json",
+                 schema = @Schema(type = SchemaType.OBJECT)))

Review Comment:
   ```suggestion
                    schema = @Schema(implementation = 
LiveMigrationTaskResponse.class)))
   ```



##########
server/src/main/java/org/apache/cassandra/sidecar/concurrent/AsyncConcurrentTaskExecutor.java:
##########
@@ -0,0 +1,285 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.concurrent;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
+
+import com.google.common.annotations.VisibleForTesting;
+
+import io.vertx.core.Future;
+import io.vertx.core.Promise;
+import io.vertx.core.Vertx;
+import org.jetbrains.annotations.NotNull;
+
+/**
+ * A task executor that manages concurrent execution of asynchronous tasks 
with configurable concurrency limit.
+ *
+ * <h2>Overview</h2>
+ * <p>This executor ensures that only a specified number of tasks (defined by 
{@code maxConcurrency})
+ * execute simultaneously. When a task completes, the next pending task is 
automatically
+ * triggered, maintaining optimal resource utilization while respecting 
concurrency constraints.</p>
+ *
+ * <h2>Architecture</h2>
+ * <p>The executor uses a promise-based triggering mechanism:</p>
+ * <ul>
+ *   <li>Each task is associated with a {@link Promise} that acts as a trigger 
signal</li>
+ *   <li>Tasks only start execution when their corresponding Promise completes 
successfully</li>
+ *   <li>Initially, {@code maxConcurrency} number of promises are completed to 
start the execution flow</li>
+ *   <li>As tasks complete, the next pending task is automatically 
triggered</li>
+ * </ul>
+ *
+ * <h2>Concurrency Control</h2>
+ * <p>The executor maintains a global task index using {@link AtomicInteger} 
to ensure thread-safe
+ * task scheduling. Tasks are processed in the order they were submitted, with 
automatic
+ * flow control to maintain the desired concurrency level.</p>
+ *
+ * <h2>Error Handling</h2>
+ * <p>When a task completes (successfully or with a non-{@link 
CancellationException} error),
+ * the executor automatically triggers the next pending task. Tasks that are 
cancelled do not
+ * trigger subsequent tasks to prevent further executions.</p>
+ *
+ * <h2>Usage Example</h2>
+ * <pre>{@code
+ * List<Callable<Future<String>>> tasks = Arrays.asList(
+ *     () -> downloadFile("file1.txt"),
+ *     () -> downloadFile("file2.txt"),
+ *     () -> downloadFile("file3.txt")
+ * );
+ *
+ * AsyncConcurrentTaskExecutor<String> executor =
+ *     new AsyncConcurrentTaskExecutor<>(vertx, tasks, 2); // max 2 tasks can 
execute concurrently
+ *
+ * List<Future<String>> results = executor.start();
+ * }</pre>
+ *
+ * @param <T> the type of result produced by the tasks
+ */
+public class AsyncConcurrentTaskExecutor<T>
+{
+    protected static final String TASK_CANCEL_MESSAGE = "task cancelled";
+    private final AtomicInteger index = new AtomicInteger(0);
+    private final int maxConcurrency;
+    private final Vertx vertx;
+    private final List<Promise<T>> taskPromises;
+    private final List<Callable<Future<T>>> tasks;
+
+    /**
+     * Creates a new AsyncConcurrentTaskExecutor.
+     *
+     * @param vertx          the Vert.x instance for context execution
+     * @param tasks          the list of tasks to execute (must not be empty)
+     * @param maxConcurrency the maximum number of tasks to execute 
concurrently (must be &gt; 0)
+     * @throws IllegalArgumentException if maxConcurrency &lt;= 0 or tasks is 
empty
+     */
+    public AsyncConcurrentTaskExecutor(@NotNull Vertx vertx,
+                                       @NotNull List<Callable<Future<T>>> 
tasks,
+                                       int maxConcurrency)
+    {
+        Objects.requireNonNull(vertx, "vertx must not be null.");
+        Objects.requireNonNull(tasks, "tasks must not be null.");
+        if (maxConcurrency < 1)
+        {
+            throw (new IllegalArgumentException("maxConcurrency must be > 0"));
+        }
+
+        this.tasks = Collections.unmodifiableList(tasks);

Review Comment:
   NIT: prefer newer JDK APIs
   ```suggestion
           this.tasks = List.copyOf(tasks);
   ```



##########
server/src/main/java/org/apache/cassandra/sidecar/livemigration/OperationStatus.java:
##########
@@ -0,0 +1,430 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.livemigration;
+
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+
+import com.google.common.annotations.VisibleForTesting;
+
+import org.jetbrains.annotations.NotNull;
+
+/**
+ * Represents the state and progress of a live migration data copy task using 
a state machine pattern.
+ *
+ * <h2>State Machine</h2>
+ * The operation can follow these state transitions:
+ * <pre>
+ * STARTING -&gt; CLEANING -&gt; PREPARING -&gt; DOWNLOADING -&gt; 
DOWNLOAD_COMPLETE
+ *    |          |           |            |
+ *    |          |           |            |
+ *    |          |           |            |
+ *    |          |           |            +-&gt; CANCELLED / FAILED
+ *    |          |           +-&gt; SUCCESS
+ *    |          |
+ *    |          +-&gt; CANCELLED / FAILED
+ *    +-&gt; CANCELLED / FAILED
+ *
+ * Note: CANCELLED and FAILED states can be reached from any non-terminal 
state.
+ *       DOWNLOAD_COMPLETE is a terminal state (no further transitions).
+ *       SUCCESS can only be reached from PREPARING state.
+ *       Special case: CANCELLED -&gt; FAILED transition is tolerated (returns 
CANCELLED).
+ *       This handles scenarios where failure occurs after cancellation.
+ * </pre>
+ *
+ * <h2>Thread Safety</h2>
+ * This class is designed for concurrent access:
+ * <ul>
+ *   <li>The {@code State} and size fields are immutable once set</li>
+ *   <li>Progress counters ({@code filesDownloaded}, {@code downloadFailures}, 
{@code bytesDownloaded})
+ *       are thread-safe using {@code AtomicInteger} and {@code 
AtomicLong}</li>
+ *   <li>State transitions create new immutable instances rather than 
modifying existing ones</li>
+ *   <li>Multiple threads can safely read progress and update counters 
simultaneously</li>
+ * </ul>
+ *
+ * <h2>Usage</h2>
+ * State transitions are managed through factory methods that return new 
instances.
+ * Progress tracking is handled via atomic fields that can be safely updated 
from multiple threads.
+ */
+public class OperationStatus
+{
+    // State of the operation
+    private final State state;
+
+    // Total size of data available at source (immutable once set)
+    private final long totalSize;
+
+    // Total number of files available at source (immutable once set)
+    private final int totalFiles;
+
+    // Size of data to be copied (immutable once set)
+    private final long bytesToDownload;
+
+    // Number of files to download from source (immutable once set)
+    private final int filesToDownload;
+
+    // Number of files downloaded from source (thread-safe atomic counter)
+    private final AtomicInteger filesDownloaded;
+
+    // Number of download failures (thread-safe atomic counter)
+    private final AtomicInteger downloadFailures;
+
+    // Size of data downloaded from source in bytes (thread-safe atomic 
counter)
+    private final AtomicLong bytesDownloaded;
+
+    private OperationStatus(@NotNull State state,
+                            long totalSize,
+                            int totalFiles,
+                            long bytesToDownload,
+                            int filesToDownload,
+                            @NotNull AtomicInteger filesDownloaded,
+                            @NotNull AtomicInteger downloadFailures,
+                            @NotNull AtomicLong bytesDownloaded)
+    {
+        this.state = state;
+        this.totalSize = totalSize;
+        this.totalFiles = totalFiles;
+        this.bytesToDownload = bytesToDownload;
+        this.filesToDownload = filesToDownload;
+        this.filesDownloaded = filesDownloaded;
+        this.downloadFailures = downloadFailures;
+        this.bytesDownloaded = bytesDownloaded;
+    }
+
+    public static OperationStatus getStartingState()
+    {
+        return new OperationStatus(State.STARTING,
+                                   -1,
+                                   -1,
+                                   -1,
+                                   -1,
+                                   new AtomicInteger(0),
+                                   new AtomicInteger(0),
+                                   new AtomicLong(0));
+    }
+
+    /**
+     * Transitions to the CLEANING state with updated file metadata.
+     *
+     * @param totalSize  total size of files at the source
+     * @param totalFiles total number of files at the source
+     * @return new OperationStatus instance in CLEANING state
+     * @throws IllegalStateTransitionException if current state cannot 
transition to CLEANING
+     */
+    @VisibleForTesting
+    public OperationStatus getCleaningState(long totalSize, int totalFiles)
+    {
+        return new OperationStatus(this.state.toCleaning(),
+                                   totalSize,
+                                   totalFiles,
+                                   this.bytesToDownload,
+                                   this.filesToDownload,
+                                   this.filesDownloaded,
+                                   this.downloadFailures,
+                                   this.bytesDownloaded);
+    }
+
+    /**
+     * Transitions to the PREPARING state.
+     *
+     * @return new OperationStatus instance in PREPARING state
+     * @throws IllegalStateTransitionException if current state cannot 
transition to PREPARING
+     */
+    @VisibleForTesting
+    public OperationStatus getPreparingState()
+    {
+        return new OperationStatus(this.state.toPreparing(),
+                                   this.totalSize,
+                                   this.totalFiles,
+                                   this.bytesToDownload,
+                                   this.filesToDownload,
+                                   this.filesDownloaded,
+                                   this.downloadFailures,
+                                   this.bytesDownloaded);
+    }
+
+    /**
+     * Transitions to the DOWNLOADING state with updated download metadata.
+     * Resets progress counters for the new download phase.
+     *
+     * @param bytesToDownload total size of data to be downloaded in bytes
+     * @param filesToDownload number of files to be downloaded
+     * @return new OperationStatus instance in DOWNLOADING state
+     * @throws IllegalStateTransitionException if current state cannot 
transition to DOWNLOADING
+     */
+    OperationStatus getDownloadingState(final long bytesToDownload, final int 
filesToDownload)
+    {
+        return new OperationStatus(this.state.toDownloading(),
+                                   this.totalSize,
+                                   this.totalFiles,
+                                   bytesToDownload,
+                                   filesToDownload,
+                                   new AtomicInteger(),
+                                   new AtomicInteger(),
+                                   new AtomicLong());
+    }
+
+    /**
+     * Transitions to the DOWNLOAD_COMPLETE state.
+     *
+     * @return new OperationStatus instance in DOWNLOAD_COMPLETE state
+     * @throws IllegalStateTransitionException if current state cannot 
transition to DOWNLOAD_COMPLETE
+     */
+    OperationStatus getDownloadCompleteState()
+    {
+        return new OperationStatus(this.state.toDownloadComplete(),
+                                   this.totalSize,
+                                   this.totalFiles,
+                                   this.bytesToDownload,
+                                   this.filesToDownload,
+                                   this.filesDownloaded,
+                                   this.downloadFailures,
+                                   this.bytesDownloaded);
+    }
+
+    /**
+     * Transitions to the SUCCESS state, indicating successful completion.
+     *
+     * @return new OperationStatus instance in SUCCESS state
+     * @throws IllegalStateTransitionException if current state cannot 
transition to SUCCESS
+     */
+    @VisibleForTesting
+    public OperationStatus getSuccessState()
+    {
+        return new OperationStatus(this.state.toSuccess(),
+                                   this.totalSize,
+                                   this.totalFiles,
+                                   this.bytesToDownload,
+                                   this.filesToDownload,
+                                   this.filesDownloaded,
+                                   this.downloadFailures,
+                                   this.bytesDownloaded);
+    }
+
+    /**
+     * Transitions to the FAILED state, indicating operation failure.
+     *
+     * @return new OperationStatus instance in FAILED state
+     * @throws IllegalStateTransitionException if current state cannot 
transition to FAILED
+     */
+    OperationStatus tryFailureState()
+    {
+        return new OperationStatus(this.state.toFailed(),
+                                   this.totalSize,
+                                   this.totalFiles,
+                                   this.bytesToDownload,
+                                   this.filesToDownload,
+                                   this.filesDownloaded,
+                                   this.downloadFailures,
+                                   this.bytesDownloaded);
+    }
+
+    /**
+     * Cancels the task if not completed.
+     *
+     * @return Returns same state if completed, otherwise returns cancelled 
state.
+     */
+    public OperationStatus cancel()
+    {
+        return new OperationStatus(this.state.toCancelled(),
+                                   this.totalSize,
+                                   this.totalFiles,
+                                   this.bytesToDownload,
+                                   this.filesToDownload,
+                                   this.filesDownloaded,
+                                   this.downloadFailures,
+                                   this.bytesDownloaded);
+    }
+
+    public State getState()
+    {
+        return state;
+    }
+
+    public State state()
+    {
+        return state;
+    }
+
+    public long totalSize()
+    {
+        return totalSize;
+    }
+
+    public long bytesToDownload()
+    {
+        return bytesToDownload;
+    }
+
+    public int filesToDownload()
+    {
+        return filesToDownload;
+    }
+
+    /**
+     * Returns the thread-safe atomic counter for files downloaded.
+     * This counter can be safely read and updated from multiple threads.
+     *
+     * @return atomic counter for files downloaded
+     */
+    public AtomicInteger filesDownloaded()

Review Comment:
   I don't think we should expose the internal atomic integer values here, but 
instead expose methods that will internally update the counts. i.e. 
`markFileDownloaded()`



##########
server/src/test/java/org/apache/cassandra/sidecar/concurrent/AsyncConcurrentTaskExecutorTest.java:
##########
@@ -0,0 +1,487 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.concurrent;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+
+import io.vertx.core.CompositeFuture;
+import io.vertx.core.Future;
+import io.vertx.core.Promise;
+import io.vertx.core.Vertx;
+import io.vertx.junit5.VertxExtension;
+import io.vertx.junit5.VertxTestContext;
+
+import static 
org.apache.cassandra.sidecar.concurrent.AsyncConcurrentTaskExecutor.TASK_CANCEL_MESSAGE;
+import static org.assertj.core.api.Assertions.assertThat;
+
+@ExtendWith(VertxExtension.class)
+class AsyncConcurrentTaskExecutorTest
+{
+    private final Vertx vertx = Vertx.vertx();
+
+    @Test
+    public void testTaskFailure(VertxTestContext context)
+    {
+        List<Callable<Future<Boolean>>> tasks = new ArrayList<>();
+        String failureMessage = "Task failed";
+        tasks.add(getDummyFailedTask(failureMessage));
+        AsyncConcurrentTaskExecutor<Boolean> concurrentTaskExecutor = new 
AsyncConcurrentTaskExecutor<>(vertx, tasks, 1);
+        List<Future<Boolean>> taskFutures = concurrentTaskExecutor.start();
+        Future.join(taskFutures).onComplete(context.failing(result -> 
context.verify(() -> {
+            assertThat(result).isNotNull();
+            assertThat(result.getMessage()).isEqualTo(failureMessage);
+            assertThat(taskFutures.get(0).failed()).isTrue();
+            context.completeNow();
+        })));
+    }
+
+    @Test
+    void testZeroTasks(VertxTestContext context)
+    {
+        List<Callable<Future<Boolean>>> tasks = new ArrayList<>();
+        AsyncConcurrentTaskExecutor<Boolean> concurrentTaskExecutor = new 
AsyncConcurrentTaskExecutor<>(vertx, tasks, 1);
+        List<Future<Boolean>> taskFutures = concurrentTaskExecutor.start();
+        Future.join(taskFutures).onComplete(ar -> context.verify(() -> {
+            assertThat(ar).isNotNull();
+            assertThat(ar.result().list().size()).isEqualTo(0);
+            context.completeNow();
+        }));
+    }
+
+    @Test
+    public void testMultipleTaskFailure(VertxTestContext context)
+    {
+        List<Callable<Future<Boolean>>> tasks = new ArrayList<>();
+        for (int i = 0; i < 10; i++)
+        {
+            String failureMessage = "Task " + i + " failed.";
+            tasks.add(getDummyFailedTask(failureMessage));
+        }
+        AsyncConcurrentTaskExecutor<Boolean> concurrentTaskExecutor = new 
AsyncConcurrentTaskExecutor<>(vertx, tasks, 5);
+        List<Future<Boolean>> taskFutures = concurrentTaskExecutor.start();
+        Future.join(taskFutures).onComplete(context.failing(result -> 
context.verify(() -> {
+            assertThat(result).isNotNull();
+            taskFutures.forEach(ar -> assertThat(ar.failed()).isTrue());
+            context.completeNow();
+        })));
+    }
+
+    @Test
+    public void testSingleTaskSuccess(VertxTestContext context)
+    {
+        List<Callable<Future<String>>> tasks = 
Collections.singletonList(getDummySuccessTask("Task executed successfully"));
+        AsyncConcurrentTaskExecutor<String> concurrentTaskExecutor = new 
AsyncConcurrentTaskExecutor<>(vertx, tasks, 1);
+        List<Future<String>> taskFutures = concurrentTaskExecutor.start();
+        Future.join(taskFutures).onComplete(ar -> context.verify(() -> {
+            assertThat(ar).isNotNull();
+            assertThat(ar.result().list().get(0)).isEqualTo("Task executed 
successfully");
+            context.completeNow();
+        }));
+    }
+
+    @Test
+    public void testMultipleTasksSucceeds(VertxTestContext context)
+    {
+        List<Callable<Future<Boolean>>> tasks = getDummySuccessTasks(10, 
Boolean.TRUE);
+        AsyncConcurrentTaskExecutor<Boolean> concurrentTaskExecutor = new 
AsyncConcurrentTaskExecutor<>(vertx, tasks, 5);
+        List<Future<Boolean>> taskFutures = concurrentTaskExecutor.start();
+        Future.join(taskFutures).onComplete(ar -> context.verify(() -> {
+            assertThat(ar).isNotNull();

Review Comment:
   ```suggestion
           Future.join(taskFutures).onComplete(context.succeeding(result -> 
context.verify(() -> {
               assertThat(result).isNotNull();
   ```



##########
server/src/test/java/org/apache/cassandra/sidecar/concurrent/AsyncConcurrentTaskExecutorTest.java:
##########
@@ -0,0 +1,487 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.concurrent;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+
+import io.vertx.core.CompositeFuture;
+import io.vertx.core.Future;
+import io.vertx.core.Promise;
+import io.vertx.core.Vertx;
+import io.vertx.junit5.VertxExtension;
+import io.vertx.junit5.VertxTestContext;
+
+import static 
org.apache.cassandra.sidecar.concurrent.AsyncConcurrentTaskExecutor.TASK_CANCEL_MESSAGE;
+import static org.assertj.core.api.Assertions.assertThat;
+
+@ExtendWith(VertxExtension.class)
+class AsyncConcurrentTaskExecutorTest
+{
+    private final Vertx vertx = Vertx.vertx();
+
+    @Test
+    public void testTaskFailure(VertxTestContext context)
+    {
+        List<Callable<Future<Boolean>>> tasks = new ArrayList<>();
+        String failureMessage = "Task failed";
+        tasks.add(getDummyFailedTask(failureMessage));
+        AsyncConcurrentTaskExecutor<Boolean> concurrentTaskExecutor = new 
AsyncConcurrentTaskExecutor<>(vertx, tasks, 1);
+        List<Future<Boolean>> taskFutures = concurrentTaskExecutor.start();
+        Future.join(taskFutures).onComplete(context.failing(result -> 
context.verify(() -> {
+            assertThat(result).isNotNull();
+            assertThat(result.getMessage()).isEqualTo(failureMessage);
+            assertThat(taskFutures.get(0).failed()).isTrue();
+            context.completeNow();
+        })));
+    }
+
+    @Test
+    void testZeroTasks(VertxTestContext context)
+    {
+        List<Callable<Future<Boolean>>> tasks = new ArrayList<>();
+        AsyncConcurrentTaskExecutor<Boolean> concurrentTaskExecutor = new 
AsyncConcurrentTaskExecutor<>(vertx, tasks, 1);
+        List<Future<Boolean>> taskFutures = concurrentTaskExecutor.start();
+        Future.join(taskFutures).onComplete(ar -> context.verify(() -> {
+            assertThat(ar).isNotNull();
+            assertThat(ar.result().list().size()).isEqualTo(0);
+            context.completeNow();
+        }));
+    }
+
+    @Test
+    public void testMultipleTaskFailure(VertxTestContext context)
+    {
+        List<Callable<Future<Boolean>>> tasks = new ArrayList<>();
+        for (int i = 0; i < 10; i++)
+        {
+            String failureMessage = "Task " + i + " failed.";
+            tasks.add(getDummyFailedTask(failureMessage));
+        }
+        AsyncConcurrentTaskExecutor<Boolean> concurrentTaskExecutor = new 
AsyncConcurrentTaskExecutor<>(vertx, tasks, 5);
+        List<Future<Boolean>> taskFutures = concurrentTaskExecutor.start();
+        Future.join(taskFutures).onComplete(context.failing(result -> 
context.verify(() -> {
+            assertThat(result).isNotNull();
+            taskFutures.forEach(ar -> assertThat(ar.failed()).isTrue());
+            context.completeNow();
+        })));
+    }
+
+    @Test
+    public void testSingleTaskSuccess(VertxTestContext context)
+    {
+        List<Callable<Future<String>>> tasks = 
Collections.singletonList(getDummySuccessTask("Task executed successfully"));
+        AsyncConcurrentTaskExecutor<String> concurrentTaskExecutor = new 
AsyncConcurrentTaskExecutor<>(vertx, tasks, 1);
+        List<Future<String>> taskFutures = concurrentTaskExecutor.start();
+        Future.join(taskFutures).onComplete(ar -> context.verify(() -> {
+            assertThat(ar).isNotNull();
+            assertThat(ar.result().list().get(0)).isEqualTo("Task executed 
successfully");

Review Comment:
   I think we need to wrap the future in context.succeeding, otherwise errors 
will be swallowed 
   ```suggestion
           Future.join(taskFutures).onComplete(context.succeeding(result -> 
context.verify(() -> {
               assertThat(result).isNotNull();
               assertThat(result.list().get(0)).isEqualTo("Task executed 
successfully");
   ```



##########
server/src/test/java/org/apache/cassandra/sidecar/concurrent/AsyncConcurrentTaskExecutorTest.java:
##########
@@ -0,0 +1,487 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.concurrent;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+
+import io.vertx.core.CompositeFuture;
+import io.vertx.core.Future;
+import io.vertx.core.Promise;
+import io.vertx.core.Vertx;
+import io.vertx.junit5.VertxExtension;
+import io.vertx.junit5.VertxTestContext;
+
+import static 
org.apache.cassandra.sidecar.concurrent.AsyncConcurrentTaskExecutor.TASK_CANCEL_MESSAGE;
+import static org.assertj.core.api.Assertions.assertThat;
+
+@ExtendWith(VertxExtension.class)
+class AsyncConcurrentTaskExecutorTest
+{
+    private final Vertx vertx = Vertx.vertx();
+
+    @Test
+    public void testTaskFailure(VertxTestContext context)
+    {
+        List<Callable<Future<Boolean>>> tasks = new ArrayList<>();
+        String failureMessage = "Task failed";
+        tasks.add(getDummyFailedTask(failureMessage));
+        AsyncConcurrentTaskExecutor<Boolean> concurrentTaskExecutor = new 
AsyncConcurrentTaskExecutor<>(vertx, tasks, 1);
+        List<Future<Boolean>> taskFutures = concurrentTaskExecutor.start();
+        Future.join(taskFutures).onComplete(context.failing(result -> 
context.verify(() -> {
+            assertThat(result).isNotNull();
+            assertThat(result.getMessage()).isEqualTo(failureMessage);
+            assertThat(taskFutures.get(0).failed()).isTrue();
+            context.completeNow();
+        })));
+    }
+
+    @Test
+    void testZeroTasks(VertxTestContext context)
+    {
+        List<Callable<Future<Boolean>>> tasks = new ArrayList<>();
+        AsyncConcurrentTaskExecutor<Boolean> concurrentTaskExecutor = new 
AsyncConcurrentTaskExecutor<>(vertx, tasks, 1);
+        List<Future<Boolean>> taskFutures = concurrentTaskExecutor.start();
+        Future.join(taskFutures).onComplete(ar -> context.verify(() -> {
+            assertThat(ar).isNotNull();
+            assertThat(ar.result().list().size()).isEqualTo(0);
+            context.completeNow();
+        }));
+    }
+
+    @Test
+    public void testMultipleTaskFailure(VertxTestContext context)
+    {
+        List<Callable<Future<Boolean>>> tasks = new ArrayList<>();
+        for (int i = 0; i < 10; i++)
+        {
+            String failureMessage = "Task " + i + " failed.";
+            tasks.add(getDummyFailedTask(failureMessage));
+        }
+        AsyncConcurrentTaskExecutor<Boolean> concurrentTaskExecutor = new 
AsyncConcurrentTaskExecutor<>(vertx, tasks, 5);
+        List<Future<Boolean>> taskFutures = concurrentTaskExecutor.start();
+        Future.join(taskFutures).onComplete(context.failing(result -> 
context.verify(() -> {
+            assertThat(result).isNotNull();
+            taskFutures.forEach(ar -> assertThat(ar.failed()).isTrue());
+            context.completeNow();
+        })));
+    }
+
+    @Test
+    public void testSingleTaskSuccess(VertxTestContext context)
+    {
+        List<Callable<Future<String>>> tasks = 
Collections.singletonList(getDummySuccessTask("Task executed successfully"));
+        AsyncConcurrentTaskExecutor<String> concurrentTaskExecutor = new 
AsyncConcurrentTaskExecutor<>(vertx, tasks, 1);
+        List<Future<String>> taskFutures = concurrentTaskExecutor.start();
+        Future.join(taskFutures).onComplete(ar -> context.verify(() -> {
+            assertThat(ar).isNotNull();
+            assertThat(ar.result().list().get(0)).isEqualTo("Task executed 
successfully");
+            context.completeNow();
+        }));
+    }
+
+    @Test
+    public void testMultipleTasksSucceeds(VertxTestContext context)
+    {
+        List<Callable<Future<Boolean>>> tasks = getDummySuccessTasks(10, 
Boolean.TRUE);
+        AsyncConcurrentTaskExecutor<Boolean> concurrentTaskExecutor = new 
AsyncConcurrentTaskExecutor<>(vertx, tasks, 5);
+        List<Future<Boolean>> taskFutures = concurrentTaskExecutor.start();
+        Future.join(taskFutures).onComplete(ar -> context.verify(() -> {
+            assertThat(ar).isNotNull();
+            for (Future<Boolean> taskFuture : taskFutures)
+            {
+                assertThat(taskFuture.succeeded()).isTrue();
+                assertThat(taskFuture.result()).isTrue();
+            }
+            context.completeNow();
+        }));
+    }
+
+    @Test
+    public void testLotsOfTasks(VertxTestContext context)
+    {
+        List<Callable<Future<Integer>>> tasks = getDummySuccessTasks(100_000, 
1);
+        AsyncConcurrentTaskExecutor<Integer> concurrentTaskExecutor = new 
AsyncConcurrentTaskExecutor<>(vertx, tasks, 50);
+        List<Future<Integer>> taskFutures = concurrentTaskExecutor.start();
+        Future.join(taskFutures).onComplete(ar -> context.verify(() -> {
+            assertThat(ar).isNotNull();

Review Comment:
   ```suggestion
           Future.join(taskFutures).onComplete(context.succeeding(result -> 
context.verify(() -> {
               assertThat(result).isNotNull();
   ```



##########
server/src/main/java/org/apache/cassandra/sidecar/handlers/livemigration/LiveMigrationCreateDataCopyTaskHandler.java:
##########
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.handlers.livemigration;
+
+import java.util.Set;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.inject.Inject;
+import io.netty.handler.codec.http.HttpResponseStatus;
+import io.vertx.core.http.HttpServerRequest;
+import io.vertx.core.json.DecodeException;
+import io.vertx.core.json.Json;
+import io.vertx.core.json.JsonObject;
+import io.vertx.core.net.SocketAddress;
+import io.vertx.ext.auth.authorization.Authorization;
+import io.vertx.ext.web.RoutingContext;
+import org.apache.cassandra.sidecar.acl.authorization.BasicPermissions;
+import org.apache.cassandra.sidecar.common.ApiEndpointsV1;
+import 
org.apache.cassandra.sidecar.common.request.LiveMigrationDataCopyRequest;
+import org.apache.cassandra.sidecar.concurrent.ExecutorPools;
+import 
org.apache.cassandra.sidecar.exceptions.LiveMigrationExceptions.LiveMigrationDataCopyInProgressException;
+import 
org.apache.cassandra.sidecar.exceptions.LiveMigrationExceptions.LiveMigrationInvalidRequestException;
+import org.apache.cassandra.sidecar.handlers.AbstractHandler;
+import org.apache.cassandra.sidecar.handlers.AccessProtected;
+import org.apache.cassandra.sidecar.livemigration.DataCopyTaskManager;
+import org.apache.cassandra.sidecar.livemigration.LiveMigrationTask;
+import org.apache.cassandra.sidecar.utils.CassandraInputValidator;
+import org.apache.cassandra.sidecar.utils.InstanceMetadataFetcher;
+import org.jetbrains.annotations.NotNull;
+
+import static 
org.apache.cassandra.sidecar.utils.HttpExceptions.wrapHttpException;
+
+/**
+ * Handler for creating data copy tasks for live migration between Cassandra 
instances.
+ * <p>
+ * This handler processes requests to initiate data copying from a source 
Cassandra instance to a destination instance.
+ * Data copy tasks must be submitted to the destination Sidecar instance. When 
a task is accepted,
+ * this handler responds with HTTP 202 ACCEPTED status and returns a JSON 
response containing:
+ * - taskId: Unique identifier for tracking the created task
+ * - statusUrl: URL that can be used to query the status of the data copy 
operation
+ */
+public class LiveMigrationCreateDataCopyTaskHandler extends 
AbstractHandler<LiveMigrationDataCopyRequest> implements AccessProtected
+{
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(LiveMigrationCreateDataCopyTaskHandler.class);
+    private final DataCopyTaskManager dataCopyTaskManager;
+
+    @Inject
+    public LiveMigrationCreateDataCopyTaskHandler(InstanceMetadataFetcher 
metadataFetcher,
+                                                  ExecutorPools executorPools,
+                                                  CassandraInputValidator 
validator,
+                                                  DataCopyTaskManager 
dataCopyTaskManager)
+    {
+        super(metadataFetcher, executorPools, validator);
+        this.dataCopyTaskManager = dataCopyTaskManager;
+    }
+
+    @Override
+    protected LiveMigrationDataCopyRequest extractParamsOrThrow(RoutingContext 
context)
+    {
+        try
+        {
+            return Json.decodeValue(context.body().buffer(), 
LiveMigrationDataCopyRequest.class);
+        }
+        catch (DecodeException decodeException)
+        {
+            throw wrapHttpException(HttpResponseStatus.BAD_REQUEST,
+                                    "Error while decoding values, please check 
your request body.",
+                                    decodeException);
+        }
+    }
+
+    @Override
+    protected void handleInternal(RoutingContext context,
+                                  HttpServerRequest httpRequest,
+                                  @NotNull String host,
+                                  SocketAddress remoteAddress,
+                                  LiveMigrationDataCopyRequest 
liveMigrationTaskRequest)
+    {
+        dataCopyTaskManager

Review Comment:
   I see create task has some sort of concurrency limiter control. There is 
something similar implemented in 
`org.apache.cassandra.sidecar.handlers.sstableuploads.SSTableUploadHandler` 
leveraging the `org.apache.cassandra.sidecar.concurrent.ConcurrencyLimiter`



##########
server/src/main/java/org/apache/cassandra/sidecar/modules/LiveMigrationModule.java:
##########
@@ -51,11 +58,113 @@ public class LiveMigrationModule extends AbstractModule
     protected void configure()
     {
         
bind(LiveMigrationMap.class).to(LiveMigrationMapSidecarConfigImpl.class);
+        
bind(LiveMigrationTaskFactory.class).to(LiveMigrationTaskFactoryImpl.class);
     }
 
+    @GET
+    @Path(ApiEndpointsV1.LIVE_MIGRATION_DATA_COPY_TASKS_ROUTE)
+    @Operation(summary = "Create data copy task",
+               description = "Creates a new data copy task for live migration")
+    @APIResponse(description = "Data copy task created successfully",
+                 responseCode = "200",

Review Comment:
   I think it is returning an ACCEPTED status code, which I believe is 202, no?



##########
server/src/test/java/org/apache/cassandra/sidecar/concurrent/AsyncConcurrentTaskExecutorTest.java:
##########
@@ -0,0 +1,487 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.concurrent;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+
+import io.vertx.core.CompositeFuture;
+import io.vertx.core.Future;
+import io.vertx.core.Promise;
+import io.vertx.core.Vertx;
+import io.vertx.junit5.VertxExtension;
+import io.vertx.junit5.VertxTestContext;
+
+import static 
org.apache.cassandra.sidecar.concurrent.AsyncConcurrentTaskExecutor.TASK_CANCEL_MESSAGE;
+import static org.assertj.core.api.Assertions.assertThat;
+
+@ExtendWith(VertxExtension.class)
+class AsyncConcurrentTaskExecutorTest
+{
+    private final Vertx vertx = Vertx.vertx();
+
+    @Test
+    public void testTaskFailure(VertxTestContext context)
+    {
+        List<Callable<Future<Boolean>>> tasks = new ArrayList<>();
+        String failureMessage = "Task failed";
+        tasks.add(getDummyFailedTask(failureMessage));
+        AsyncConcurrentTaskExecutor<Boolean> concurrentTaskExecutor = new 
AsyncConcurrentTaskExecutor<>(vertx, tasks, 1);
+        List<Future<Boolean>> taskFutures = concurrentTaskExecutor.start();
+        Future.join(taskFutures).onComplete(context.failing(result -> 
context.verify(() -> {
+            assertThat(result).isNotNull();
+            assertThat(result.getMessage()).isEqualTo(failureMessage);
+            assertThat(taskFutures.get(0).failed()).isTrue();
+            context.completeNow();
+        })));
+    }
+
+    @Test
+    void testZeroTasks(VertxTestContext context)
+    {
+        List<Callable<Future<Boolean>>> tasks = new ArrayList<>();
+        AsyncConcurrentTaskExecutor<Boolean> concurrentTaskExecutor = new 
AsyncConcurrentTaskExecutor<>(vertx, tasks, 1);
+        List<Future<Boolean>> taskFutures = concurrentTaskExecutor.start();
+        Future.join(taskFutures).onComplete(ar -> context.verify(() -> {
+            assertThat(ar).isNotNull();
+            assertThat(ar.result().list().size()).isEqualTo(0);
+            context.completeNow();
+        }));
+    }
+
+    @Test
+    public void testMultipleTaskFailure(VertxTestContext context)
+    {
+        List<Callable<Future<Boolean>>> tasks = new ArrayList<>();
+        for (int i = 0; i < 10; i++)
+        {
+            String failureMessage = "Task " + i + " failed.";
+            tasks.add(getDummyFailedTask(failureMessage));
+        }
+        AsyncConcurrentTaskExecutor<Boolean> concurrentTaskExecutor = new 
AsyncConcurrentTaskExecutor<>(vertx, tasks, 5);
+        List<Future<Boolean>> taskFutures = concurrentTaskExecutor.start();
+        Future.join(taskFutures).onComplete(context.failing(result -> 
context.verify(() -> {
+            assertThat(result).isNotNull();
+            taskFutures.forEach(ar -> assertThat(ar.failed()).isTrue());
+            context.completeNow();
+        })));
+    }
+
+    @Test
+    public void testSingleTaskSuccess(VertxTestContext context)
+    {
+        List<Callable<Future<String>>> tasks = 
Collections.singletonList(getDummySuccessTask("Task executed successfully"));
+        AsyncConcurrentTaskExecutor<String> concurrentTaskExecutor = new 
AsyncConcurrentTaskExecutor<>(vertx, tasks, 1);
+        List<Future<String>> taskFutures = concurrentTaskExecutor.start();
+        Future.join(taskFutures).onComplete(ar -> context.verify(() -> {
+            assertThat(ar).isNotNull();
+            assertThat(ar.result().list().get(0)).isEqualTo("Task executed 
successfully");
+            context.completeNow();
+        }));
+    }
+
+    @Test
+    public void testMultipleTasksSucceeds(VertxTestContext context)
+    {
+        List<Callable<Future<Boolean>>> tasks = getDummySuccessTasks(10, 
Boolean.TRUE);
+        AsyncConcurrentTaskExecutor<Boolean> concurrentTaskExecutor = new 
AsyncConcurrentTaskExecutor<>(vertx, tasks, 5);
+        List<Future<Boolean>> taskFutures = concurrentTaskExecutor.start();
+        Future.join(taskFutures).onComplete(ar -> context.verify(() -> {
+            assertThat(ar).isNotNull();
+            for (Future<Boolean> taskFuture : taskFutures)
+            {
+                assertThat(taskFuture.succeeded()).isTrue();
+                assertThat(taskFuture.result()).isTrue();
+            }
+            context.completeNow();
+        }));
+    }
+
+    @Test
+    public void testLotsOfTasks(VertxTestContext context)
+    {
+        List<Callable<Future<Integer>>> tasks = getDummySuccessTasks(100_000, 
1);
+        AsyncConcurrentTaskExecutor<Integer> concurrentTaskExecutor = new 
AsyncConcurrentTaskExecutor<>(vertx, tasks, 50);
+        List<Future<Integer>> taskFutures = concurrentTaskExecutor.start();
+        Future.join(taskFutures).onComplete(ar -> context.verify(() -> {
+            assertThat(ar).isNotNull();
+            for (Future<Integer> taskFuture : taskFutures)
+            {
+                assertThat(taskFuture.succeeded()).isTrue();
+                assertThat(taskFuture.result()).isEqualTo(1);
+            }
+            context.completeNow();
+        }));
+    }
+
+    @Test
+    public void testTaskCancel(VertxTestContext context)
+    {
+        Promise<Boolean> promise = Promise.promise();
+        List<Callable<Future<Boolean>>> tasks = getDummyPendingTasks(10, 
promise);
+        AsyncConcurrentTaskExecutor<Boolean> concurrentTaskExecutor = new 
AsyncConcurrentTaskExecutor<>(vertx, tasks, 1);
+        List<Future<Boolean>> taskFutures = concurrentTaskExecutor.start();
+
+        concurrentTaskExecutor.cancelTasks();
+
+        // Complete promise so that first task gets finished
+        promise.fail(new Exception("ERROR!"));
+
+        Future.join(taskFutures).onComplete(context.failing(result -> 
context.verify(() -> {
+            assertThat(result).isNotNull();
+
+            Future<Boolean> firstTask = taskFutures.get(0);
+            assertThat(firstTask.failed()).isTrue();
+            assertThat(firstTask.cause()).isInstanceOf(Exception.class);
+
+            for (int i = 1; i < taskFutures.size(); i++)
+            {
+                Future<Boolean> task = taskFutures.get(i);
+                assertThat(task.failed()).isTrue();
+                
assertThat(task.cause().getMessage()).isEqualTo(TASK_CANCEL_MESSAGE);
+                
assertThat(task.cause()).isInstanceOf(CancellationException.class);
+            }
+
+            context.completeNow();
+        })));
+    }
+
+    @Test
+    public void testCancelLotOfTasks(VertxTestContext context)
+    {
+        Promise<Integer> promise = Promise.promise();
+        List<Callable<Future<Integer>>> tasks = getDummyPendingTasks(10_000, 
promise);
+        AsyncConcurrentTaskExecutor<Integer> taskExecutor = new 
AsyncConcurrentTaskExecutor<>(vertx, tasks, 1);
+        List<Future<Integer>> taskFutures = taskExecutor.start();
+
+        taskExecutor.cancelTasks();
+
+        // Resolve the promise so that first task will finish
+        promise.complete(0);
+
+        Future.join(taskFutures).onComplete(context.failing(result -> 
context.verify(() -> {
+            assertThat(result).isNotNull();
+
+            Future<Integer> firstTask = taskFutures.get(0);
+            assertThat(firstTask.isComplete()).isTrue();
+            assertThat(firstTask.succeeded()).isTrue();
+            assertThat(firstTask.result()).isEqualTo(0);
+
+            for (int i = 1; i < taskFutures.size(); i++)
+            {
+                Future<Integer> task = taskFutures.get(i);
+                assertThat(task.failed()).isTrue();
+                
assertThat(task.cause().getMessage()).isEqualTo(TASK_CANCEL_MESSAGE);
+                
assertThat(task.cause()).isInstanceOf(CancellationException.class);
+            }
+
+            context.completeNow();
+        })));
+    }
+
+    @Test
+    public void testCancelWhenTasksInProgress()
+    {
+        Promise<Integer> task1Promise = Promise.promise();
+        Callable<Future<Integer>> task1 = () -> task1Promise.future()
+                                                            .compose(res -> 
Future.succeededFuture(0));
+        Callable<Future<Integer>> task2 = getDummySuccessTask(1);
+        Promise<Integer> task3Promise = Promise.promise();
+        Callable<Future<Integer>> task3 = () -> task3Promise.future()
+                                                            .compose(res -> 
Future.succeededFuture(2));
+        Callable<Future<Integer>> task4 = getDummySuccessTask(3);
+        Callable<Future<Integer>> task5 = getDummySuccessTask(4);
+
+        List<Callable<Future<Integer>>> tasks = Arrays.asList(task1, task2, 
task3, task4, task5);
+        AsyncConcurrentTaskExecutor<Integer> concurrentTaskExecutor = new 
AsyncConcurrentTaskExecutor<>(vertx, tasks, 2);
+        List<Future<Integer>> taskFutures = concurrentTaskExecutor.start();
+
+        // Wait for some time so that independent tasks get time finish.
+        CompositeFuture compositeFuture = waitForTasks(taskFutures, 1_000);
+        assertThat(compositeFuture.isComplete()).isFalse();
+        concurrentTaskExecutor.cancelTasks();
+        waitForTasks(compositeFuture, 1_000);
+
+        assertThat(taskFutures.get(0).succeeded()).isFalse();
+        assertThat(taskFutures.get(0).isComplete()).isFalse();
+
+        assertThat(taskFutures.get(1).succeeded()).isTrue();
+        assertThat(taskFutures.get(1).isComplete()).isTrue();
+
+        assertThat(taskFutures.get(2).succeeded()).isFalse();
+        assertThat(taskFutures.get(2).isComplete()).isFalse();
+
+        assertThat(taskFutures.get(3).isComplete()).isTrue();
+        assertThat(taskFutures.get(3).failed()).isTrue();
+        
assertThat(taskFutures.get(3).cause()).isInstanceOf(CancellationException.class);
+
+        assertThat(taskFutures.get(4).failed()).isTrue();
+        assertThat(taskFutures.get(4).isComplete()).isTrue();
+        
assertThat(taskFutures.get(4).cause()).isInstanceOf(CancellationException.class);
+
+        assertThat(compositeFuture.isComplete()).isFalse();
+
+        task1Promise.complete();
+        task3Promise.complete();
+
+        waitForTasks(compositeFuture, 2_000);
+        assertThat(Future.join(taskFutures).isComplete()).isTrue();
+    }
+
+    @Test
+    public void testMaxConcurrency(VertxTestContext context)
+    {
+        AtomicInteger counter = new AtomicInteger(0);
+        int maxConcurrency = 4;
+        List<Callable<Future<Integer>>> tasks = new ArrayList<>();
+        for (int i = 0; i < 1000; i++)
+        {
+            Callable<Future<Integer>> task = 
getDummySuccessCounterTask(counter);
+            tasks.add(task);
+        }
+        TestAsyncTaskExecutor<Integer> concurrentTaskExecutor =
+        new TestAsyncTaskExecutor<>(vertx, tasks, maxConcurrency, counter);
+        List<Future<Integer>> taskFutures = concurrentTaskExecutor.start();
+
+        Future.join(taskFutures).onComplete(ar -> context.verify(() -> {

Review Comment:
   ```suggestion
                   Future.join(taskFutures).onComplete(ar -> context.verify(() 
-> {
   ```



##########
server/src/main/java/org/apache/cassandra/sidecar/livemigration/DataCopyTaskManager.java:
##########


Review Comment:
   NIT : usage of `final` in this class.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: pr-unsubscr...@cassandra.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: pr-unsubscr...@cassandra.apache.org
For additional commands, e-mail: pr-h...@cassandra.apache.org

Reply via email to