frankgh commented on code in PR #252: URL: https://github.com/apache/cassandra-sidecar/pull/252#discussion_r2292176436
########## client-common/src/test/java/org/apache/cassandra/sidecar/common/request/LiveMigrationDataCopyRequestTest.java: ########## Review Comment: Indentation in this file is a bit off. I think there's an [effort to standardize](https://github.com/apache/cassandra-sidecar/pull/235) on a style and make it automatic, but I think for now we should try to stay close to the conventions we have in the project. ########## server/src/main/java/org/apache/cassandra/sidecar/handlers/livemigration/LiveMigrationDirType.java: ########## @@ -28,7 +28,7 @@ public enum LiveMigrationDirType { CDC_RAW_DIR("cdc_raw"), COMMIT_LOG_DIR("commitlog"), - DATA_FIlE_DIR("data"), + DATA_FILE_DIR("data"), Review Comment: Wow, it took me 10 seconds to spot the difference. ########## server/src/main/java/org/apache/cassandra/sidecar/livemigration/OperationStatus.java: ########## @@ -0,0 +1,430 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.cassandra.sidecar.livemigration; + +import java.util.Set; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; + +import com.google.common.annotations.VisibleForTesting; + +import org.jetbrains.annotations.NotNull; + +/** + * Represents the state and progress of a live migration data copy task using a state machine pattern. + * + * <h2>State Machine</h2> + * The operation can follow these state transitions: + * <pre> + * STARTING -> CLEANING -> PREPARING -> DOWNLOADING -> DOWNLOAD_COMPLETE + * | | | | + * | | | | + * | | | | + * | | | +-> CANCELLED / FAILED + * | | +-> SUCCESS + * | | + * | +-> CANCELLED / FAILED + * +-> CANCELLED / FAILED + * + * Note: CANCELLED and FAILED states can be reached from any non-terminal state. + * DOWNLOAD_COMPLETE is a terminal state (no further transitions). + * SUCCESS can only be reached from PREPARING state. + * Special case: CANCELLED -> FAILED transition is tolerated (returns CANCELLED). + * This handles scenarios where failure occurs after cancellation. + * </pre> + * + * <h2>Thread Safety</h2> + * This class is designed for concurrent access: + * <ul> + * <li>The {@code State} and size fields are immutable once set</li> + * <li>Progress counters ({@code filesDownloaded}, {@code downloadFailures}, {@code bytesDownloaded}) + * are thread-safe using {@code AtomicInteger} and {@code AtomicLong}</li> + * <li>State transitions create new immutable instances rather than modifying existing ones</li> + * <li>Multiple threads can safely read progress and update counters simultaneously</li> + * </ul> + * + * <h2>Usage</h2> + * State transitions are managed through factory methods that return new instances. + * Progress tracking is handled via atomic fields that can be safely updated from multiple threads. 
+ */ +public class OperationStatus +{ + // State of the operation + private final State state; + + // Total size of data available at source (immutable once set) + private final long totalSize; + + // Total number of files available at source (immutable once set) + private final int totalFiles; + + // Size of data to be copied (immutable once set) + private final long bytesToDownload; + + // Number of files to download from source (immutable once set) + private final int filesToDownload; + + // Number of files downloaded from source (thread-safe atomic counter) + private final AtomicInteger filesDownloaded; + + // Number of download failures (thread-safe atomic counter) + private final AtomicInteger downloadFailures; + + // Size of data downloaded from source in bytes (thread-safe atomic counter) + private final AtomicLong bytesDownloaded; + + private OperationStatus(@NotNull State state, + long totalSize, + int totalFiles, + long bytesToDownload, + int filesToDownload, + @NotNull AtomicInteger filesDownloaded, + @NotNull AtomicInteger downloadFailures, + @NotNull AtomicLong bytesDownloaded) + { + this.state = state; + this.totalSize = totalSize; + this.totalFiles = totalFiles; + this.bytesToDownload = bytesToDownload; + this.filesToDownload = filesToDownload; + this.filesDownloaded = filesDownloaded; + this.downloadFailures = downloadFailures; + this.bytesDownloaded = bytesDownloaded; + } + + public static OperationStatus getStartingState() + { + return new OperationStatus(State.STARTING, + -1, + -1, + -1, + -1, + new AtomicInteger(0), + new AtomicInteger(0), + new AtomicLong(0)); + } + + /** + * Transitions to the CLEANING state with updated file metadata. 
+ * + * @param totalSize total size of files at the source + * @param totalFiles total number of files at the source + * @return new OperationStatus instance in CLEANING state + * @throws IllegalStateTransitionException if current state cannot transition to CLEANING + */ + @VisibleForTesting + public OperationStatus getCleaningState(long totalSize, int totalFiles) + { + return new OperationStatus(this.state.toCleaning(), + totalSize, + totalFiles, + this.bytesToDownload, + this.filesToDownload, + this.filesDownloaded, + this.downloadFailures, + this.bytesDownloaded); + } + + /** + * Transitions to the PREPARING state. + * + * @return new OperationStatus instance in PREPARING state + * @throws IllegalStateTransitionException if current state cannot transition to PREPARING + */ + @VisibleForTesting + public OperationStatus getPreparingState() + { + return new OperationStatus(this.state.toPreparing(), + this.totalSize, + this.totalFiles, + this.bytesToDownload, + this.filesToDownload, + this.filesDownloaded, + this.downloadFailures, + this.bytesDownloaded); + } + + /** + * Transitions to the DOWNLOADING state with updated download metadata. + * Resets progress counters for the new download phase. + * + * @param bytesToDownload total size of data to be downloaded in bytes + * @param filesToDownload number of files to be downloaded + * @return new OperationStatus instance in DOWNLOADING state + * @throws IllegalStateTransitionException if current state cannot transition to DOWNLOADING + */ + OperationStatus getDownloadingState(final long bytesToDownload, final int filesToDownload) + { + return new OperationStatus(this.state.toDownloading(), + this.totalSize, + this.totalFiles, + bytesToDownload, + filesToDownload, + new AtomicInteger(), + new AtomicInteger(), + new AtomicLong()); + } + + /** + * Transitions to the DOWNLOAD_COMPLETE state. 
+ * + * @return new OperationStatus instance in DOWNLOAD_COMPLETE state + * @throws IllegalStateTransitionException if current state cannot transition to DOWNLOAD_COMPLETE + */ + OperationStatus getDownloadCompleteState() + { + return new OperationStatus(this.state.toDownloadComplete(), + this.totalSize, + this.totalFiles, + this.bytesToDownload, + this.filesToDownload, + this.filesDownloaded, + this.downloadFailures, + this.bytesDownloaded); + } + + /** + * Transitions to the SUCCESS state, indicating successful completion. + * + * @return new OperationStatus instance in SUCCESS state + * @throws IllegalStateTransitionException if current state cannot transition to SUCCESS + */ + @VisibleForTesting + public OperationStatus getSuccessState() + { + return new OperationStatus(this.state.toSuccess(), + this.totalSize, + this.totalFiles, + this.bytesToDownload, + this.filesToDownload, + this.filesDownloaded, + this.downloadFailures, + this.bytesDownloaded); + } + + /** + * Transitions to the FAILED state, indicating operation failure. + * + * @return new OperationStatus instance in FAILED state + * @throws IllegalStateTransitionException if current state cannot transition to FAILED + */ + OperationStatus tryFailureState() + { + return new OperationStatus(this.state.toFailed(), + this.totalSize, + this.totalFiles, + this.bytesToDownload, + this.filesToDownload, + this.filesDownloaded, + this.downloadFailures, + this.bytesDownloaded); + } + + /** + * Cancels the task if not completed. + * + * @return Returns same state if completed, otherwise returns cancelled state. 
+ */ + public OperationStatus cancel() + { + return new OperationStatus(this.state.toCancelled(), + this.totalSize, + this.totalFiles, + this.bytesToDownload, + this.filesToDownload, + this.filesDownloaded, + this.downloadFailures, + this.bytesDownloaded); + } + + public State getState() + { + return state; + } + + public State state() + { + return state; + } + + public long totalSize() + { + return totalSize; + } + + public long bytesToDownload() + { + return bytesToDownload; + } + + public int filesToDownload() + { + return filesToDownload; + } + + /** + * Returns the thread-safe atomic counter for files downloaded. + * This counter can be safely read and updated from multiple threads. + * + * @return atomic counter for files downloaded + */ + public AtomicInteger filesDownloaded() + { + return filesDownloaded; + } + + /** + * Returns the thread-safe atomic counter for bytes downloaded. + * This counter can be safely read and updated from multiple threads. + * + * @return atomic counter for bytes downloaded + */ + public AtomicLong bytesDownloaded() + { + return bytesDownloaded; + } + + public int totalFiles() + { + return totalFiles; + } + + /** + * Returns the thread-safe atomic counter for download failures. + * This counter can be safely read and updated from multiple threads. + * + * @return atomic counter for download failures + */ + public AtomicInteger downloadFailures() + { + return downloadFailures; + } + + @Override + public String toString() + { + return "OperationStatus{" + + "bytesDownloaded=" + bytesDownloaded + + ", filesDownloaded=" + filesDownloaded + + ", filesToDownload=" + filesToDownload + + ", totalSize=" + totalSize + + ", downloadSize=" + bytesToDownload + + ", state=" + state + + '}'; + } + + /** + * Represents the various states of a live migration data copy operation. 
+ * + * <h3>State Descriptions:</h3> + * <ul> + * <li><b>STARTING</b> - Initial state when the operation begins</li> + * <li><b>CLEANING</b> - Removing unnecessary files from the destination</li> + * <li><b>PREPARING</b> - Analyzing files to determine what needs to be downloaded</li> + * <li><b>DOWNLOADING</b> - Actively downloading files from the source</li> + * <li><b>DOWNLOAD_COMPLETE</b> - All files have been downloaded (terminal state)</li> + * <li><b>SUCCESS</b> - Operation completed successfully (terminal state)</li> + * <li><b>FAILED</b> - Operation failed due to an error (terminal state)</li> + * <li><b>CANCELLED</b> - Operation was cancelled by user request (terminal state)</li> + * </ul> + * <p> + * Invalid transitions throw {@link IllegalStateTransitionException}. + */ + public enum State Review Comment: I think it would be valuable to have unit tests for this specific enum that focus on testing the transitions ########## server/src/main/java/org/apache/cassandra/sidecar/livemigration/OperationStatus.java: ########## @@ -0,0 +1,430 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.sidecar.livemigration; + +import java.util.Set; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; + +import com.google.common.annotations.VisibleForTesting; + +import org.jetbrains.annotations.NotNull; + +/** + * Represents the state and progress of a live migration data copy task using a state machine pattern. + * + * <h2>State Machine</h2> + * The operation can follow these state transitions: + * <pre> + * STARTING -> CLEANING -> PREPARING -> DOWNLOADING -> DOWNLOAD_COMPLETE + * | | | | + * | | | | + * | | | | + * | | | +-> CANCELLED / FAILED + * | | +-> SUCCESS + * | | + * | +-> CANCELLED / FAILED + * +-> CANCELLED / FAILED + * + * Note: CANCELLED and FAILED states can be reached from any non-terminal state. + * DOWNLOAD_COMPLETE is a terminal state (no further transitions). + * SUCCESS can only be reached from PREPARING state. + * Special case: CANCELLED -> FAILED transition is tolerated (returns CANCELLED). + * This handles scenarios where failure occurs after cancellation. + * </pre> + * + * <h2>Thread Safety</h2> + * This class is designed for concurrent access: + * <ul> + * <li>The {@code State} and size fields are immutable once set</li> + * <li>Progress counters ({@code filesDownloaded}, {@code downloadFailures}, {@code bytesDownloaded}) + * are thread-safe using {@code AtomicInteger} and {@code AtomicLong}</li> + * <li>State transitions create new immutable instances rather than modifying existing ones</li> + * <li>Multiple threads can safely read progress and update counters simultaneously</li> + * </ul> + * + * <h2>Usage</h2> + * State transitions are managed through factory methods that return new instances. + * Progress tracking is handled via atomic fields that can be safely updated from multiple threads. 
+ */ +public class OperationStatus +{ + // State of the operation + private final State state; + + // Total size of data available at source (immutable once set) + private final long totalSize; + + // Total number of files available at source (immutable once set) + private final int totalFiles; + + // Size of data to be copied (immutable once set) + private final long bytesToDownload; + + // Number of files to download from source (immutable once set) + private final int filesToDownload; + + // Number of files downloaded from source (thread-safe atomic counter) + private final AtomicInteger filesDownloaded; + + // Number of download failures (thread-safe atomic counter) + private final AtomicInteger downloadFailures; + + // Size of data downloaded from source in bytes (thread-safe atomic counter) + private final AtomicLong bytesDownloaded; + + private OperationStatus(@NotNull State state, + long totalSize, + int totalFiles, + long bytesToDownload, + int filesToDownload, + @NotNull AtomicInteger filesDownloaded, + @NotNull AtomicInteger downloadFailures, + @NotNull AtomicLong bytesDownloaded) + { + this.state = state; + this.totalSize = totalSize; + this.totalFiles = totalFiles; + this.bytesToDownload = bytesToDownload; + this.filesToDownload = filesToDownload; + this.filesDownloaded = filesDownloaded; + this.downloadFailures = downloadFailures; + this.bytesDownloaded = bytesDownloaded; + } + + public static OperationStatus getStartingState() + { + return new OperationStatus(State.STARTING, + -1, + -1, + -1, + -1, + new AtomicInteger(0), + new AtomicInteger(0), + new AtomicLong(0)); + } + + /** + * Transitions to the CLEANING state with updated file metadata. 
+ * + * @param totalSize total size of files at the source + * @param totalFiles total number of files at the source + * @return new OperationStatus instance in CLEANING state + * @throws IllegalStateTransitionException if current state cannot transition to CLEANING + */ + @VisibleForTesting + public OperationStatus getCleaningState(long totalSize, int totalFiles) Review Comment: Should this be renamed to indicate that a transition is occurring, for example `toCleaningState`? ########## server/src/main/java/org/apache/cassandra/sidecar/livemigration/LiveMigrationTaskFactoryImpl.java: ########## @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.cassandra.sidecar.livemigration; + +import com.google.inject.Inject; +import com.google.inject.Singleton; +import io.vertx.core.Vertx; +import org.apache.cassandra.sidecar.cluster.instance.InstanceMetadata; +import org.apache.cassandra.sidecar.common.request.LiveMigrationDataCopyRequest; +import org.apache.cassandra.sidecar.concurrent.ExecutorPools; +import org.apache.cassandra.sidecar.config.LiveMigrationConfiguration; +import org.apache.cassandra.sidecar.config.SidecarConfiguration; +import org.apache.cassandra.sidecar.utils.SidecarClientProvider; + +/** + * Factory implementation which produces {@link LiveMigrationTask} instances. + */ +@Singleton +public class LiveMigrationTaskFactoryImpl implements LiveMigrationTaskFactory +{ + + private final Vertx vertx; + private final SidecarClientProvider sidecarClientProvider; + private final LiveMigrationConfiguration liveMigrationConfiguration; + private final ExecutorPools executorPools; + + @Inject + public LiveMigrationTaskFactoryImpl(Vertx vertx, + ExecutorPools executorPools, + SidecarClientProvider sidecarClientProvider, + SidecarConfiguration sidecarConfiguration) + { + this.vertx = vertx; + this.executorPools = executorPools; + this.sidecarClientProvider = sidecarClientProvider; + this.liveMigrationConfiguration = sidecarConfiguration.liveMigrationConfiguration(); + } + + /** + * {@inheritDoc} + */ + @Override + public LiveMigrationTask create(final LiveMigrationDataCopyRequest request, + final String source, + final int port, + final InstanceMetadata instanceMetadata) Review Comment: NIT ```suggestion public LiveMigrationTask create(LiveMigrationDataCopyRequest request, String source, int port, InstanceMetadata instanceMetadata) ``` ########## server/src/main/java/org/apache/cassandra/sidecar/livemigration/OperationStatus.java: ########## @@ -0,0 +1,430 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.sidecar.livemigration; + +import java.util.Set; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; + +import com.google.common.annotations.VisibleForTesting; + +import org.jetbrains.annotations.NotNull; + +/** + * Represents the state and progress of a live migration data copy task using a state machine pattern. + * + * <h2>State Machine</h2> + * The operation can follow these state transitions: + * <pre> + * STARTING -> CLEANING -> PREPARING -> DOWNLOADING -> DOWNLOAD_COMPLETE + * | | | | + * | | | | + * | | | | + * | | | +-> CANCELLED / FAILED + * | | +-> SUCCESS + * | | + * | +-> CANCELLED / FAILED + * +-> CANCELLED / FAILED + * + * Note: CANCELLED and FAILED states can be reached from any non-terminal state. + * DOWNLOAD_COMPLETE is a terminal state (no further transitions). + * SUCCESS can only be reached from PREPARING state. + * Special case: CANCELLED -> FAILED transition is tolerated (returns CANCELLED). + * This handles scenarios where failure occurs after cancellation. 
+ * </pre> + * + * <h2>Thread Safety</h2> + * This class is designed for concurrent access: + * <ul> + * <li>The {@code State} and size fields are immutable once set</li> + * <li>Progress counters ({@code filesDownloaded}, {@code downloadFailures}, {@code bytesDownloaded}) + * are thread-safe using {@code AtomicInteger} and {@code AtomicLong}</li> + * <li>State transitions create new immutable instances rather than modifying existing ones</li> + * <li>Multiple threads can safely read progress and update counters simultaneously</li> + * </ul> + * + * <h2>Usage</h2> + * State transitions are managed through factory methods that return new instances. + * Progress tracking is handled via atomic fields that can be safely updated from multiple threads. + */ +public class OperationStatus +{ + // State of the operation + private final State state; + + // Total size of data available at source (immutable once set) + private final long totalSize; + + // Total number of files available at source (immutable once set) + private final int totalFiles; + + // Size of data to be copied (immutable once set) + private final long bytesToDownload; + + // Number of files to download from source (immutable once set) + private final int filesToDownload; + + // Number of files downloaded from source (thread-safe atomic counter) + private final AtomicInteger filesDownloaded; + + // Number of download failures (thread-safe atomic counter) + private final AtomicInteger downloadFailures; + + // Size of data downloaded from source in bytes (thread-safe atomic counter) + private final AtomicLong bytesDownloaded; + + private OperationStatus(@NotNull State state, + long totalSize, + int totalFiles, + long bytesToDownload, + int filesToDownload, + @NotNull AtomicInteger filesDownloaded, + @NotNull AtomicInteger downloadFailures, + @NotNull AtomicLong bytesDownloaded) + { + this.state = state; + this.totalSize = totalSize; + this.totalFiles = totalFiles; + this.bytesToDownload = bytesToDownload; + 
this.filesToDownload = filesToDownload; + this.filesDownloaded = filesDownloaded; + this.downloadFailures = downloadFailures; + this.bytesDownloaded = bytesDownloaded; + } + + public static OperationStatus getStartingState() + { + return new OperationStatus(State.STARTING, + -1, + -1, + -1, + -1, + new AtomicInteger(0), + new AtomicInteger(0), + new AtomicLong(0)); + } + + /** + * Transitions to the CLEANING state with updated file metadata. + * + * @param totalSize total size of files at the source + * @param totalFiles total number of files at the source + * @return new OperationStatus instance in CLEANING state + * @throws IllegalStateTransitionException if current state cannot transition to CLEANING + */ + @VisibleForTesting + public OperationStatus getCleaningState(long totalSize, int totalFiles) + { + return new OperationStatus(this.state.toCleaning(), + totalSize, + totalFiles, + this.bytesToDownload, + this.filesToDownload, + this.filesDownloaded, + this.downloadFailures, + this.bytesDownloaded); + } + + /** + * Transitions to the PREPARING state. + * + * @return new OperationStatus instance in PREPARING state + * @throws IllegalStateTransitionException if current state cannot transition to PREPARING + */ + @VisibleForTesting + public OperationStatus getPreparingState() Review Comment: ```suggestion public OperationStatus toPreparingState() ``` ########## server/src/main/java/org/apache/cassandra/sidecar/concurrent/AsyncConcurrentTaskExecutor.java: ########## @@ -0,0 +1,285 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.sidecar.concurrent; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Objects; +import java.util.concurrent.Callable; +import java.util.concurrent.CancellationException; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.Collectors; + +import com.google.common.annotations.VisibleForTesting; + +import io.vertx.core.Future; +import io.vertx.core.Promise; +import io.vertx.core.Vertx; +import org.jetbrains.annotations.NotNull; + +/** + * A task executor that manages concurrent execution of asynchronous tasks with configurable concurrency limit. + * + * <h2>Overview</h2> + * <p>This executor ensures that only a specified number of tasks (defined by {@code maxConcurrency}) + * execute simultaneously. 
When a task completes, the next pending task is automatically + * triggered, maintaining optimal resource utilization while respecting concurrency constraints.</p> + * + * <h2>Architecture</h2> + * <p>The executor uses a promise-based triggering mechanism:</p> + * <ul> + * <li>Each task is associated with a {@link Promise} that acts as a trigger signal</li> + * <li>Tasks only start execution when their corresponding Promise completes successfully</li> + * <li>Initially, {@code maxConcurrency} number of promises are completed to start the execution flow</li> + * <li>As tasks complete, the next pending task is automatically triggered</li> + * </ul> + * + * <h2>Concurrency Control</h2> + * <p>The executor maintains a global task index using {@link AtomicInteger} to ensure thread-safe + * task scheduling. Tasks are processed in the order they were submitted, with automatic + * flow control to maintain the desired concurrency level.</p> + * + * <h2>Error Handling</h2> + * <p>When a task completes (successfully or with a non-{@link CancellationException} error), + * the executor automatically triggers the next pending task. 
Tasks that are cancelled do not + * trigger subsequent tasks to prevent further executions.</p> + * + * <h2>Usage Example</h2> + * <pre>{@code + * List<Callable<Future<String>>> tasks = Arrays.asList( + * () -> downloadFile("file1.txt"), + * () -> downloadFile("file2.txt"), + * () -> downloadFile("file3.txt") + * ); + * + * AsyncConcurrentTaskExecutor<String> executor = + * new AsyncConcurrentTaskExecutor<>(vertx, tasks, 2); // max 2 tasks can execute concurrently + * + * List<Future<String>> results = executor.start(); + * }</pre> + * + * @param <T> the type of result produced by the tasks + */ +public class AsyncConcurrentTaskExecutor<T> +{ + protected static final String TASK_CANCEL_MESSAGE = "task cancelled"; + private final AtomicInteger index = new AtomicInteger(0); + private final int maxConcurrency; + private final Vertx vertx; + private final List<Promise<T>> taskPromises; + private final List<Callable<Future<T>>> tasks; + + /** + * Creates a new AsyncConcurrentTaskExecutor. + * + * @param vertx the Vert.x instance for context execution + * @param tasks the list of tasks to execute (must not be empty) + * @param maxConcurrency the maximum number of tasks to execute concurrently (must be > 0) + * @throws IllegalArgumentException if maxConcurrency <= 0 or tasks is empty + */ + public AsyncConcurrentTaskExecutor(@NotNull Vertx vertx, + @NotNull List<Callable<Future<T>>> tasks, + int maxConcurrency) + { + Objects.requireNonNull(vertx, "vertx must not be null."); + Objects.requireNonNull(tasks, "tasks must not be null."); + if (maxConcurrency < 1) + { + throw (new IllegalArgumentException("maxConcurrency must be > 0")); + } + + this.tasks = Collections.unmodifiableList(tasks); + this.maxConcurrency = maxConcurrency; + this.vertx = vertx; + taskPromises = tasks.stream() + .map(item -> Promise.<T>promise()) + .collect(Collectors.toUnmodifiableList()); + } + + /** + * Returns the list of task promises for testing purposes. 
+ * + * @return unmodifiable list of task promises + */ + @VisibleForTesting + List<Promise<T>> getTaskPromises() + { + return taskPromises; + } + + /** + * Starts the execution of tasks with the configured concurrency limit. + * + * <p>This method immediately starts up to {@code maxConcurrency} tasks and returns + * futures for all tasks. As running tasks complete, pending tasks are automatically + * triggered to maintain the concurrency level.</p> + * + * <h3>Execution Flow Diagram:</h3> + * <pre> + * index + * + + * | + * v + * +----+----+-+--+----+------------+----+ + * | p0 | p1 | p2 | p3 | ... |pn-1| + * +-+--+-+--+-+--+-+--+------------+-+--+ + * | | | | | + * | | + + + + * | | + * | | + + + + * | | | | | + * + + + + + + * t0 t1 t2 t3 tn-1 + * (r) (r) + * (r) = running + * </pre> + * + * @return list of futures representing all tasks (same order as input) + */ + public List<Future<T>> start() + { + List<Future<T>> taskFutures = new ArrayList<>(tasks.size()); + for (int i = 0; i < tasks.size(); i++) + { + Promise<T> p = taskPromises.get(i); + taskFutures.add(waitAndDo(p.future(), tasks.get(i))); + } + + // Start maxConcurrency no.of tasks + for (int i = 0; i < maxConcurrency; i++) + { + triggerNextTask(); + } + return taskFutures; + } + + /** + * Associates a task with its trigger promise and handles task execution and completion. + * + * <p>This method creates a future chain where the task only executes when the trigger + * future completes. 
Upon task completion, it automatically triggers the next pending task.</p> + * + * @param future the trigger future that must complete before task execution + * @param task the task to execute when triggered + * @return future representing the task's execution result + */ + private Future<T> waitAndDo(Future<T> future, Callable<Future<T>> task) + { + return future.compose(ar -> { + try + { + return task.call(); + } + catch (Exception e) + { + return Future.failedFuture(e); + } + }) + .onComplete(ar -> { + if (ar.succeeded() || !(ar.cause() instanceof CancellationException)) Review Comment: Does it make sense to log here if the task did not succeed? ########## server/src/main/java/org/apache/cassandra/sidecar/livemigration/OperationStatus.java: ########## @@ -0,0 +1,430 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.sidecar.livemigration; + +import java.util.Set; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; + +import com.google.common.annotations.VisibleForTesting; + +import org.jetbrains.annotations.NotNull; + +/** + * Represents the state and progress of a live migration data copy task using a state machine pattern.
+ * + * <h2>State Machine</h2> + * The operation can follow these state transitions: + * <pre> + * STARTING -> CLEANING -> PREPARING -> DOWNLOADING -> DOWNLOAD_COMPLETE + * | | | | + * | | | | + * | | | | + * | | | +-> CANCELLED / FAILED + * | | +-> SUCCESS + * | | + * | +-> CANCELLED / FAILED + * +-> CANCELLED / FAILED + * + * Note: CANCELLED and FAILED states can be reached from any non-terminal state. + * DOWNLOAD_COMPLETE is a terminal state (no further transitions). + * SUCCESS can only be reached from PREPARING state. + * Special case: CANCELLED -> FAILED transition is tolerated (returns CANCELLED). + * This handles scenarios where failure occurs after cancellation. + * </pre> + * + * <h2>Thread Safety</h2> + * This class is designed for concurrent access: + * <ul> + * <li>The {@code State} and size fields are immutable once set</li> + * <li>Progress counters ({@code filesDownloaded}, {@code downloadFailures}, {@code bytesDownloaded}) + * are thread-safe using {@code AtomicInteger} and {@code AtomicLong}</li> + * <li>State transitions create new immutable instances rather than modifying existing ones</li> + * <li>Multiple threads can safely read progress and update counters simultaneously</li> + * </ul> + * + * <h2>Usage</h2> + * State transitions are managed through factory methods that return new instances. + * Progress tracking is handled via atomic fields that can be safely updated from multiple threads. 
+ */ +public class OperationStatus +{ + // State of the operation + private final State state; + + // Total size of data available at source (immutable once set) + private final long totalSize; + + // Total number of files available at source (immutable once set) + private final int totalFiles; + + // Size of data to be copied (immutable once set) + private final long bytesToDownload; + + // Number of files to download from source (immutable once set) + private final int filesToDownload; + + // Number of files downloaded from source (thread-safe atomic counter) + private final AtomicInteger filesDownloaded; + + // Number of download failures (thread-safe atomic counter) + private final AtomicInteger downloadFailures; + + // Size of data downloaded from source in bytes (thread-safe atomic counter) + private final AtomicLong bytesDownloaded; + + private OperationStatus(@NotNull State state, + long totalSize, + int totalFiles, + long bytesToDownload, + int filesToDownload, + @NotNull AtomicInteger filesDownloaded, + @NotNull AtomicInteger downloadFailures, + @NotNull AtomicLong bytesDownloaded) + { + this.state = state; + this.totalSize = totalSize; + this.totalFiles = totalFiles; + this.bytesToDownload = bytesToDownload; + this.filesToDownload = filesToDownload; + this.filesDownloaded = filesDownloaded; + this.downloadFailures = downloadFailures; + this.bytesDownloaded = bytesDownloaded; + } + + public static OperationStatus getStartingState() + { + return new OperationStatus(State.STARTING, + -1, + -1, + -1, + -1, + new AtomicInteger(0), + new AtomicInteger(0), + new AtomicLong(0)); + } + + /** + * Transitions to the CLEANING state with updated file metadata. 
+ * + * @param totalSize total size of files at the source + * @param totalFiles total number of files at the source + * @return new OperationStatus instance in CLEANING state + * @throws IllegalStateTransitionException if current state cannot transition to CLEANING + */ + @VisibleForTesting + public OperationStatus getCleaningState(long totalSize, int totalFiles) + { + return new OperationStatus(this.state.toCleaning(), + totalSize, + totalFiles, + this.bytesToDownload, + this.filesToDownload, + this.filesDownloaded, + this.downloadFailures, + this.bytesDownloaded); + } + + /** + * Transitions to the PREPARING state. + * + * @return new OperationStatus instance in PREPARING state + * @throws IllegalStateTransitionException if current state cannot transition to PREPARING + */ + @VisibleForTesting + public OperationStatus getPreparingState() + { + return new OperationStatus(this.state.toPreparing(), + this.totalSize, + this.totalFiles, + this.bytesToDownload, + this.filesToDownload, + this.filesDownloaded, + this.downloadFailures, + this.bytesDownloaded); + } + + /** + * Transitions to the DOWNLOADING state with updated download metadata. + * Resets progress counters for the new download phase. + * + * @param bytesToDownload total size of data to be downloaded in bytes + * @param filesToDownload number of files to be downloaded + * @return new OperationStatus instance in DOWNLOADING state + * @throws IllegalStateTransitionException if current state cannot transition to DOWNLOADING + */ + OperationStatus getDownloadingState(final long bytesToDownload, final int filesToDownload) + { + return new OperationStatus(this.state.toDownloading(), + this.totalSize, + this.totalFiles, + bytesToDownload, + filesToDownload, + new AtomicInteger(), + new AtomicInteger(), + new AtomicLong()); + } + + /** + * Transitions to the DOWNLOAD_COMPLETE state. 
+ * + * @return new OperationStatus instance in DOWNLOAD_COMPLETE state + * @throws IllegalStateTransitionException if current state cannot transition to DOWNLOAD_COMPLETE + */ + OperationStatus getDownloadCompleteState() + { + return new OperationStatus(this.state.toDownloadComplete(), + this.totalSize, + this.totalFiles, + this.bytesToDownload, + this.filesToDownload, + this.filesDownloaded, + this.downloadFailures, + this.bytesDownloaded); + } + + /** + * Transitions to the SUCCESS state, indicating successful completion. + * + * @return new OperationStatus instance in SUCCESS state + * @throws IllegalStateTransitionException if current state cannot transition to SUCCESS + */ + @VisibleForTesting + public OperationStatus getSuccessState() Review Comment: ```suggestion public OperationStatus toSuccessState() ``` ########## server/src/main/java/org/apache/cassandra/sidecar/livemigration/OperationStatus.java: ########## @@ -0,0 +1,430 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.sidecar.livemigration; + +import java.util.Set; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; + +import com.google.common.annotations.VisibleForTesting; + +import org.jetbrains.annotations.NotNull; + +/** + * Represents the state and progress of a live migration data copy task using a state machine pattern. + * + * <h2>State Machine</h2> + * The operation can follow these state transitions: + * <pre> + * STARTING -> CLEANING -> PREPARING -> DOWNLOADING -> DOWNLOAD_COMPLETE + * | | | | + * | | | | + * | | | | + * | | | +-> CANCELLED / FAILED + * | | +-> SUCCESS + * | | + * | +-> CANCELLED / FAILED + * +-> CANCELLED / FAILED + * + * Note: CANCELLED and FAILED states can be reached from any non-terminal state. + * DOWNLOAD_COMPLETE is a terminal state (no further transitions). + * SUCCESS can only be reached from PREPARING state. + * Special case: CANCELLED -> FAILED transition is tolerated (returns CANCELLED). + * This handles scenarios where failure occurs after cancellation. + * </pre> + * + * <h2>Thread Safety</h2> + * This class is designed for concurrent access: + * <ul> + * <li>The {@code State} and size fields are immutable once set</li> + * <li>Progress counters ({@code filesDownloaded}, {@code downloadFailures}, {@code bytesDownloaded}) + * are thread-safe using {@code AtomicInteger} and {@code AtomicLong}</li> + * <li>State transitions create new immutable instances rather than modifying existing ones</li> + * <li>Multiple threads can safely read progress and update counters simultaneously</li> + * </ul> + * + * <h2>Usage</h2> + * State transitions are managed through factory methods that return new instances. + * Progress tracking is handled via atomic fields that can be safely updated from multiple threads. 
+ */ +public class OperationStatus +{ + // State of the operation + private final State state; + + // Total size of data available at source (immutable once set) + private final long totalSize; + + // Total number of files available at source (immutable once set) + private final int totalFiles; + + // Size of data to be copied (immutable once set) + private final long bytesToDownload; + + // Number of files to download from source (immutable once set) + private final int filesToDownload; + + // Number of files downloaded from source (thread-safe atomic counter) + private final AtomicInteger filesDownloaded; + + // Number of download failures (thread-safe atomic counter) + private final AtomicInteger downloadFailures; + + // Size of data downloaded from source in bytes (thread-safe atomic counter) + private final AtomicLong bytesDownloaded; + + private OperationStatus(@NotNull State state, + long totalSize, + int totalFiles, + long bytesToDownload, + int filesToDownload, + @NotNull AtomicInteger filesDownloaded, + @NotNull AtomicInteger downloadFailures, + @NotNull AtomicLong bytesDownloaded) + { + this.state = state; + this.totalSize = totalSize; + this.totalFiles = totalFiles; + this.bytesToDownload = bytesToDownload; + this.filesToDownload = filesToDownload; + this.filesDownloaded = filesDownloaded; + this.downloadFailures = downloadFailures; + this.bytesDownloaded = bytesDownloaded; + } + + public static OperationStatus getStartingState() + { + return new OperationStatus(State.STARTING, + -1, + -1, + -1, + -1, + new AtomicInteger(0), + new AtomicInteger(0), + new AtomicLong(0)); + } + + /** + * Transitions to the CLEANING state with updated file metadata. 
+ * + * @param totalSize total size of files at the source + * @param totalFiles total number of files at the source + * @return new OperationStatus instance in CLEANING state + * @throws IllegalStateTransitionException if current state cannot transition to CLEANING + */ + @VisibleForTesting + public OperationStatus getCleaningState(long totalSize, int totalFiles) + { + return new OperationStatus(this.state.toCleaning(), + totalSize, + totalFiles, + this.bytesToDownload, + this.filesToDownload, + this.filesDownloaded, + this.downloadFailures, + this.bytesDownloaded); + } + + /** + * Transitions to the PREPARING state. + * + * @return new OperationStatus instance in PREPARING state + * @throws IllegalStateTransitionException if current state cannot transition to PREPARING + */ + @VisibleForTesting + public OperationStatus getPreparingState() + { + return new OperationStatus(this.state.toPreparing(), + this.totalSize, + this.totalFiles, + this.bytesToDownload, + this.filesToDownload, + this.filesDownloaded, + this.downloadFailures, + this.bytesDownloaded); + } + + /** + * Transitions to the DOWNLOADING state with updated download metadata. + * Resets progress counters for the new download phase. + * + * @param bytesToDownload total size of data to be downloaded in bytes + * @param filesToDownload number of files to be downloaded + * @return new OperationStatus instance in DOWNLOADING state + * @throws IllegalStateTransitionException if current state cannot transition to DOWNLOADING + */ + OperationStatus getDownloadingState(final long bytesToDownload, final int filesToDownload) + { + return new OperationStatus(this.state.toDownloading(), + this.totalSize, + this.totalFiles, + bytesToDownload, + filesToDownload, + new AtomicInteger(), + new AtomicInteger(), + new AtomicLong()); + } + + /** + * Transitions to the DOWNLOAD_COMPLETE state. 
+ * + * @return new OperationStatus instance in DOWNLOAD_COMPLETE state + * @throws IllegalStateTransitionException if current state cannot transition to DOWNLOAD_COMPLETE + */ + OperationStatus getDownloadCompleteState() Review Comment: ```suggestion OperationStatus toDownloadCompleteState() ``` ########## server/src/main/java/org/apache/cassandra/sidecar/livemigration/OperationStatus.java: ########## @@ -0,0 +1,430 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.sidecar.livemigration; + +import java.util.Set; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; + +import com.google.common.annotations.VisibleForTesting; + +import org.jetbrains.annotations.NotNull; + +/** + * Represents the state and progress of a live migration data copy task using a state machine pattern. 
+ * + * <h2>State Machine</h2> + * The operation can follow these state transitions: + * <pre> + * STARTING -> CLEANING -> PREPARING -> DOWNLOADING -> DOWNLOAD_COMPLETE + * | | | | + * | | | | + * | | | | + * | | | +-> CANCELLED / FAILED + * | | +-> SUCCESS + * | | + * | +-> CANCELLED / FAILED + * +-> CANCELLED / FAILED + * + * Note: CANCELLED and FAILED states can be reached from any non-terminal state. + * DOWNLOAD_COMPLETE is a terminal state (no further transitions). + * SUCCESS can only be reached from PREPARING state. + * Special case: CANCELLED -> FAILED transition is tolerated (returns CANCELLED). + * This handles scenarios where failure occurs after cancellation. + * </pre> + * + * <h2>Thread Safety</h2> + * This class is designed for concurrent access: + * <ul> + * <li>The {@code State} and size fields are immutable once set</li> + * <li>Progress counters ({@code filesDownloaded}, {@code downloadFailures}, {@code bytesDownloaded}) + * are thread-safe using {@code AtomicInteger} and {@code AtomicLong}</li> + * <li>State transitions create new immutable instances rather than modifying existing ones</li> + * <li>Multiple threads can safely read progress and update counters simultaneously</li> + * </ul> + * + * <h2>Usage</h2> + * State transitions are managed through factory methods that return new instances. + * Progress tracking is handled via atomic fields that can be safely updated from multiple threads. 
+ */ +public class OperationStatus +{ + // State of the operation + private final State state; + + // Total size of data available at source (immutable once set) + private final long totalSize; + + // Total number of files available at source (immutable once set) + private final int totalFiles; + + // Size of data to be copied (immutable once set) + private final long bytesToDownload; + + // Number of files to download from source (immutable once set) + private final int filesToDownload; + + // Number of files downloaded from source (thread-safe atomic counter) + private final AtomicInteger filesDownloaded; + + // Number of download failures (thread-safe atomic counter) + private final AtomicInteger downloadFailures; + + // Size of data downloaded from source in bytes (thread-safe atomic counter) + private final AtomicLong bytesDownloaded; + + private OperationStatus(@NotNull State state, + long totalSize, + int totalFiles, + long bytesToDownload, + int filesToDownload, + @NotNull AtomicInteger filesDownloaded, + @NotNull AtomicInteger downloadFailures, + @NotNull AtomicLong bytesDownloaded) + { + this.state = state; + this.totalSize = totalSize; + this.totalFiles = totalFiles; + this.bytesToDownload = bytesToDownload; + this.filesToDownload = filesToDownload; + this.filesDownloaded = filesDownloaded; + this.downloadFailures = downloadFailures; + this.bytesDownloaded = bytesDownloaded; + } + + public static OperationStatus getStartingState() + { + return new OperationStatus(State.STARTING, + -1, + -1, + -1, + -1, + new AtomicInteger(0), + new AtomicInteger(0), + new AtomicLong(0)); + } + + /** + * Transitions to the CLEANING state with updated file metadata. 
+ * + * @param totalSize total size of files at the source + * @param totalFiles total number of files at the source + * @return new OperationStatus instance in CLEANING state + * @throws IllegalStateTransitionException if current state cannot transition to CLEANING + */ + @VisibleForTesting + public OperationStatus getCleaningState(long totalSize, int totalFiles) + { + return new OperationStatus(this.state.toCleaning(), + totalSize, + totalFiles, + this.bytesToDownload, + this.filesToDownload, + this.filesDownloaded, + this.downloadFailures, + this.bytesDownloaded); + } + + /** + * Transitions to the PREPARING state. + * + * @return new OperationStatus instance in PREPARING state + * @throws IllegalStateTransitionException if current state cannot transition to PREPARING + */ + @VisibleForTesting + public OperationStatus getPreparingState() + { + return new OperationStatus(this.state.toPreparing(), + this.totalSize, + this.totalFiles, + this.bytesToDownload, + this.filesToDownload, + this.filesDownloaded, + this.downloadFailures, + this.bytesDownloaded); + } + + /** + * Transitions to the DOWNLOADING state with updated download metadata. + * Resets progress counters for the new download phase. + * + * @param bytesToDownload total size of data to be downloaded in bytes + * @param filesToDownload number of files to be downloaded + * @return new OperationStatus instance in DOWNLOADING state + * @throws IllegalStateTransitionException if current state cannot transition to DOWNLOADING + */ + OperationStatus getDownloadingState(final long bytesToDownload, final int filesToDownload) Review Comment: ```suggestion OperationStatus toDownloadingState(long bytesToDownload, int filesToDownload) ``` ########## server/src/main/java/org/apache/cassandra/sidecar/livemigration/OperationStatus.java: ########## @@ -0,0 +1,430 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.sidecar.livemigration; + +import java.util.Set; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; + +import com.google.common.annotations.VisibleForTesting; + +import org.jetbrains.annotations.NotNull; + +/** + * Represents the state and progress of a live migration data copy task using a state machine pattern. + * + * <h2>State Machine</h2> + * The operation can follow these state transitions: + * <pre> + * STARTING -> CLEANING -> PREPARING -> DOWNLOADING -> DOWNLOAD_COMPLETE + * | | | | + * | | | | + * | | | | + * | | | +-> CANCELLED / FAILED + * | | +-> SUCCESS + * | | + * | +-> CANCELLED / FAILED + * +-> CANCELLED / FAILED + * + * Note: CANCELLED and FAILED states can be reached from any non-terminal state. + * DOWNLOAD_COMPLETE is a terminal state (no further transitions). + * SUCCESS can only be reached from PREPARING state. + * Special case: CANCELLED -> FAILED transition is tolerated (returns CANCELLED). + * This handles scenarios where failure occurs after cancellation. 
+ * </pre> + * + * <h2>Thread Safety</h2> + * This class is designed for concurrent access: + * <ul> + * <li>The {@code State} and size fields are immutable once set</li> + * <li>Progress counters ({@code filesDownloaded}, {@code downloadFailures}, {@code bytesDownloaded}) + * are thread-safe using {@code AtomicInteger} and {@code AtomicLong}</li> + * <li>State transitions create new immutable instances rather than modifying existing ones</li> + * <li>Multiple threads can safely read progress and update counters simultaneously</li> + * </ul> + * + * <h2>Usage</h2> + * State transitions are managed through factory methods that return new instances. + * Progress tracking is handled via atomic fields that can be safely updated from multiple threads. + */ +public class OperationStatus +{ + // State of the operation + private final State state; + + // Total size of data available at source (immutable once set) + private final long totalSize; + + // Total number of files available at source (immutable once set) + private final int totalFiles; + + // Size of data to be copied (immutable once set) + private final long bytesToDownload; + + // Number of files to download from source (immutable once set) + private final int filesToDownload; + + // Number of files downloaded from source (thread-safe atomic counter) + private final AtomicInteger filesDownloaded; + + // Number of download failures (thread-safe atomic counter) + private final AtomicInteger downloadFailures; + + // Size of data downloaded from source in bytes (thread-safe atomic counter) + private final AtomicLong bytesDownloaded; + + private OperationStatus(@NotNull State state, + long totalSize, + int totalFiles, + long bytesToDownload, + int filesToDownload, + @NotNull AtomicInteger filesDownloaded, + @NotNull AtomicInteger downloadFailures, + @NotNull AtomicLong bytesDownloaded) + { + this.state = state; + this.totalSize = totalSize; + this.totalFiles = totalFiles; + this.bytesToDownload = bytesToDownload; + 
this.filesToDownload = filesToDownload; + this.filesDownloaded = filesDownloaded; + this.downloadFailures = downloadFailures; + this.bytesDownloaded = bytesDownloaded; + } + + public static OperationStatus getStartingState() + { + return new OperationStatus(State.STARTING, + -1, + -1, + -1, + -1, + new AtomicInteger(0), + new AtomicInteger(0), + new AtomicLong(0)); + } + + /** + * Transitions to the CLEANING state with updated file metadata. + * + * @param totalSize total size of files at the source + * @param totalFiles total number of files at the source + * @return new OperationStatus instance in CLEANING state + * @throws IllegalStateTransitionException if current state cannot transition to CLEANING + */ + @VisibleForTesting + public OperationStatus getCleaningState(long totalSize, int totalFiles) + { + return new OperationStatus(this.state.toCleaning(), + totalSize, + totalFiles, + this.bytesToDownload, + this.filesToDownload, + this.filesDownloaded, + this.downloadFailures, + this.bytesDownloaded); + } + + /** + * Transitions to the PREPARING state. + * + * @return new OperationStatus instance in PREPARING state + * @throws IllegalStateTransitionException if current state cannot transition to PREPARING + */ + @VisibleForTesting + public OperationStatus getPreparingState() + { + return new OperationStatus(this.state.toPreparing(), + this.totalSize, + this.totalFiles, + this.bytesToDownload, + this.filesToDownload, + this.filesDownloaded, + this.downloadFailures, + this.bytesDownloaded); + } + + /** + * Transitions to the DOWNLOADING state with updated download metadata. + * Resets progress counters for the new download phase. 
+ * + * @param bytesToDownload total size of data to be downloaded in bytes + * @param filesToDownload number of files to be downloaded + * @return new OperationStatus instance in DOWNLOADING state + * @throws IllegalStateTransitionException if current state cannot transition to DOWNLOADING + */ + OperationStatus getDownloadingState(final long bytesToDownload, final int filesToDownload) + { + return new OperationStatus(this.state.toDownloading(), + this.totalSize, + this.totalFiles, + bytesToDownload, + filesToDownload, + new AtomicInteger(), + new AtomicInteger(), + new AtomicLong()); + } + + /** + * Transitions to the DOWNLOAD_COMPLETE state. + * + * @return new OperationStatus instance in DOWNLOAD_COMPLETE state + * @throws IllegalStateTransitionException if current state cannot transition to DOWNLOAD_COMPLETE + */ + OperationStatus getDownloadCompleteState() + { + return new OperationStatus(this.state.toDownloadComplete(), + this.totalSize, + this.totalFiles, + this.bytesToDownload, + this.filesToDownload, + this.filesDownloaded, + this.downloadFailures, + this.bytesDownloaded); + } + + /** + * Transitions to the SUCCESS state, indicating successful completion. + * + * @return new OperationStatus instance in SUCCESS state + * @throws IllegalStateTransitionException if current state cannot transition to SUCCESS + */ + @VisibleForTesting + public OperationStatus getSuccessState() + { + return new OperationStatus(this.state.toSuccess(), + this.totalSize, + this.totalFiles, + this.bytesToDownload, + this.filesToDownload, + this.filesDownloaded, + this.downloadFailures, + this.bytesDownloaded); + } + + /** + * Transitions to the FAILED state, indicating operation failure. 
+ * + * @return new OperationStatus instance in FAILED state + * @throws IllegalStateTransitionException if current state cannot transition to FAILED + */ + OperationStatus tryFailureState() + { + return new OperationStatus(this.state.toFailed(), + this.totalSize, + this.totalFiles, + this.bytesToDownload, + this.filesToDownload, + this.filesDownloaded, + this.downloadFailures, + this.bytesDownloaded); + } + + /** + * Cancels the task if not completed. + * + * @return Returns same state if completed, otherwise returns cancelled state. + */ + public OperationStatus cancel() + { + return new OperationStatus(this.state.toCancelled(), + this.totalSize, + this.totalFiles, + this.bytesToDownload, + this.filesToDownload, + this.filesDownloaded, + this.downloadFailures, + this.bytesDownloaded); + } + + public State getState() + { + return state; + } + + public State state() + { + return state; + } + + public long totalSize() + { + return totalSize; + } + + public long bytesToDownload() + { + return bytesToDownload; + } + + public int filesToDownload() + { + return filesToDownload; + } + + /** + * Returns the thread-safe atomic counter for files downloaded. + * This counter can be safely read and updated from multiple threads. + * + * @return atomic counter for files downloaded + */ + public AtomicInteger filesDownloaded() + { + return filesDownloaded; + } + + /** + * Returns the thread-safe atomic counter for bytes downloaded. + * This counter can be safely read and updated from multiple threads. 
+ * + * @return atomic counter for bytes downloaded + */ + public AtomicLong bytesDownloaded() Review Comment: In general, -1 on exposing these values directly. Instead, provide business-oriented methods that modify these values internally. ########## server/src/main/java/org/apache/cassandra/sidecar/modules/LiveMigrationModule.java: ########## @@ -51,11 +58,113 @@ public class LiveMigrationModule extends AbstractModule protected void configure() { bind(LiveMigrationMap.class).to(LiveMigrationMapSidecarConfigImpl.class); + bind(LiveMigrationTaskFactory.class).to(LiveMigrationTaskFactoryImpl.class); } + @GET + @Path(ApiEndpointsV1.LIVE_MIGRATION_DATA_COPY_TASKS_ROUTE) + @Operation(summary = "Create data copy task", + description = "Creates a new data copy task for live migration") + @APIResponse(description = "Data copy task created successfully", + responseCode = "200", + content = @Content(mediaType = "application/json", + schema = @Schema(type = SchemaType.OBJECT))) + @APIResponse(responseCode = "403", + description = "Live migration not enabled or node not configured as destination", + content = @Content(mediaType = "application/json", + schema = @Schema(type = SchemaType.OBJECT))) + @ProvidesIntoMap + @KeyClassMapKey(VertxRouteMapKeys.LiveMigrationCreateDataCopyTaskRouteKey.class) + public VertxRoute createDataCopyTaskRoute(RouteBuilder.Factory factory, + LiveMigrationApiEnableDisableHandler liveMigrationApiEnableDisableHandler, + LiveMigrationCreateDataCopyTaskHandler liveMigrationCreateDataCopyTaskHandler) + { + return factory.builderForRoute() + .setBodyHandler(true) + .handler(liveMigrationApiEnableDisableHandler::isDestination) + .handler(liveMigrationCreateDataCopyTaskHandler) + .build(); + } + + @PATCH + @Operation(summary = "Cancel data copy task", + description = "Cancels an existing data copy task for live migration") + @APIResponse(description = "Data copy task cancelled successfully", + responseCode = "200", + content = 
@Content(mediaType = "application/json", + schema = @Schema(type = SchemaType.OBJECT))) + @APIResponse(responseCode = "403", + description = "Live migration not enabled or node not configured as destination", + content = @Content(mediaType = "application/json", + schema = @Schema(type = SchemaType.OBJECT))) + @APIResponse(responseCode = "404", + description = "Data copy task not found", + content = @Content(mediaType = "application/json", + schema = @Schema(type = SchemaType.OBJECT))) + @ProvidesIntoMap + @KeyClassMapKey(VertxRouteMapKeys.LiveMigrationCancelDataCopyTaskRouteKey.class) + public VertxRoute cancelDataCopyTaskRoute(RouteBuilder.Factory factory, + LiveMigrationApiEnableDisableHandler liveMigrationApiEnableDisableHandler, + LiveMigrationCancelDataCopyTaskHandler liveMigrationCancelDataCopyTaskHandler) + { + return factory.builderForRoute() + .handler(liveMigrationApiEnableDisableHandler::isDestination) + .handler(liveMigrationCancelDataCopyTaskHandler) + .build(); + } + + @GET + @Operation(summary = "Get data copy task", + description = "Retrieves the status and details of a specific data copy task by task ID") + @APIResponse(description = "Data copy task retrieved successfully", + responseCode = "200", + content = @Content(mediaType = "application/json", + schema = @Schema(type = SchemaType.OBJECT))) Review Comment: ```suggestion schema = @Schema(implementation = LiveMigrationTaskResponse.class))) ``` ########## server/src/main/java/org/apache/cassandra/sidecar/modules/LiveMigrationModule.java: ########## @@ -68,10 +177,10 @@ protected void configure() schema = @Schema(type = SchemaType.OBJECT))) @ProvidesIntoMap @KeyClassMapKey(VertxRouteMapKeys.LiveMigrationFileStreamHandlerRouteKey.class) - VertxRoute downloadFileRoute(RouteBuilder.Factory factory, - LiveMigrationApiEnableDisableHandler liveMigrationApiEnableDisableHandler, - LiveMigrationFileStreamHandler liveMigrationFileStreamHandler, - FileStreamHandler fileStreamHandler) + public VertxRoute 
downloadFileRoute(RouteBuilder.Factory factory, Review Comment: I think these shouldn't be public. Any reason we are changing the visibility here? ########## server/src/main/java/org/apache/cassandra/sidecar/modules/LiveMigrationModule.java: ########## @@ -51,11 +58,113 @@ public class LiveMigrationModule extends AbstractModule protected void configure() { bind(LiveMigrationMap.class).to(LiveMigrationMapSidecarConfigImpl.class); + bind(LiveMigrationTaskFactory.class).to(LiveMigrationTaskFactoryImpl.class); } + @GET + @Path(ApiEndpointsV1.LIVE_MIGRATION_DATA_COPY_TASKS_ROUTE) + @Operation(summary = "Create data copy task", + description = "Creates a new data copy task for live migration") + @APIResponse(description = "Data copy task created successfully", + responseCode = "200", + content = @Content(mediaType = "application/json", + schema = @Schema(type = SchemaType.OBJECT))) + @APIResponse(responseCode = "403", + description = "Live migration not enabled or node not configured as destination", + content = @Content(mediaType = "application/json", + schema = @Schema(type = SchemaType.OBJECT))) + @ProvidesIntoMap + @KeyClassMapKey(VertxRouteMapKeys.LiveMigrationCreateDataCopyTaskRouteKey.class) + public VertxRoute createDataCopyTaskRoute(RouteBuilder.Factory factory, + LiveMigrationApiEnableDisableHandler liveMigrationApiEnableDisableHandler, + LiveMigrationCreateDataCopyTaskHandler liveMigrationCreateDataCopyTaskHandler) + { + return factory.builderForRoute() + .setBodyHandler(true) + .handler(liveMigrationApiEnableDisableHandler::isDestination) + .handler(liveMigrationCreateDataCopyTaskHandler) + .build(); + } + + @PATCH + @Operation(summary = "Cancel data copy task", + description = "Cancels an existing data copy task for live migration") + @APIResponse(description = "Data copy task cancelled successfully", + responseCode = "200", + content = @Content(mediaType = "application/json", + schema = @Schema(type = SchemaType.OBJECT))) Review Comment: ```suggestion 
schema = @Schema(implementation = LiveMigrationTaskResponse.class))) ``` ########## server/src/main/java/org/apache/cassandra/sidecar/concurrent/AsyncConcurrentTaskExecutor.java: ########## @@ -0,0 +1,285 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.sidecar.concurrent; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Objects; +import java.util.concurrent.Callable; +import java.util.concurrent.CancellationException; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.Collectors; + +import com.google.common.annotations.VisibleForTesting; + +import io.vertx.core.Future; +import io.vertx.core.Promise; +import io.vertx.core.Vertx; +import org.jetbrains.annotations.NotNull; + +/** + * A task executor that manages concurrent execution of asynchronous tasks with configurable concurrency limit. + * + * <h2>Overview</h2> + * <p>This executor ensures that only a specified number of tasks (defined by {@code maxConcurrency}) + * execute simultaneously. 
When a task completes, the next pending task is automatically + * triggered, maintaining optimal resource utilization while respecting concurrency constraints.</p> + * + * <h2>Architecture</h2> + * <p>The executor uses a promise-based triggering mechanism:</p> + * <ul> + * <li>Each task is associated with a {@link Promise} that acts as a trigger signal</li> + * <li>Tasks only start execution when their corresponding Promise completes successfully</li> + * <li>Initially, {@code maxConcurrency} number of promises are completed to start the execution flow</li> + * <li>As tasks complete, the next pending task is automatically triggered</li> + * </ul> + * + * <h2>Concurrency Control</h2> + * <p>The executor maintains a global task index using {@link AtomicInteger} to ensure thread-safe + * task scheduling. Tasks are processed in the order they were submitted, with automatic + * flow control to maintain the desired concurrency level.</p> + * + * <h2>Error Handling</h2> + * <p>When a task completes (successfully or with a non-{@link CancellationException} error), + * the executor automatically triggers the next pending task. 
Tasks that are cancelled do not + * trigger subsequent tasks to prevent further executions.</p> + * + * <h2>Usage Example</h2> + * <pre>{@code + * List<Callable<Future<String>>> tasks = Arrays.asList( + * () -> downloadFile("file1.txt"), + * () -> downloadFile("file2.txt"), + * () -> downloadFile("file3.txt") + * ); + * + * AsyncConcurrentTaskExecutor<String> executor = + * new AsyncConcurrentTaskExecutor<>(vertx, tasks, 2); // max 2 tasks can execute concurrently + * + * List<Future<String>> results = executor.start(); + * }</pre> + * + * @param <T> the type of result produced by the tasks + */ +public class AsyncConcurrentTaskExecutor<T> +{ + protected static final String TASK_CANCEL_MESSAGE = "task cancelled"; + private final AtomicInteger index = new AtomicInteger(0); + private final int maxConcurrency; + private final Vertx vertx; + private final List<Promise<T>> taskPromises; + private final List<Callable<Future<T>>> tasks; + + /** + * Creates a new AsyncConcurrentTaskExecutor. + * + * @param vertx the Vert.x instance for context execution + * @param tasks the list of tasks to execute (must not be empty) + * @param maxConcurrency the maximum number of tasks to execute concurrently (must be > 0) + * @throws IllegalArgumentException if maxConcurrency <= 0 or tasks is empty + */ + public AsyncConcurrentTaskExecutor(@NotNull Vertx vertx, + @NotNull List<Callable<Future<T>>> tasks, + int maxConcurrency) + { + Objects.requireNonNull(vertx, "vertx must not be null."); + Objects.requireNonNull(tasks, "tasks must not be null."); + if (maxConcurrency < 1) + { + throw (new IllegalArgumentException("maxConcurrency must be > 0")); + } + + this.tasks = Collections.unmodifiableList(tasks); Review Comment: NIT: prefer newer JDK APIs ```suggestion this.tasks = List.copyOf(tasks); ``` ########## server/src/main/java/org/apache/cassandra/sidecar/livemigration/OperationStatus.java: ########## @@ -0,0 +1,430 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + 
* or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.sidecar.livemigration; + +import java.util.Set; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; + +import com.google.common.annotations.VisibleForTesting; + +import org.jetbrains.annotations.NotNull; + +/** + * Represents the state and progress of a live migration data copy task using a state machine pattern. + * + * <h2>State Machine</h2> + * The operation can follow these state transitions: + * <pre> + * STARTING -> CLEANING -> PREPARING -> DOWNLOADING -> DOWNLOAD_COMPLETE + * | | | | + * | | | | + * | | | | + * | | | +-> CANCELLED / FAILED + * | | +-> SUCCESS + * | | + * | +-> CANCELLED / FAILED + * +-> CANCELLED / FAILED + * + * Note: CANCELLED and FAILED states can be reached from any non-terminal state. + * DOWNLOAD_COMPLETE is a terminal state (no further transitions). + * SUCCESS can only be reached from PREPARING state. + * Special case: CANCELLED -> FAILED transition is tolerated (returns CANCELLED). + * This handles scenarios where failure occurs after cancellation. 
+ * </pre> + * + * <h2>Thread Safety</h2> + * This class is designed for concurrent access: + * <ul> + * <li>The {@code State} and size fields are immutable once set</li> + * <li>Progress counters ({@code filesDownloaded}, {@code downloadFailures}, {@code bytesDownloaded}) + * are thread-safe using {@code AtomicInteger} and {@code AtomicLong}</li> + * <li>State transitions create new immutable instances rather than modifying existing ones</li> + * <li>Multiple threads can safely read progress and update counters simultaneously</li> + * </ul> + * + * <h2>Usage</h2> + * State transitions are managed through factory methods that return new instances. + * Progress tracking is handled via atomic fields that can be safely updated from multiple threads. + */ +public class OperationStatus +{ + // State of the operation + private final State state; + + // Total size of data available at source (immutable once set) + private final long totalSize; + + // Total number of files available at source (immutable once set) + private final int totalFiles; + + // Size of data to be copied (immutable once set) + private final long bytesToDownload; + + // Number of files to download from source (immutable once set) + private final int filesToDownload; + + // Number of files downloaded from source (thread-safe atomic counter) + private final AtomicInteger filesDownloaded; + + // Number of download failures (thread-safe atomic counter) + private final AtomicInteger downloadFailures; + + // Size of data downloaded from source in bytes (thread-safe atomic counter) + private final AtomicLong bytesDownloaded; + + private OperationStatus(@NotNull State state, + long totalSize, + int totalFiles, + long bytesToDownload, + int filesToDownload, + @NotNull AtomicInteger filesDownloaded, + @NotNull AtomicInteger downloadFailures, + @NotNull AtomicLong bytesDownloaded) + { + this.state = state; + this.totalSize = totalSize; + this.totalFiles = totalFiles; + this.bytesToDownload = bytesToDownload; + 
this.filesToDownload = filesToDownload; + this.filesDownloaded = filesDownloaded; + this.downloadFailures = downloadFailures; + this.bytesDownloaded = bytesDownloaded; + } + + public static OperationStatus getStartingState() + { + return new OperationStatus(State.STARTING, + -1, + -1, + -1, + -1, + new AtomicInteger(0), + new AtomicInteger(0), + new AtomicLong(0)); + } + + /** + * Transitions to the CLEANING state with updated file metadata. + * + * @param totalSize total size of files at the source + * @param totalFiles total number of files at the source + * @return new OperationStatus instance in CLEANING state + * @throws IllegalStateTransitionException if current state cannot transition to CLEANING + */ + @VisibleForTesting + public OperationStatus getCleaningState(long totalSize, int totalFiles) + { + return new OperationStatus(this.state.toCleaning(), + totalSize, + totalFiles, + this.bytesToDownload, + this.filesToDownload, + this.filesDownloaded, + this.downloadFailures, + this.bytesDownloaded); + } + + /** + * Transitions to the PREPARING state. + * + * @return new OperationStatus instance in PREPARING state + * @throws IllegalStateTransitionException if current state cannot transition to PREPARING + */ + @VisibleForTesting + public OperationStatus getPreparingState() + { + return new OperationStatus(this.state.toPreparing(), + this.totalSize, + this.totalFiles, + this.bytesToDownload, + this.filesToDownload, + this.filesDownloaded, + this.downloadFailures, + this.bytesDownloaded); + } + + /** + * Transitions to the DOWNLOADING state with updated download metadata. + * Resets progress counters for the new download phase. 
+ * + * @param bytesToDownload total size of data to be downloaded in bytes + * @param filesToDownload number of files to be downloaded + * @return new OperationStatus instance in DOWNLOADING state + * @throws IllegalStateTransitionException if current state cannot transition to DOWNLOADING + */ + OperationStatus getDownloadingState(final long bytesToDownload, final int filesToDownload) + { + return new OperationStatus(this.state.toDownloading(), + this.totalSize, + this.totalFiles, + bytesToDownload, + filesToDownload, + new AtomicInteger(), + new AtomicInteger(), + new AtomicLong()); + } + + /** + * Transitions to the DOWNLOAD_COMPLETE state. + * + * @return new OperationStatus instance in DOWNLOAD_COMPLETE state + * @throws IllegalStateTransitionException if current state cannot transition to DOWNLOAD_COMPLETE + */ + OperationStatus getDownloadCompleteState() + { + return new OperationStatus(this.state.toDownloadComplete(), + this.totalSize, + this.totalFiles, + this.bytesToDownload, + this.filesToDownload, + this.filesDownloaded, + this.downloadFailures, + this.bytesDownloaded); + } + + /** + * Transitions to the SUCCESS state, indicating successful completion. + * + * @return new OperationStatus instance in SUCCESS state + * @throws IllegalStateTransitionException if current state cannot transition to SUCCESS + */ + @VisibleForTesting + public OperationStatus getSuccessState() + { + return new OperationStatus(this.state.toSuccess(), + this.totalSize, + this.totalFiles, + this.bytesToDownload, + this.filesToDownload, + this.filesDownloaded, + this.downloadFailures, + this.bytesDownloaded); + } + + /** + * Transitions to the FAILED state, indicating operation failure. 
+ * + * @return new OperationStatus instance in FAILED state + * @throws IllegalStateTransitionException if current state cannot transition to FAILED + */ + OperationStatus tryFailureState() + { + return new OperationStatus(this.state.toFailed(), + this.totalSize, + this.totalFiles, + this.bytesToDownload, + this.filesToDownload, + this.filesDownloaded, + this.downloadFailures, + this.bytesDownloaded); + } + + /** + * Cancels the task if not completed. + * + * @return Returns same state if completed, otherwise returns cancelled state. + */ + public OperationStatus cancel() + { + return new OperationStatus(this.state.toCancelled(), + this.totalSize, + this.totalFiles, + this.bytesToDownload, + this.filesToDownload, + this.filesDownloaded, + this.downloadFailures, + this.bytesDownloaded); + } + + public State getState() + { + return state; + } + + public State state() + { + return state; + } + + public long totalSize() + { + return totalSize; + } + + public long bytesToDownload() + { + return bytesToDownload; + } + + public int filesToDownload() + { + return filesToDownload; + } + + /** + * Returns the thread-safe atomic counter for files downloaded. + * This counter can be safely read and updated from multiple threads. + * + * @return atomic counter for files downloaded + */ + public AtomicInteger filesDownloaded() Review Comment: I don't think we should expose the internal atomic integer values here; instead, we should expose methods that update the counts internally, e.g. `markFileDownloaded()`. ########## server/src/test/java/org/apache/cassandra/sidecar/concurrent/AsyncConcurrentTaskExecutorTest.java: ########## @@ -0,0 +1,487 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.sidecar.concurrent; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.Callable; +import java.util.concurrent.CancellationException; +import java.util.concurrent.atomic.AtomicInteger; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; + +import io.vertx.core.CompositeFuture; +import io.vertx.core.Future; +import io.vertx.core.Promise; +import io.vertx.core.Vertx; +import io.vertx.junit5.VertxExtension; +import io.vertx.junit5.VertxTestContext; + +import static org.apache.cassandra.sidecar.concurrent.AsyncConcurrentTaskExecutor.TASK_CANCEL_MESSAGE; +import static org.assertj.core.api.Assertions.assertThat; + +@ExtendWith(VertxExtension.class) +class AsyncConcurrentTaskExecutorTest +{ + private final Vertx vertx = Vertx.vertx(); + + @Test + public void testTaskFailure(VertxTestContext context) + { + List<Callable<Future<Boolean>>> tasks = new ArrayList<>(); + String failureMessage = "Task failed"; + tasks.add(getDummyFailedTask(failureMessage)); + AsyncConcurrentTaskExecutor<Boolean> concurrentTaskExecutor = new AsyncConcurrentTaskExecutor<>(vertx, tasks, 1); + List<Future<Boolean>> taskFutures = concurrentTaskExecutor.start(); + Future.join(taskFutures).onComplete(context.failing(result -> context.verify(() -> { + 
assertThat(result).isNotNull(); + assertThat(result.getMessage()).isEqualTo(failureMessage); + assertThat(taskFutures.get(0).failed()).isTrue(); + context.completeNow(); + }))); + } + + @Test + void testZeroTasks(VertxTestContext context) + { + List<Callable<Future<Boolean>>> tasks = new ArrayList<>(); + AsyncConcurrentTaskExecutor<Boolean> concurrentTaskExecutor = new AsyncConcurrentTaskExecutor<>(vertx, tasks, 1); + List<Future<Boolean>> taskFutures = concurrentTaskExecutor.start(); + Future.join(taskFutures).onComplete(ar -> context.verify(() -> { + assertThat(ar).isNotNull(); + assertThat(ar.result().list().size()).isEqualTo(0); + context.completeNow(); + })); + } + + @Test + public void testMultipleTaskFailure(VertxTestContext context) + { + List<Callable<Future<Boolean>>> tasks = new ArrayList<>(); + for (int i = 0; i < 10; i++) + { + String failureMessage = "Task " + i + " failed."; + tasks.add(getDummyFailedTask(failureMessage)); + } + AsyncConcurrentTaskExecutor<Boolean> concurrentTaskExecutor = new AsyncConcurrentTaskExecutor<>(vertx, tasks, 5); + List<Future<Boolean>> taskFutures = concurrentTaskExecutor.start(); + Future.join(taskFutures).onComplete(context.failing(result -> context.verify(() -> { + assertThat(result).isNotNull(); + taskFutures.forEach(ar -> assertThat(ar.failed()).isTrue()); + context.completeNow(); + }))); + } + + @Test + public void testSingleTaskSuccess(VertxTestContext context) + { + List<Callable<Future<String>>> tasks = Collections.singletonList(getDummySuccessTask("Task executed successfully")); + AsyncConcurrentTaskExecutor<String> concurrentTaskExecutor = new AsyncConcurrentTaskExecutor<>(vertx, tasks, 1); + List<Future<String>> taskFutures = concurrentTaskExecutor.start(); + Future.join(taskFutures).onComplete(ar -> context.verify(() -> { + assertThat(ar).isNotNull(); + assertThat(ar.result().list().get(0)).isEqualTo("Task executed successfully"); + context.completeNow(); + })); + } + + @Test + public void 
testMultipleTasksSucceeds(VertxTestContext context) + { + List<Callable<Future<Boolean>>> tasks = getDummySuccessTasks(10, Boolean.TRUE); + AsyncConcurrentTaskExecutor<Boolean> concurrentTaskExecutor = new AsyncConcurrentTaskExecutor<>(vertx, tasks, 5); + List<Future<Boolean>> taskFutures = concurrentTaskExecutor.start(); + Future.join(taskFutures).onComplete(ar -> context.verify(() -> { + assertThat(ar).isNotNull(); Review Comment: ```suggestion Future.join(taskFutures).onComplete(context.succeeding(result -> context.verify(() -> { assertThat(result).isNotNull(); ``` ########## server/src/test/java/org/apache/cassandra/sidecar/concurrent/AsyncConcurrentTaskExecutorTest.java: ########## @@ -0,0 +1,487 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.sidecar.concurrent; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.Callable; +import java.util.concurrent.CancellationException; +import java.util.concurrent.atomic.AtomicInteger; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; + +import io.vertx.core.CompositeFuture; +import io.vertx.core.Future; +import io.vertx.core.Promise; +import io.vertx.core.Vertx; +import io.vertx.junit5.VertxExtension; +import io.vertx.junit5.VertxTestContext; + +import static org.apache.cassandra.sidecar.concurrent.AsyncConcurrentTaskExecutor.TASK_CANCEL_MESSAGE; +import static org.assertj.core.api.Assertions.assertThat; + +@ExtendWith(VertxExtension.class) +class AsyncConcurrentTaskExecutorTest +{ + private final Vertx vertx = Vertx.vertx(); + + @Test + public void testTaskFailure(VertxTestContext context) + { + List<Callable<Future<Boolean>>> tasks = new ArrayList<>(); + String failureMessage = "Task failed"; + tasks.add(getDummyFailedTask(failureMessage)); + AsyncConcurrentTaskExecutor<Boolean> concurrentTaskExecutor = new AsyncConcurrentTaskExecutor<>(vertx, tasks, 1); + List<Future<Boolean>> taskFutures = concurrentTaskExecutor.start(); + Future.join(taskFutures).onComplete(context.failing(result -> context.verify(() -> { + assertThat(result).isNotNull(); + assertThat(result.getMessage()).isEqualTo(failureMessage); + assertThat(taskFutures.get(0).failed()).isTrue(); + context.completeNow(); + }))); + } + + @Test + void testZeroTasks(VertxTestContext context) + { + List<Callable<Future<Boolean>>> tasks = new ArrayList<>(); + AsyncConcurrentTaskExecutor<Boolean> concurrentTaskExecutor = new AsyncConcurrentTaskExecutor<>(vertx, tasks, 1); + List<Future<Boolean>> taskFutures = concurrentTaskExecutor.start(); + Future.join(taskFutures).onComplete(ar -> context.verify(() -> { + assertThat(ar).isNotNull(); + 
+ assertThat(ar.result().list().size()).isEqualTo(0); + context.completeNow(); + })); + } + + @Test + public void testMultipleTaskFailure(VertxTestContext context) + { + List<Callable<Future<Boolean>>> tasks = new ArrayList<>(); + for (int i = 0; i < 10; i++) + { + String failureMessage = "Task " + i + " failed."; + tasks.add(getDummyFailedTask(failureMessage)); + } + AsyncConcurrentTaskExecutor<Boolean> concurrentTaskExecutor = new AsyncConcurrentTaskExecutor<>(vertx, tasks, 5); + List<Future<Boolean>> taskFutures = concurrentTaskExecutor.start(); + Future.join(taskFutures).onComplete(context.failing(result -> context.verify(() -> { + assertThat(result).isNotNull(); + taskFutures.forEach(ar -> assertThat(ar.failed()).isTrue()); + context.completeNow(); + }))); + } + + @Test + public void testSingleTaskSuccess(VertxTestContext context) + { + List<Callable<Future<String>>> tasks = Collections.singletonList(getDummySuccessTask("Task executed successfully")); + AsyncConcurrentTaskExecutor<String> concurrentTaskExecutor = new AsyncConcurrentTaskExecutor<>(vertx, tasks, 1); + List<Future<String>> taskFutures = concurrentTaskExecutor.start(); + Future.join(taskFutures).onComplete(ar -> context.verify(() -> { + assertThat(ar).isNotNull(); + assertThat(ar.result().list().get(0)).isEqualTo("Task executed successfully"); Review Comment: I think we need to wrap the completion handler in `context.succeeding`; otherwise, errors will be swallowed. ```suggestion Future.join(taskFutures).onComplete(context.succeeding(result -> context.verify(() -> { assertThat(result).isNotNull(); assertThat(result.list().get(0)).isEqualTo("Task executed successfully"); ``` ########## server/src/test/java/org/apache/cassandra/sidecar/concurrent/AsyncConcurrentTaskExecutorTest.java: ########## @@ -0,0 +1,487 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements.
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.sidecar.concurrent; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.Callable; +import java.util.concurrent.CancellationException; +import java.util.concurrent.atomic.AtomicInteger; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; + +import io.vertx.core.CompositeFuture; +import io.vertx.core.Future; +import io.vertx.core.Promise; +import io.vertx.core.Vertx; +import io.vertx.junit5.VertxExtension; +import io.vertx.junit5.VertxTestContext; + +import static org.apache.cassandra.sidecar.concurrent.AsyncConcurrentTaskExecutor.TASK_CANCEL_MESSAGE; +import static org.assertj.core.api.Assertions.assertThat; + +@ExtendWith(VertxExtension.class) +class AsyncConcurrentTaskExecutorTest +{ + private final Vertx vertx = Vertx.vertx(); + + @Test + public void testTaskFailure(VertxTestContext context) + { + List<Callable<Future<Boolean>>> tasks = new ArrayList<>(); + String failureMessage = "Task failed"; + tasks.add(getDummyFailedTask(failureMessage)); + AsyncConcurrentTaskExecutor<Boolean> concurrentTaskExecutor = new AsyncConcurrentTaskExecutor<>(vertx, tasks, 1); + List<Future<Boolean>> taskFutures = 
concurrentTaskExecutor.start(); + Future.join(taskFutures).onComplete(context.failing(result -> context.verify(() -> { + assertThat(result).isNotNull(); + assertThat(result.getMessage()).isEqualTo(failureMessage); + assertThat(taskFutures.get(0).failed()).isTrue(); + context.completeNow(); + }))); + } + + @Test + void testZeroTasks(VertxTestContext context) + { + List<Callable<Future<Boolean>>> tasks = new ArrayList<>(); + AsyncConcurrentTaskExecutor<Boolean> concurrentTaskExecutor = new AsyncConcurrentTaskExecutor<>(vertx, tasks, 1); + List<Future<Boolean>> taskFutures = concurrentTaskExecutor.start(); + Future.join(taskFutures).onComplete(ar -> context.verify(() -> { + assertThat(ar).isNotNull(); + assertThat(ar.result().list().size()).isEqualTo(0); + context.completeNow(); + })); + } + + @Test + public void testMultipleTaskFailure(VertxTestContext context) + { + List<Callable<Future<Boolean>>> tasks = new ArrayList<>(); + for (int i = 0; i < 10; i++) + { + String failureMessage = "Task " + i + " failed."; + tasks.add(getDummyFailedTask(failureMessage)); + } + AsyncConcurrentTaskExecutor<Boolean> concurrentTaskExecutor = new AsyncConcurrentTaskExecutor<>(vertx, tasks, 5); + List<Future<Boolean>> taskFutures = concurrentTaskExecutor.start(); + Future.join(taskFutures).onComplete(context.failing(result -> context.verify(() -> { + assertThat(result).isNotNull(); + taskFutures.forEach(ar -> assertThat(ar.failed()).isTrue()); + context.completeNow(); + }))); + } + + @Test + public void testSingleTaskSuccess(VertxTestContext context) + { + List<Callable<Future<String>>> tasks = Collections.singletonList(getDummySuccessTask("Task executed successfully")); + AsyncConcurrentTaskExecutor<String> concurrentTaskExecutor = new AsyncConcurrentTaskExecutor<>(vertx, tasks, 1); + List<Future<String>> taskFutures = concurrentTaskExecutor.start(); + Future.join(taskFutures).onComplete(ar -> context.verify(() -> { + assertThat(ar).isNotNull(); + 
assertThat(ar.result().list().get(0)).isEqualTo("Task executed successfully"); + context.completeNow(); + })); + } + + @Test + public void testMultipleTasksSucceeds(VertxTestContext context) + { + List<Callable<Future<Boolean>>> tasks = getDummySuccessTasks(10, Boolean.TRUE); + AsyncConcurrentTaskExecutor<Boolean> concurrentTaskExecutor = new AsyncConcurrentTaskExecutor<>(vertx, tasks, 5); + List<Future<Boolean>> taskFutures = concurrentTaskExecutor.start(); + Future.join(taskFutures).onComplete(ar -> context.verify(() -> { + assertThat(ar).isNotNull(); + for (Future<Boolean> taskFuture : taskFutures) + { + assertThat(taskFuture.succeeded()).isTrue(); + assertThat(taskFuture.result()).isTrue(); + } + context.completeNow(); + })); + } + + @Test + public void testLotsOfTasks(VertxTestContext context) + { + List<Callable<Future<Integer>>> tasks = getDummySuccessTasks(100_000, 1); + AsyncConcurrentTaskExecutor<Integer> concurrentTaskExecutor = new AsyncConcurrentTaskExecutor<>(vertx, tasks, 50); + List<Future<Integer>> taskFutures = concurrentTaskExecutor.start(); + Future.join(taskFutures).onComplete(ar -> context.verify(() -> { + assertThat(ar).isNotNull(); Review Comment: ```suggestion Future.join(taskFutures).onComplete(context.succeeding(result -> context.verify(() -> { assertThat(result).isNotNull(); ``` ########## server/src/main/java/org/apache/cassandra/sidecar/handlers/livemigration/LiveMigrationCreateDataCopyTaskHandler.java: ########## @@ -0,0 +1,135 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.sidecar.handlers.livemigration; + +import java.util.Set; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.inject.Inject; +import io.netty.handler.codec.http.HttpResponseStatus; +import io.vertx.core.http.HttpServerRequest; +import io.vertx.core.json.DecodeException; +import io.vertx.core.json.Json; +import io.vertx.core.json.JsonObject; +import io.vertx.core.net.SocketAddress; +import io.vertx.ext.auth.authorization.Authorization; +import io.vertx.ext.web.RoutingContext; +import org.apache.cassandra.sidecar.acl.authorization.BasicPermissions; +import org.apache.cassandra.sidecar.common.ApiEndpointsV1; +import org.apache.cassandra.sidecar.common.request.LiveMigrationDataCopyRequest; +import org.apache.cassandra.sidecar.concurrent.ExecutorPools; +import org.apache.cassandra.sidecar.exceptions.LiveMigrationExceptions.LiveMigrationDataCopyInProgressException; +import org.apache.cassandra.sidecar.exceptions.LiveMigrationExceptions.LiveMigrationInvalidRequestException; +import org.apache.cassandra.sidecar.handlers.AbstractHandler; +import org.apache.cassandra.sidecar.handlers.AccessProtected; +import org.apache.cassandra.sidecar.livemigration.DataCopyTaskManager; +import org.apache.cassandra.sidecar.livemigration.LiveMigrationTask; +import org.apache.cassandra.sidecar.utils.CassandraInputValidator; +import org.apache.cassandra.sidecar.utils.InstanceMetadataFetcher; +import org.jetbrains.annotations.NotNull; + +import static 
org.apache.cassandra.sidecar.utils.HttpExceptions.wrapHttpException; + +/** + * Handler for creating data copy tasks for live migration between Cassandra instances. + * <p> + * This handler processes requests to initiate data copying from a source Cassandra instance to a destination instance. + * Data copy tasks must be submitted to the destination Sidecar instance. When a task is accepted, + * this handler responds with HTTP 202 ACCEPTED status and returns a JSON response containing: + * - taskId: Unique identifier for tracking the created task + * - statusUrl: URL that can be used to query the status of the data copy operation + */ +public class LiveMigrationCreateDataCopyTaskHandler extends AbstractHandler<LiveMigrationDataCopyRequest> implements AccessProtected +{ + private static final Logger LOGGER = LoggerFactory.getLogger(LiveMigrationCreateDataCopyTaskHandler.class); + private final DataCopyTaskManager dataCopyTaskManager; + + @Inject + public LiveMigrationCreateDataCopyTaskHandler(InstanceMetadataFetcher metadataFetcher, + ExecutorPools executorPools, + CassandraInputValidator validator, + DataCopyTaskManager dataCopyTaskManager) + { + super(metadataFetcher, executorPools, validator); + this.dataCopyTaskManager = dataCopyTaskManager; + } + + @Override + protected LiveMigrationDataCopyRequest extractParamsOrThrow(RoutingContext context) + { + try + { + return Json.decodeValue(context.body().buffer(), LiveMigrationDataCopyRequest.class); + } + catch (DecodeException decodeException) + { + throw wrapHttpException(HttpResponseStatus.BAD_REQUEST, + "Error while decoding values, please check your request body.", + decodeException); + } + } + + @Override + protected void handleInternal(RoutingContext context, + HttpServerRequest httpRequest, + @NotNull String host, + SocketAddress remoteAddress, + LiveMigrationDataCopyRequest liveMigrationTaskRequest) + { + dataCopyTaskManager Review Comment: I see create task has some sort of concurrency limiter control. 
There is something similar implemented in `org.apache.cassandra.sidecar.handlers.sstableuploads.SSTableUploadHandler` leveraging the `org.apache.cassandra.sidecar.concurrent.ConcurrencyLimiter` ########## server/src/main/java/org/apache/cassandra/sidecar/modules/LiveMigrationModule.java: ########## @@ -51,11 +58,113 @@ public class LiveMigrationModule extends AbstractModule protected void configure() { bind(LiveMigrationMap.class).to(LiveMigrationMapSidecarConfigImpl.class); + bind(LiveMigrationTaskFactory.class).to(LiveMigrationTaskFactoryImpl.class); } + @GET + @Path(ApiEndpointsV1.LIVE_MIGRATION_DATA_COPY_TASKS_ROUTE) + @Operation(summary = "Create data copy task", + description = "Creates a new data copy task for live migration") + @APIResponse(description = "Data copy task created successfully", + responseCode = "200", Review Comment: I think it is returning an ACCEPTED status code, which I believe is 202, no? ########## server/src/test/java/org/apache/cassandra/sidecar/concurrent/AsyncConcurrentTaskExecutorTest.java: ########## @@ -0,0 +1,487 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.sidecar.concurrent; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.Callable; +import java.util.concurrent.CancellationException; +import java.util.concurrent.atomic.AtomicInteger; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; + +import io.vertx.core.CompositeFuture; +import io.vertx.core.Future; +import io.vertx.core.Promise; +import io.vertx.core.Vertx; +import io.vertx.junit5.VertxExtension; +import io.vertx.junit5.VertxTestContext; + +import static org.apache.cassandra.sidecar.concurrent.AsyncConcurrentTaskExecutor.TASK_CANCEL_MESSAGE; +import static org.assertj.core.api.Assertions.assertThat; + +@ExtendWith(VertxExtension.class) +class AsyncConcurrentTaskExecutorTest +{ + private final Vertx vertx = Vertx.vertx(); + + @Test + public void testTaskFailure(VertxTestContext context) + { + List<Callable<Future<Boolean>>> tasks = new ArrayList<>(); + String failureMessage = "Task failed"; + tasks.add(getDummyFailedTask(failureMessage)); + AsyncConcurrentTaskExecutor<Boolean> concurrentTaskExecutor = new AsyncConcurrentTaskExecutor<>(vertx, tasks, 1); + List<Future<Boolean>> taskFutures = concurrentTaskExecutor.start(); + Future.join(taskFutures).onComplete(context.failing(result -> context.verify(() -> { + assertThat(result).isNotNull(); + assertThat(result.getMessage()).isEqualTo(failureMessage); + assertThat(taskFutures.get(0).failed()).isTrue(); + context.completeNow(); + }))); + } + + @Test + void testZeroTasks(VertxTestContext context) + { + List<Callable<Future<Boolean>>> tasks = new ArrayList<>(); + AsyncConcurrentTaskExecutor<Boolean> concurrentTaskExecutor = new AsyncConcurrentTaskExecutor<>(vertx, tasks, 1); + List<Future<Boolean>> taskFutures = concurrentTaskExecutor.start(); + Future.join(taskFutures).onComplete(ar -> context.verify(() -> { + assertThat(ar).isNotNull(); + 
assertThat(ar.result().list().size()).isEqualTo(0); + context.completeNow(); + })); + } + + @Test + public void testMultipleTaskFailure(VertxTestContext context) + { + List<Callable<Future<Boolean>>> tasks = new ArrayList<>(); + for (int i = 0; i < 10; i++) + { + String failureMessage = "Task " + i + " failed."; + tasks.add(getDummyFailedTask(failureMessage)); + } + AsyncConcurrentTaskExecutor<Boolean> concurrentTaskExecutor = new AsyncConcurrentTaskExecutor<>(vertx, tasks, 5); + List<Future<Boolean>> taskFutures = concurrentTaskExecutor.start(); + Future.join(taskFutures).onComplete(context.failing(result -> context.verify(() -> { + assertThat(result).isNotNull(); + taskFutures.forEach(ar -> assertThat(ar.failed()).isTrue()); + context.completeNow(); + }))); + } + + @Test + public void testSingleTaskSuccess(VertxTestContext context) + { + List<Callable<Future<String>>> tasks = Collections.singletonList(getDummySuccessTask("Task executed successfully")); + AsyncConcurrentTaskExecutor<String> concurrentTaskExecutor = new AsyncConcurrentTaskExecutor<>(vertx, tasks, 1); + List<Future<String>> taskFutures = concurrentTaskExecutor.start(); + Future.join(taskFutures).onComplete(ar -> context.verify(() -> { + assertThat(ar).isNotNull(); + assertThat(ar.result().list().get(0)).isEqualTo("Task executed successfully"); + context.completeNow(); + })); + } + + @Test + public void testMultipleTasksSucceeds(VertxTestContext context) + { + List<Callable<Future<Boolean>>> tasks = getDummySuccessTasks(10, Boolean.TRUE); + AsyncConcurrentTaskExecutor<Boolean> concurrentTaskExecutor = new AsyncConcurrentTaskExecutor<>(vertx, tasks, 5); + List<Future<Boolean>> taskFutures = concurrentTaskExecutor.start(); + Future.join(taskFutures).onComplete(ar -> context.verify(() -> { + assertThat(ar).isNotNull(); + for (Future<Boolean> taskFuture : taskFutures) + { + assertThat(taskFuture.succeeded()).isTrue(); + assertThat(taskFuture.result()).isTrue(); + } + context.completeNow(); + })); + } + + 
@Test + public void testLotsOfTasks(VertxTestContext context) + { + List<Callable<Future<Integer>>> tasks = getDummySuccessTasks(100_000, 1); + AsyncConcurrentTaskExecutor<Integer> concurrentTaskExecutor = new AsyncConcurrentTaskExecutor<>(vertx, tasks, 50); + List<Future<Integer>> taskFutures = concurrentTaskExecutor.start(); + Future.join(taskFutures).onComplete(ar -> context.verify(() -> { + assertThat(ar).isNotNull(); + for (Future<Integer> taskFuture : taskFutures) + { + assertThat(taskFuture.succeeded()).isTrue(); + assertThat(taskFuture.result()).isEqualTo(1); + } + context.completeNow(); + })); + } + + @Test + public void testTaskCancel(VertxTestContext context) + { + Promise<Boolean> promise = Promise.promise(); + List<Callable<Future<Boolean>>> tasks = getDummyPendingTasks(10, promise); + AsyncConcurrentTaskExecutor<Boolean> concurrentTaskExecutor = new AsyncConcurrentTaskExecutor<>(vertx, tasks, 1); + List<Future<Boolean>> taskFutures = concurrentTaskExecutor.start(); + + concurrentTaskExecutor.cancelTasks(); + + // Complete promise so that first task gets finished + promise.fail(new Exception("ERROR!")); + + Future.join(taskFutures).onComplete(context.failing(result -> context.verify(() -> { + assertThat(result).isNotNull(); + + Future<Boolean> firstTask = taskFutures.get(0); + assertThat(firstTask.failed()).isTrue(); + assertThat(firstTask.cause()).isInstanceOf(Exception.class); + + for (int i = 1; i < taskFutures.size(); i++) + { + Future<Boolean> task = taskFutures.get(i); + assertThat(task.failed()).isTrue(); + assertThat(task.cause().getMessage()).isEqualTo(TASK_CANCEL_MESSAGE); + assertThat(task.cause()).isInstanceOf(CancellationException.class); + } + + context.completeNow(); + }))); + } + + @Test + public void testCancelLotOfTasks(VertxTestContext context) + { + Promise<Integer> promise = Promise.promise(); + List<Callable<Future<Integer>>> tasks = getDummyPendingTasks(10_000, promise); + AsyncConcurrentTaskExecutor<Integer> taskExecutor = new 
AsyncConcurrentTaskExecutor<>(vertx, tasks, 1); + List<Future<Integer>> taskFutures = taskExecutor.start(); + + taskExecutor.cancelTasks(); + + // Resolve the promise so that first task will finish + promise.complete(0); + + Future.join(taskFutures).onComplete(context.failing(result -> context.verify(() -> { + assertThat(result).isNotNull(); + + Future<Integer> firstTask = taskFutures.get(0); + assertThat(firstTask.isComplete()).isTrue(); + assertThat(firstTask.succeeded()).isTrue(); + assertThat(firstTask.result()).isEqualTo(0); + + for (int i = 1; i < taskFutures.size(); i++) + { + Future<Integer> task = taskFutures.get(i); + assertThat(task.failed()).isTrue(); + assertThat(task.cause().getMessage()).isEqualTo(TASK_CANCEL_MESSAGE); + assertThat(task.cause()).isInstanceOf(CancellationException.class); + } + + context.completeNow(); + }))); + } + + @Test + public void testCancelWhenTasksInProgress() + { + Promise<Integer> task1Promise = Promise.promise(); + Callable<Future<Integer>> task1 = () -> task1Promise.future() + .compose(res -> Future.succeededFuture(0)); + Callable<Future<Integer>> task2 = getDummySuccessTask(1); + Promise<Integer> task3Promise = Promise.promise(); + Callable<Future<Integer>> task3 = () -> task3Promise.future() + .compose(res -> Future.succeededFuture(2)); + Callable<Future<Integer>> task4 = getDummySuccessTask(3); + Callable<Future<Integer>> task5 = getDummySuccessTask(4); + + List<Callable<Future<Integer>>> tasks = Arrays.asList(task1, task2, task3, task4, task5); + AsyncConcurrentTaskExecutor<Integer> concurrentTaskExecutor = new AsyncConcurrentTaskExecutor<>(vertx, tasks, 2); + List<Future<Integer>> taskFutures = concurrentTaskExecutor.start(); + + // Wait for some time so that independent tasks get time finish. 
+ CompositeFuture compositeFuture = waitForTasks(taskFutures, 1_000); + assertThat(compositeFuture.isComplete()).isFalse(); + concurrentTaskExecutor.cancelTasks(); + waitForTasks(compositeFuture, 1_000); + + assertThat(taskFutures.get(0).succeeded()).isFalse(); + assertThat(taskFutures.get(0).isComplete()).isFalse(); + + assertThat(taskFutures.get(1).succeeded()).isTrue(); + assertThat(taskFutures.get(1).isComplete()).isTrue(); + + assertThat(taskFutures.get(2).succeeded()).isFalse(); + assertThat(taskFutures.get(2).isComplete()).isFalse(); + + assertThat(taskFutures.get(3).isComplete()).isTrue(); + assertThat(taskFutures.get(3).failed()).isTrue(); + assertThat(taskFutures.get(3).cause()).isInstanceOf(CancellationException.class); + + assertThat(taskFutures.get(4).failed()).isTrue(); + assertThat(taskFutures.get(4).isComplete()).isTrue(); + assertThat(taskFutures.get(4).cause()).isInstanceOf(CancellationException.class); + + assertThat(compositeFuture.isComplete()).isFalse(); + + task1Promise.complete(); + task3Promise.complete(); + + waitForTasks(compositeFuture, 2_000); + assertThat(Future.join(taskFutures).isComplete()).isTrue(); + } + + @Test + public void testMaxConcurrency(VertxTestContext context) + { + AtomicInteger counter = new AtomicInteger(0); + int maxConcurrency = 4; + List<Callable<Future<Integer>>> tasks = new ArrayList<>(); + for (int i = 0; i < 1000; i++) + { + Callable<Future<Integer>> task = getDummySuccessCounterTask(counter); + tasks.add(task); + } + TestAsyncTaskExecutor<Integer> concurrentTaskExecutor = + new TestAsyncTaskExecutor<>(vertx, tasks, maxConcurrency, counter); + List<Future<Integer>> taskFutures = concurrentTaskExecutor.start(); + + Future.join(taskFutures).onComplete(ar -> context.verify(() -> { Review Comment: ```suggestion Future.join(taskFutures).onComplete(ar -> context.verify(() -> { ``` ########## server/src/main/java/org/apache/cassandra/sidecar/livemigration/DataCopyTaskManager.java: ########## Review Comment: NIT : 
usage of `final` in this class. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: pr-unsubscr...@cassandra.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: pr-unsubscr...@cassandra.apache.org For additional commands, e-mail: pr-h...@cassandra.apache.org