yifan-c commented on code in PR #231:
URL: https://github.com/apache/cassandra-sidecar/pull/231#discussion_r2663545539


##########
server/src/main/java/org/apache/cassandra/sidecar/job/RepairJob.java:
##########
@@ -0,0 +1,360 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.job;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import io.vertx.core.Future;
+import io.vertx.core.Promise;
+import org.apache.cassandra.sidecar.adapters.base.RepairOptions;
+import org.apache.cassandra.sidecar.common.data.OperationalJobStatus;
+import org.apache.cassandra.sidecar.common.request.data.RepairPayload;
+import org.apache.cassandra.sidecar.common.server.StorageOperations;
+import org.apache.cassandra.sidecar.concurrent.TaskExecutorPool;
+import org.apache.cassandra.sidecar.config.RepairJobsConfiguration;
+import org.apache.cassandra.sidecar.handlers.data.RepairRequestParam;
+import org.jetbrains.annotations.NotNull;
+
+/**
+ * Implementation of {@link OperationalJob} to perform repair operation.
+ * <p>
+ * Cassandra repair operations are asynchronous and can run for extended 
periods. When a repair is
+ * detected to be IN_PROGRESS, this job sets its status to RUNNING and 
completes its promise.
+ * <p>
+ * The job's status can be checked at any time using the {@link #status()} 
method, which directly
+ * queries the current repair status from Cassandra, regardless of whether the 
promise has been completed.
+ * <p>
+ * This design ensures timely responses to clients while allowing accurate 
status reporting for
+ * long-running repair operations.
+ */
+public class RepairJob extends OperationalJob
+{
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(RepairJob.class);
+    private static final String OPERATION = "repair";
+    private static final String PREVIEW_KIND_REPAIRED = "REPAIRED";
+
+    private final RepairRequestParam repairParams;
+    private final RepairJobsConfiguration config;
+    private final TaskExecutorPool internalPool;
+    protected StorageOperations storageOperations;
+    private volatile OperationalJobStatus currentStatus;
+    private volatile int commandId = -1; // Store the command ID for status 
checks
+
+    /**
+     * Enum representing the status of a parent repair session
+     */
+    public enum ParentRepairStatus
+    {
+        IN_PROGRESS, COMPLETED, FAILED
+    }
+
+    /**
+     * Constructor for creation of RepairJob
+     *
+     * @param taskExecutorPool    TaskExecutorPool instance (for testing)
+     * @param config              Repair job configuration
+     * @param jobId               UUID representing the Job to be created
+     * @param storageOps          Reference to the storage operations interface
+     * @param repairParams        Repair request parameters
+     */
+    public RepairJob(TaskExecutorPool taskExecutorPool,
+                     RepairJobsConfiguration config,
+                     UUID jobId,
+                     StorageOperations storageOps,
+                     RepairRequestParam repairParams)
+    {
+        super(jobId);
+        this.internalPool = taskExecutorPool;
+        this.config = config;
+        this.storageOperations = storageOps;
+        this.repairParams = repairParams;
+        this.currentStatus = OperationalJobStatus.CREATED;
+    }
+
+    /**
+     * {@inheritDoc}
+     * RepairJob allows parallel executions since multiple repairs can run 
concurrently
+     * on different tables or with different parameters.
+     */
+    @Override
+    public boolean hasConflict(@NotNull List<OperationalJob> sameOperationJobs)
+    {
+        // For now, we simply allow all repair jobs to run in parallel
+        // A more sophisticated implementation could check the repair 
parameters
+        // against currently running repairs to detect potential conflicts
+        return false;
+    }
+
+    @Override
+    protected Future<Void> executeInternal()
+    {
+        try
+        {
+            Map<String, String> options = 
generateRepairOptions(repairParams.requestPayload());
+            String keyspace = repairParams.keyspace().name();
+
+            LOGGER.info("Executing repair operation for keyspace {} jobId={}", 
keyspace, this.jobId());
+
+            try
+            {
+                commandId = storageOperations.repair(keyspace, options);
+            }
+            catch (Exception e)
+            {
+                LOGGER.error("Failed to initiate repair for keyspace {} 
jobId={}", keyspace, this.jobId(), e);
+                currentStatus = OperationalJobStatus.FAILED;
+                return Future.failedFuture(e);
+            }
+
+            if (commandId <= 0)
+            {
+                // When there are no relevant token ranges for the keyspace or 
RF is 1, the repair is inapplicable
+                LOGGER.info("Repair is not applicable for the provided options 
and keyspace '{}' jobId '{}'", keyspace, this.jobId());
+                currentStatus = OperationalJobStatus.SUCCEEDED;
+                return Future.succeededFuture();
+            }
+
+            // Create promise for job submission response - this completes 
when we have a definitive
+            // status to return to the client (either repair is running, 
completed, or failed)
+            final Promise<Void> jobSubmissionPromise = Promise.promise();
+            int maxAttempts = config.repairStatusMaxAttempts();
+            final AtomicInteger attemptCounter = new AtomicInteger(0);

Review Comment:
   Remove the `final` modifier here, and likewise on the other `final` local variables in this method.



##########
server/src/main/java/org/apache/cassandra/sidecar/job/OperationalJob.java:
##########
@@ -167,6 +170,8 @@ public Future<Void> asyncResult(TaskExecutorPool 
executorPool, DurationSpec wait
 
         // complete the max wait time promise either when exceeding the wait 
time, or the result is available
         Promise<Boolean> maxWaitTimePromise = Promise.promise();
+
+        // Pool's setTimer executes the handler on the pool's thread, but the 
timer is still managed by vertx eventloop
         executorPool.setTimer(waitTime.toMillis(), d -> 
maxWaitTimePromise.tryComplete(true)); // complete with true, meaning timeout

Review Comment:
   I'm a bit confused by this comment; maybe just remove it.
   
   The timer's lambda is executed on the pool, while the timer itself is scheduled by Vert.x. I'm not sure this comment makes that any clearer. If readability is to be improved, I would rather update the Javadoc of `org.apache.cassandra.sidecar.concurrent.TaskExecutorPool#setTimer(long, io.vertx.core.Handler<java.lang.Long>, boolean)`.



##########
server-common/src/main/java/org/apache/cassandra/sidecar/common/server/StorageOperations.java:
##########
@@ -135,6 +135,24 @@ default void outOfRangeDataCleanup(@NotNull String 
keyspace, @NotNull String tab
      */
     String clusterName();
 
+    /**
+     * Triggers a repair operation for the given keyspace and options
+     *
+     * @param keyspace keyspace for the repair operation
+     * @param options  repair options
+     * @return an integer value representing the status of the repair operation
+     * which can be used as a reference to check for the status of the repair 
session via  {@link #getParentRepairStatus(int)}.

Review Comment:
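   Document the meaning of the return value `0`. `RepairJob#executeInternal` treats a command ID `<= 0` as "repair not applicable" (e.g. no relevant token ranges, or RF is 1).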



##########
adapters/adapters-base/src/main/java/org/apache/cassandra/sidecar/adapters/base/CassandraStorageOperations.java:
##########
@@ -257,6 +257,25 @@ public void drain() throws IOException, 
InterruptedException, ExecutionException
                  .drain();
     }
 
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public List<String> getParentRepairStatus(int cmd)
+    {
+        return jmxClient.proxy(StorageJmxOperations.class, 
STORAGE_SERVICE_OBJ_NAME)
+               .getParentRepairStatus(cmd);
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    public int repair(String keyspace, Map<String, String> repairOptions)

Review Comment:
   Add `@Override`
   
   CI should fail without the annotation.



##########
server/src/main/java/org/apache/cassandra/sidecar/job/OperationalJob.java:
##########
@@ -100,11 +102,12 @@ public boolean isStale(long referenceTimestampInMillis, 
long ttlInMillis)
     }
 
     /**
-     * The concrete-job-specific implementation to determine if the job is 
running on the Cassandra node.
-     * @return true if the job is running on the Cassandra node. For example, 
node decommission is tracked by the
-     * operationMode exposed from Cassandra.
+     * The concrete-job-specific implementation to determine if the job has 
conflict with other operations on the same node.
+     *
+     * @param sameOperationJobs list of jobs being tracked by the tracker that 
have the same operation
+     * @return true if the job is has a conflict.
      */
-    public abstract boolean isRunningOnCassandra();
+    public abstract boolean hasConflict(@NotNull List<OperationalJob> 
sameOperationJobs);

Review Comment:
   No concrete implementation uses the parameter so far. Do we really need the list?
   Besides that, the API semantics change completely from `isRunningOnCassandra` to `hasConflict`. Maybe you want to declare a new interface method instead?
   That said, I am aware that the original `isRunningOnCassandra` was used for conflict detection, i.e. to decide whether the job can be submitted.



##########
server-common/src/main/java/org/apache/cassandra/sidecar/common/server/StorageOperations.java:
##########
@@ -135,6 +135,24 @@ default void outOfRangeDataCleanup(@NotNull String 
keyspace, @NotNull String tab
      */
     String clusterName();
 
+    /**
+     * Triggers a repair operation for the given keyspace and options
+     *
+     * @param keyspace keyspace for the repair operation
+     * @param options  repair options
+     * @return an integer value representing the status of the repair operation
+     * which can be used as a reference to check for the status of the repair 
session via  {@link #getParentRepairStatus(int)}.
+     */
+    int repair(String keyspace, Map<String, String> options);

Review Comment:
   nit: rename to `repairAsync` to be explicit and consistent with Cassandra. 
   



##########
server/src/main/java/org/apache/cassandra/sidecar/job/RepairJob.java:
##########
@@ -0,0 +1,360 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.job;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import io.vertx.core.Future;
+import io.vertx.core.Promise;
+import org.apache.cassandra.sidecar.adapters.base.RepairOptions;
+import org.apache.cassandra.sidecar.common.data.OperationalJobStatus;
+import org.apache.cassandra.sidecar.common.request.data.RepairPayload;
+import org.apache.cassandra.sidecar.common.server.StorageOperations;
+import org.apache.cassandra.sidecar.concurrent.TaskExecutorPool;
+import org.apache.cassandra.sidecar.config.RepairJobsConfiguration;
+import org.apache.cassandra.sidecar.handlers.data.RepairRequestParam;
+import org.jetbrains.annotations.NotNull;
+
+/**
+ * Implementation of {@link OperationalJob} to perform repair operation.
+ * <p>
+ * Cassandra repair operations are asynchronous and can run for extended 
periods. When a repair is
+ * detected to be IN_PROGRESS, this job sets its status to RUNNING and 
completes its promise.
+ * <p>
+ * The job's status can be checked at any time using the {@link #status()} 
method, which directly
+ * queries the current repair status from Cassandra, regardless of whether the 
promise has been completed.
+ * <p>
+ * This design ensures timely responses to clients while allowing accurate 
status reporting for
+ * long-running repair operations.
+ */
+public class RepairJob extends OperationalJob
+{
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(RepairJob.class);
+    private static final String OPERATION = "repair";
+    private static final String PREVIEW_KIND_REPAIRED = "REPAIRED";
+
+    private final RepairRequestParam repairParams;
+    private final RepairJobsConfiguration config;
+    private final TaskExecutorPool internalPool;
+    protected StorageOperations storageOperations;
+    private volatile OperationalJobStatus currentStatus;
+    private volatile int commandId = -1; // Store the command ID for status 
checks
+
+    /**
+     * Enum representing the status of a parent repair session
+     */
+    public enum ParentRepairStatus
+    {
+        IN_PROGRESS, COMPLETED, FAILED
+    }
+
+    /**
+     * Constructor for creation of RepairJob
+     *
+     * @param taskExecutorPool    TaskExecutorPool instance (for testing)
+     * @param config              Repair job configuration
+     * @param jobId               UUID representing the Job to be created
+     * @param storageOps          Reference to the storage operations interface
+     * @param repairParams        Repair request parameters
+     */
+    public RepairJob(TaskExecutorPool taskExecutorPool,
+                     RepairJobsConfiguration config,
+                     UUID jobId,
+                     StorageOperations storageOps,
+                     RepairRequestParam repairParams)
+    {
+        super(jobId);
+        this.internalPool = taskExecutorPool;
+        this.config = config;
+        this.storageOperations = storageOps;
+        this.repairParams = repairParams;
+        this.currentStatus = OperationalJobStatus.CREATED;
+    }
+
+    /**
+     * {@inheritDoc}
+     * RepairJob allows parallel executions since multiple repairs can run 
concurrently
+     * on different tables or with different parameters.
+     */
+    @Override
+    public boolean hasConflict(@NotNull List<OperationalJob> sameOperationJobs)
+    {
+        // For now, we simply allow all repair jobs to run in parallel
+        // A more sophisticated implementation could check the repair 
parameters
+        // against currently running repairs to detect potential conflicts
+        return false;
+    }
+
+    @Override
+    protected Future<Void> executeInternal()
+    {
+        try
+        {
+            Map<String, String> options = 
generateRepairOptions(repairParams.requestPayload());
+            String keyspace = repairParams.keyspace().name();
+
+            LOGGER.info("Executing repair operation for keyspace {} jobId={}", 
keyspace, this.jobId());
+
+            try
+            {
+                commandId = storageOperations.repair(keyspace, options);
+            }
+            catch (Exception e)
+            {
+                LOGGER.error("Failed to initiate repair for keyspace {} 
jobId={}", keyspace, this.jobId(), e);
+                currentStatus = OperationalJobStatus.FAILED;
+                return Future.failedFuture(e);
+            }
+
+            if (commandId <= 0)
+            {
+                // When there are no relevant token ranges for the keyspace or 
RF is 1, the repair is inapplicable
+                LOGGER.info("Repair is not applicable for the provided options 
and keyspace '{}' jobId '{}'", keyspace, this.jobId());

Review Comment:
   This is the corresponding message in Cassandra:
   ```
               String message = String.format("Replication factor is 1. No 
repair is needed for keyspace '%s'", repairCmd.keyspace);
   ```
   For operators' sake, the message emitted by Sidecar should be the same as, or similar to, the nodetool output.



##########
server/src/main/java/org/apache/cassandra/sidecar/job/RepairJob.java:
##########
@@ -0,0 +1,360 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.job;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import io.vertx.core.Future;
+import io.vertx.core.Promise;
+import org.apache.cassandra.sidecar.adapters.base.RepairOptions;
+import org.apache.cassandra.sidecar.common.data.OperationalJobStatus;
+import org.apache.cassandra.sidecar.common.request.data.RepairPayload;
+import org.apache.cassandra.sidecar.common.server.StorageOperations;
+import org.apache.cassandra.sidecar.concurrent.TaskExecutorPool;
+import org.apache.cassandra.sidecar.config.RepairJobsConfiguration;
+import org.apache.cassandra.sidecar.handlers.data.RepairRequestParam;
+import org.jetbrains.annotations.NotNull;
+
+/**
+ * Implementation of {@link OperationalJob} to perform repair operation.
+ * <p>
+ * Cassandra repair operations are asynchronous and can run for extended 
periods. When a repair is
+ * detected to be IN_PROGRESS, this job sets its status to RUNNING and 
completes its promise.
+ * <p>
+ * The job's status can be checked at any time using the {@link #status()} 
method, which directly
+ * queries the current repair status from Cassandra, regardless of whether the 
promise has been completed.
+ * <p>
+ * This design ensures timely responses to clients while allowing accurate 
status reporting for
+ * long-running repair operations.
+ */
+public class RepairJob extends OperationalJob
+{
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(RepairJob.class);
+    private static final String OPERATION = "repair";
+    private static final String PREVIEW_KIND_REPAIRED = "REPAIRED";
+
+    private final RepairRequestParam repairParams;
+    private final RepairJobsConfiguration config;
+    private final TaskExecutorPool internalPool;
+    protected StorageOperations storageOperations;
+    private volatile OperationalJobStatus currentStatus;
+    private volatile int commandId = -1; // Store the command ID for status 
checks
+
+    /**
+     * Enum representing the status of a parent repair session
+     */
+    public enum ParentRepairStatus
+    {
+        IN_PROGRESS, COMPLETED, FAILED
+    }
+
+    /**
+     * Constructor for creation of RepairJob
+     *
+     * @param taskExecutorPool    TaskExecutorPool instance (for testing)
+     * @param config              Repair job configuration
+     * @param jobId               UUID representing the Job to be created
+     * @param storageOps          Reference to the storage operations interface
+     * @param repairParams        Repair request parameters
+     */
+    public RepairJob(TaskExecutorPool taskExecutorPool,
+                     RepairJobsConfiguration config,
+                     UUID jobId,
+                     StorageOperations storageOps,
+                     RepairRequestParam repairParams)
+    {
+        super(jobId);
+        this.internalPool = taskExecutorPool;
+        this.config = config;
+        this.storageOperations = storageOps;
+        this.repairParams = repairParams;
+        this.currentStatus = OperationalJobStatus.CREATED;
+    }
+
+    /**
+     * {@inheritDoc}
+     * RepairJob allows parallel executions since multiple repairs can run 
concurrently
+     * on different tables or with different parameters.
+     */
+    @Override
+    public boolean hasConflict(@NotNull List<OperationalJob> sameOperationJobs)
+    {
+        // For now, we simply allow all repair jobs to run in parallel
+        // A more sophisticated implementation could check the repair 
parameters
+        // against currently running repairs to detect potential conflicts
+        return false;
+    }
+
+    @Override
+    protected Future<Void> executeInternal()
+    {
+        try
+        {
+            Map<String, String> options = 
generateRepairOptions(repairParams.requestPayload());
+            String keyspace = repairParams.keyspace().name();
+
+            LOGGER.info("Executing repair operation for keyspace {} jobId={}", 
keyspace, this.jobId());
+
+            try
+            {
+                commandId = storageOperations.repair(keyspace, options);
+            }
+            catch (Exception e)
+            {
+                LOGGER.error("Failed to initiate repair for keyspace {} 
jobId={}", keyspace, this.jobId(), e);
+                currentStatus = OperationalJobStatus.FAILED;
+                return Future.failedFuture(e);
+            }
+
+            if (commandId <= 0)
+            {
+                // When there are no relevant token ranges for the keyspace or 
RF is 1, the repair is inapplicable
+                LOGGER.info("Repair is not applicable for the provided options 
and keyspace '{}' jobId '{}'", keyspace, this.jobId());
+                currentStatus = OperationalJobStatus.SUCCEEDED;
+                return Future.succeededFuture();
+            }
+
+            // Create promise for job submission response - this completes 
when we have a definitive
+            // status to return to the client (either repair is running, 
completed, or failed)
+            final Promise<Void> jobSubmissionPromise = Promise.promise();
+            int maxAttempts = config.repairStatusMaxAttempts();
+            final AtomicInteger attemptCounter = new AtomicInteger(0);
+
+            // Periodic timer that checks for a valid repair status for a 
specified no. attempts to make a best-effort attempt
+            // to validate that the repair has been kicked-off before 
returning.
+            internalPool.setPeriodic(0, 
config.repairPollInterval().toIntMillis(), id -> {
+                try
+                {
+                    int currentAttempt = attemptCounter.incrementAndGet();
+                    if (currentAttempt > maxAttempts)
+                    {
+                        internalPool.cancelTimer(id);
+                        String msg = String.format("Failed to obtain repair 
status after %d attempts.", maxAttempts);
+                        LOGGER.warn(msg);
+                        // Set status to RUNNING and complete the promise to 
ensure the job is properly handled
+                        currentStatus = OperationalJobStatus.RUNNING;
+                        jobSubmissionPromise.tryComplete();
+                        return;
+                    }
+
+                    // Check the repair status
+                    List<String> status;
+                    try
+                    {
+                        status = 
storageOperations.getParentRepairStatus(commandId);
+                    }
+                    catch (Exception e)
+                    {
+                        LOGGER.warn("Failed to get repair status for cmd: {} 
(attempt {}/{})",
+                                    commandId, currentAttempt, maxAttempts, e);
+                        // Continue polling on exception
+                        return;
+                    }
+
+                    // If status is empty, continue polling
+                    if (status == null || status.isEmpty())
+                    {
+                        LOGGER.debug("No parent repair session status found 
for cmd: {} - repair may be initializing (attempt {}/{})",
+                                     commandId, currentAttempt, maxAttempts);
+                        return;
+                    }
+
+                    internalPool.cancelTimer(id);
+                    updateRepairJobStatus(jobSubmissionPromise, status);
+                }
+                catch (Exception e)
+                {
+                    LOGGER.error("Unexpected error in repair status check", e);
+                    internalPool.cancelTimer(id);
+                    currentStatus = OperationalJobStatus.FAILED;
+                    jobSubmissionPromise.tryFail(e);
+                }
+            });
+
+            return jobSubmissionPromise.future();
+        }
+        catch (Exception e)
+        {
+            // Catch any exceptions in the overall method
+            LOGGER.error("Failed to execute repair job", e);
+            currentStatus = OperationalJobStatus.FAILED;
+            return Future.failedFuture(e);
+        }
+    }
+
+
+    @Override
+    public OperationalJobStatus status()
+    {
+        // If we have a valid command ID, check the actual repair status
+        if (commandId > 0)
+        {
+            List<String> status = 
storageOperations.getParentRepairStatus(commandId);
+            if (status != null && !status.isEmpty())
+            {
+                try
+                {
+                    ParentRepairStatus parentRepairStatus = 
ParentRepairStatus.valueOf(status.get(0));
+                    switch (parentRepairStatus)
+                    {
+                        case COMPLETED:
+                            return OperationalJobStatus.SUCCEEDED;
+                        case FAILED:
+                            return OperationalJobStatus.FAILED;
+                        case IN_PROGRESS:
+                            return OperationalJobStatus.RUNNING;
+                        default:
+                            LOGGER.warn("Encountered unexpected repair status: 
{}", parentRepairStatus);
+                            // Don't update currentStatus here, fall back to 
parent implementation
+                    }
+                }
+                catch (IllegalArgumentException e)
+                {
+                    LOGGER.warn("Invalid parent repair status: {}", 
status.get(0), e);
+                    // Don't update currentStatus here, fall back to parent 
implementation
+                }
+            }
+        }
+        
+        // If we have a current status, return it
+        if (currentStatus != null)
+        {
+            return currentStatus;
+        }
+        
+        // Otherwise, fall back to the parent implementation
+        return super.status();
+    }
+
+    /**
+     * Updates the repair job status based on the parent repair status from 
Cassandra.
+     * <p>
+     * When the parent repair status is IN_PROGRESS, this method sets the 
currentStatus
+     * to RUNNING and completes the promise
+     * <p>
+     * This approach ensures that resources are properly managed while still 
providing
+     * accurate status reporting through the {@link #status()} method.
+     *
+     * @param jobSubmissionPromise the promise to complete
+     * @param status the parent repair status from Cassandra
+     */
+    private void updateRepairJobStatus(Promise<Void> jobSubmissionPromise, 
List<String> status)
+    {
+        ParentRepairStatus parentRepairStatus = 
ParentRepairStatus.valueOf(status.get(0));
+        List<String> messages = status.subList(1, status.size());
+
+        LOGGER.info("Parent repair session {} has {} status. Messages: {}",
+                    parentRepairStatus.name().toLowerCase(),
+                    parentRepairStatus,
+                    String.join("\n", messages));
+        switch (parentRepairStatus)
+        {
+            case COMPLETED:
+                currentStatus = OperationalJobStatus.SUCCEEDED;
+                jobSubmissionPromise.tryComplete();
+                break;
+            case FAILED:
+                currentStatus = OperationalJobStatus.FAILED;
+                String reason = !messages.isEmpty() ? messages.get(0) :
+                                "Repair failed with no error message";
+                jobSubmissionPromise.tryFail(new IOException(reason));
+                break;
+            case IN_PROGRESS:
+                currentStatus = OperationalJobStatus.RUNNING;
+                jobSubmissionPromise.tryComplete();
+                break;
+            default:
+                String message = String.format("Encountered unexpected repair 
status: %s Messages: %s",
+                                               parentRepairStatus, 
String.join("\n", messages));
+                LOGGER.error(message);
+                currentStatus = OperationalJobStatus.FAILED;
+                jobSubmissionPromise.tryFail(message);
+                break;
+        }
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public String name()
+    {
+        return OPERATION;
+    }
+
+    private Map<String, String> generateRepairOptions(RepairPayload 
repairPayload)
+    {
+        Map<String, String> options = new HashMap<>();
+
+        List<String> tables = repairPayload.tables();
+        if (tables != null && !tables.isEmpty())
+        {
+            options.put(RepairOptions.COLUMNFAMILIES.optionName(), 
String.join(",", tables));
+        }
+
+        Boolean isPrimaryRange = repairPayload.isPrimaryRange();
+        if (isPrimaryRange != null)
+        {
+            options.put(RepairOptions.PRIMARY_RANGE.optionName(), 
String.valueOf(isPrimaryRange));
+        }
+        // TODO: Verify use-cases involving multiple DCs
+
+        String dc = repairPayload.datacenter();
+        if (dc != null)
+        {
+            options.put(RepairOptions.DATACENTERS.optionName(), dc);
+        }
+
+        List<String> hosts = repairPayload.hosts();
+        if (hosts != null && !hosts.isEmpty())
+        {
+            options.put(RepairOptions.HOSTS.optionName(), String.join(",", 
hosts));
+        }
+
+        if (repairPayload.startToken() != null && repairPayload.endToken() != 
null)
+        {
+            // Use original string values for range construction
+            options.put(RepairOptions.RANGES.optionName(), 
repairPayload.startToken() + ":" + repairPayload.endToken());
+        }
+
+        if (repairPayload.repairType() == RepairPayload.RepairType.INCREMENTAL)
+        {
+            options.put(RepairOptions.INCREMENTAL.optionName(), 
Boolean.TRUE.toString());
+        }
+
+        if (repairPayload.force() != null)
+        {

Review Comment:
   Also check that `repairPayload.force()` is `true`, not merely non-null. `Boolean.TRUE.equals(repairPayload.force())` covers both the null check and the value comparison.



##########
server/src/main/java/org/apache/cassandra/sidecar/handlers/RepairHandler.java:
##########
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.handlers;
+
+import java.math.BigInteger;
+import java.util.Collections;
+import java.util.Set;
+
+import com.datastax.driver.core.utils.UUIDs;
+import com.google.inject.Inject;
+import io.netty.handler.codec.http.HttpResponseStatus;
+import io.vertx.core.http.HttpServerRequest;
+import io.vertx.core.json.DecodeException;
+import io.vertx.core.json.Json;
+import io.vertx.core.net.SocketAddress;
+import io.vertx.ext.auth.authorization.Authorization;
+import io.vertx.ext.web.RoutingContext;
+import org.apache.cassandra.sidecar.acl.authorization.BasicPermissions;
+import org.apache.cassandra.sidecar.common.request.data.RepairPayload;
+import org.apache.cassandra.sidecar.common.server.StorageOperations;
+import org.apache.cassandra.sidecar.common.server.data.Name;
+import org.apache.cassandra.sidecar.concurrent.ExecutorPools;
+import org.apache.cassandra.sidecar.config.ServiceConfiguration;
+import org.apache.cassandra.sidecar.handlers.data.RepairRequestParam;
+import org.apache.cassandra.sidecar.job.OperationalJobManager;
+import org.apache.cassandra.sidecar.job.RepairJob;
+import org.apache.cassandra.sidecar.utils.CassandraInputValidator;
+import org.apache.cassandra.sidecar.utils.InstanceMetadataFetcher;
+import org.apache.cassandra.sidecar.utils.OperationalJobUtils;
+import org.jetbrains.annotations.NotNull;
+
+import static 
org.apache.cassandra.sidecar.utils.HttpExceptions.wrapHttpException;
+
+/**
+ * Handler for triggering repair
+ */
+public class RepairHandler extends AbstractHandler<RepairRequestParam> 
implements AccessProtected
+{
+    private final ServiceConfiguration config;
+    private final OperationalJobManager jobManager;
+
+    /**
+     * Constructs a handler with the provided {@code metadataFetcher}
+     *
+     * @param metadataFetcher      the metadata fetcher
+     * @param executorPools        executor pools for blocking executions
+     * @param serviceConfiguration configuration object holding config details 
of Sidecar
+     * @param validator            a validator instance to validate 
Cassandra-specific input
+     * @param jobManager           manager for long-running operational jobs
+     */
+    @Inject
+    protected RepairHandler(InstanceMetadataFetcher metadataFetcher,
+                            ExecutorPools executorPools,
+                            ServiceConfiguration serviceConfiguration,
+                            CassandraInputValidator validator,
+                            OperationalJobManager jobManager)
+    {
+        super(metadataFetcher, executorPools, validator);
+        this.jobManager = jobManager;
+        this.config = serviceConfiguration;
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    protected RepairRequestParam extractParamsOrThrow(RoutingContext context)
+    {
+        Name keyspace = keyspace(context, true);
+        if (keyspace == null)
+        {
+            throw wrapHttpException(HttpResponseStatus.BAD_REQUEST, 
"'keyspace' is required but not supplied");
+        }
+
+        String bodyString = context.body().asString();
+        if (bodyString == null || bodyString.equalsIgnoreCase("null")) // json 
encoder writes null as "null"
+        {
+            logger.warn("Bad request to create repair job. Received null 
payload.");
+            throw wrapHttpException(HttpResponseStatus.BAD_REQUEST, 
"Unexpected null payload for request");
+        }
+
+        RepairPayload payload;
+        try
+        {
+            payload = Json.decodeValue(bodyString, RepairPayload.class);
+        }
+        catch (DecodeException decodeException)
+        {
+            logger.warn("Bad request to create repair job. Received invalid 
JSON payload.");
+            throw wrapHttpException(HttpResponseStatus.BAD_REQUEST,
+                                    "Invalid request payload",
+                                    decodeException);
+        }
+
+        // Validate token range if provided
+        validateTokenRange(payload);
+
+        return RepairRequestParam.from(keyspace, payload);
+    }
+
+    @Override
+    protected void handleInternal(RoutingContext context,
+                                  HttpServerRequest httpRequest,
+                                  @NotNull String host,
+                                  SocketAddress remoteAddress,
+                                  RepairRequestParam repairRequestParam)
+    {
+        StorageOperations operations = 
metadataFetcher.delegate(host).storageOperations();
+        RepairJob job = new RepairJob(executorPools.internal(), 
config.repairConfiguration(), UUIDs.timeBased(), operations, 
repairRequestParam);
+
+        this.jobManager.trySubmitJob(job,
+                                     (completedJob, exception) ->
+                                     
OperationalJobUtils.sendStatusBasedResponse(context, completedJob, exception),
+                                     executorPools.service(),
+                                     
config.operationalJobExecutionMaxWaitTime());
+    }
+
+    /**
+     * Validates the token range in the repair payload if both start and end 
tokens are provided.
+     * 
+     * @param payload the repair payload to validate
+     * @throws HttpException if the token range is invalid
+     */
+    private void validateTokenRange(RepairPayload payload)
+    {
+        if (payload.startToken() != null && payload.endToken() != null)
+        {
+            try
+            {
+                String startTokenStr = payload.startToken();
+                String endTokenStr = payload.endToken();
+                
+                // Validate tokens using BigInteger for proper numeric 
comparison
+                BigInteger startToken = new BigInteger(startTokenStr);
+                BigInteger endToken = new BigInteger(endTokenStr);
+                
+                if (startToken.compareTo(endToken) >= 0)
+                {
+                    throw wrapHttpException(HttpResponseStatus.BAD_REQUEST,
+                        "Start token must be less than end token. Got start: " 
+ 
+                        startTokenStr + ", end: " + endTokenStr);
+                }
+            }
+            catch (NumberFormatException e)
+            {
+                throw wrapHttpException(HttpResponseStatus.BAD_REQUEST,
+                    "Invalid token format. Tokens must be numeric values.", e);
+            }

Review Comment:
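   Align the continuation arguments with the opening parenthesis of 
`wrapHttpException(`, as is already done in `extractParamsOrThrow`.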



##########
server/src/main/java/org/apache/cassandra/sidecar/job/RepairJob.java:
##########
@@ -0,0 +1,360 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.job;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import io.vertx.core.Future;
+import io.vertx.core.Promise;
+import org.apache.cassandra.sidecar.adapters.base.RepairOptions;
+import org.apache.cassandra.sidecar.common.data.OperationalJobStatus;
+import org.apache.cassandra.sidecar.common.request.data.RepairPayload;
+import org.apache.cassandra.sidecar.common.server.StorageOperations;
+import org.apache.cassandra.sidecar.concurrent.TaskExecutorPool;
+import org.apache.cassandra.sidecar.config.RepairJobsConfiguration;
+import org.apache.cassandra.sidecar.handlers.data.RepairRequestParam;
+import org.jetbrains.annotations.NotNull;
+
+/**
+ * Implementation of {@link OperationalJob} to perform repair operation.
+ * <p>
+ * Cassandra repair operations are asynchronous and can run for extended 
periods. When a repair is
+ * detected to be IN_PROGRESS, this job sets its status to RUNNING and 
completes its promise.
+ * <p>
+ * The job's status can be checked at any time using the {@link #status()} 
method, which directly
+ * queries the current repair status from Cassandra, regardless of whether the 
promise has been completed.
+ * <p>
+ * This design ensures timely responses to clients while allowing accurate 
status reporting for
+ * long-running repair operations.
+ */
+public class RepairJob extends OperationalJob
+{
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(RepairJob.class);
+    private static final String OPERATION = "repair";
+    private static final String PREVIEW_KIND_REPAIRED = "REPAIRED";
+
+    private final RepairRequestParam repairParams;
+    private final RepairJobsConfiguration config;
+    private final TaskExecutorPool internalPool;
+    protected StorageOperations storageOperations;
+    private volatile OperationalJobStatus currentStatus;
+    private volatile int commandId = -1; // Store the command ID for status 
checks
+
+    /**
+     * Enum representing the status of a parent repair session
+     */
+    public enum ParentRepairStatus
+    {
+        IN_PROGRESS, COMPLETED, FAILED
+    }
+
+    /**
+     * Constructor for creation of RepairJob
+     *
+     * @param taskExecutorPool    TaskExecutorPool instance (for testing)
+     * @param config              Repair job configuration
+     * @param jobId               UUID representing the Job to be created
+     * @param storageOps          Reference to the storage operations interface
+     * @param repairParams        Repair request parameters
+     */
+    public RepairJob(TaskExecutorPool taskExecutorPool,
+                     RepairJobsConfiguration config,
+                     UUID jobId,
+                     StorageOperations storageOps,
+                     RepairRequestParam repairParams)
+    {
+        super(jobId);
+        this.internalPool = taskExecutorPool;
+        this.config = config;
+        this.storageOperations = storageOps;
+        this.repairParams = repairParams;
+        this.currentStatus = OperationalJobStatus.CREATED;
+    }
+
+    /**
+     * {@inheritDoc}
+     * RepairJob allows parallel executions since multiple repairs can run 
concurrently
+     * on different tables or with different parameters.
+     */
+    @Override
+    public boolean hasConflict(@NotNull List<OperationalJob> sameOperationJobs)
+    {
+        // For now, we simply allow all repair jobs to run in parallel
+        // A more sophisticated implementation could check the repair 
parameters
+        // against currently running repairs to detect potential conflicts
+        return false;
+    }
+
+    @Override
+    protected Future<Void> executeInternal()
+    {
+        try
+        {
+            Map<String, String> options = 
generateRepairOptions(repairParams.requestPayload());
+            String keyspace = repairParams.keyspace().name();
+
+            LOGGER.info("Executing repair operation for keyspace {} jobId={}", 
keyspace, this.jobId());
+
+            try
+            {
+                commandId = storageOperations.repair(keyspace, options);
+            }
+            catch (Exception e)
+            {
+                LOGGER.error("Failed to initiate repair for keyspace {} 
jobId={}", keyspace, this.jobId(), e);
+                currentStatus = OperationalJobStatus.FAILED;
+                return Future.failedFuture(e);
+            }
+
+            if (commandId <= 0)
+            {
+                // When there are no relevant token ranges for the keyspace or 
RF is 1, the repair is inapplicable
+                LOGGER.info("Repair is not applicable for the provided options 
and keyspace '{}' jobId '{}'", keyspace, this.jobId());
+                currentStatus = OperationalJobStatus.SUCCEEDED;
+                return Future.succeededFuture();
+            }
+
+            // Create promise for job submission response - this completes 
when we have a definitive
+            // status to return to the client (either repair is running, 
completed, or failed)
+            final Promise<Void> jobSubmissionPromise = Promise.promise();
+            int maxAttempts = config.repairStatusMaxAttempts();
+            final AtomicInteger attemptCounter = new AtomicInteger(0);
+
+            // Periodic timer that checks for a valid repair status for a 
specified no. attempts to make a best-effort attempt
+            // to validate that the repair has been kicked-off before 
returning.
+            internalPool.setPeriodic(0, 
config.repairPollInterval().toIntMillis(), id -> {
+                try
+                {
+                    int currentAttempt = attemptCounter.incrementAndGet();
+                    if (currentAttempt > maxAttempts)
+                    {
+                        internalPool.cancelTimer(id);
+                        String msg = String.format("Failed to obtain repair 
status after %d attempts.", maxAttempts);
+                        LOGGER.warn(msg);

Review Comment:
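   Merge those 2 lines into a single parameterized log call, e.g. 
`LOGGER.warn("Failed to obtain repair status after {} attempts.", maxAttempts);`. 
Formatting the string up front with `String.format` is unnecessary; let SLF4J 
substitute the placeholder.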



##########
server/src/main/java/org/apache/cassandra/sidecar/handlers/RepairHandler.java:
##########
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.handlers;
+
+import java.math.BigInteger;
+import java.util.Collections;
+import java.util.Set;
+
+import com.datastax.driver.core.utils.UUIDs;
+import com.google.inject.Inject;
+import io.netty.handler.codec.http.HttpResponseStatus;
+import io.vertx.core.http.HttpServerRequest;
+import io.vertx.core.json.DecodeException;
+import io.vertx.core.json.Json;
+import io.vertx.core.net.SocketAddress;
+import io.vertx.ext.auth.authorization.Authorization;
+import io.vertx.ext.web.RoutingContext;
+import org.apache.cassandra.sidecar.acl.authorization.BasicPermissions;
+import org.apache.cassandra.sidecar.common.request.data.RepairPayload;
+import org.apache.cassandra.sidecar.common.server.StorageOperations;
+import org.apache.cassandra.sidecar.common.server.data.Name;
+import org.apache.cassandra.sidecar.concurrent.ExecutorPools;
+import org.apache.cassandra.sidecar.config.ServiceConfiguration;
+import org.apache.cassandra.sidecar.handlers.data.RepairRequestParam;
+import org.apache.cassandra.sidecar.job.OperationalJobManager;
+import org.apache.cassandra.sidecar.job.RepairJob;
+import org.apache.cassandra.sidecar.utils.CassandraInputValidator;
+import org.apache.cassandra.sidecar.utils.InstanceMetadataFetcher;
+import org.apache.cassandra.sidecar.utils.OperationalJobUtils;
+import org.jetbrains.annotations.NotNull;
+
+import static 
org.apache.cassandra.sidecar.utils.HttpExceptions.wrapHttpException;
+
+/**
+ * Handler for triggering repair
+ */
+public class RepairHandler extends AbstractHandler<RepairRequestParam> 
implements AccessProtected
+{
+    private final ServiceConfiguration config;
+    private final OperationalJobManager jobManager;
+
+    /**
+     * Constructs a handler with the provided {@code metadataFetcher}
+     *
+     * @param metadataFetcher      the metadata fetcher
+     * @param executorPools        executor pools for blocking executions
+     * @param serviceConfiguration configuration object holding config details 
of Sidecar
+     * @param validator            a validator instance to validate 
Cassandra-specific input
+     * @param jobManager           manager for long-running operational jobs
+     */
+    @Inject
+    protected RepairHandler(InstanceMetadataFetcher metadataFetcher,
+                            ExecutorPools executorPools,
+                            ServiceConfiguration serviceConfiguration,
+                            CassandraInputValidator validator,
+                            OperationalJobManager jobManager)
+    {
+        super(metadataFetcher, executorPools, validator);
+        this.jobManager = jobManager;
+        this.config = serviceConfiguration;
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    protected RepairRequestParam extractParamsOrThrow(RoutingContext context)
+    {
+        Name keyspace = keyspace(context, true);
+        if (keyspace == null)
+        {
+            throw wrapHttpException(HttpResponseStatus.BAD_REQUEST, 
"'keyspace' is required but not supplied");
+        }
+
+        String bodyString = context.body().asString();
+        if (bodyString == null || bodyString.equalsIgnoreCase("null")) // json 
encoder writes null as "null"

Review Comment:
   >  json encoder writes null as "null"
   
   This is interesting. Is the "null" string case tested? In 
`testRepairHandlerBadRequest` I can see that `bodyString` is `null`, not the 
`"null"` string.



##########
server/src/main/java/org/apache/cassandra/sidecar/job/RepairJob.java:
##########
@@ -0,0 +1,360 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.job;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import io.vertx.core.Future;
+import io.vertx.core.Promise;
+import org.apache.cassandra.sidecar.adapters.base.RepairOptions;
+import org.apache.cassandra.sidecar.common.data.OperationalJobStatus;
+import org.apache.cassandra.sidecar.common.request.data.RepairPayload;
+import org.apache.cassandra.sidecar.common.server.StorageOperations;
+import org.apache.cassandra.sidecar.concurrent.TaskExecutorPool;
+import org.apache.cassandra.sidecar.config.RepairJobsConfiguration;
+import org.apache.cassandra.sidecar.handlers.data.RepairRequestParam;
+import org.jetbrains.annotations.NotNull;
+
+/**
+ * Implementation of {@link OperationalJob} to perform repair operation.
+ * <p>
+ * Cassandra repair operations are asynchronous and can run for extended 
periods. When a repair is
+ * detected to be IN_PROGRESS, this job sets its status to RUNNING and 
completes its promise.
+ * <p>
+ * The job's status can be checked at any time using the {@link #status()} 
method, which directly
+ * queries the current repair status from Cassandra, regardless of whether the 
promise has been completed.
+ * <p>
+ * This design ensures timely responses to clients while allowing accurate 
status reporting for
+ * long-running repair operations.
+ */
+public class RepairJob extends OperationalJob
+{
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(RepairJob.class);
+    private static final String OPERATION = "repair";
+    private static final String PREVIEW_KIND_REPAIRED = "REPAIRED";
+
+    private final RepairRequestParam repairParams;
+    private final RepairJobsConfiguration config;
+    private final TaskExecutorPool internalPool;
+    protected StorageOperations storageOperations;
+    private volatile OperationalJobStatus currentStatus;
+    private volatile int commandId = -1; // Store the command ID for status 
checks
+
+    /**
+     * Enum representing the status of a parent repair session
+     */
+    public enum ParentRepairStatus
+    {
+        IN_PROGRESS, COMPLETED, FAILED
+    }
+
+    /**
+     * Constructor for creation of RepairJob
+     *
+     * @param taskExecutorPool    TaskExecutorPool instance (for testing)
+     * @param config              Repair job configuration
+     * @param jobId               UUID representing the Job to be created
+     * @param storageOps          Reference to the storage operations interface
+     * @param repairParams        Repair request parameters
+     */
+    public RepairJob(TaskExecutorPool taskExecutorPool,
+                     RepairJobsConfiguration config,
+                     UUID jobId,
+                     StorageOperations storageOps,
+                     RepairRequestParam repairParams)
+    {
+        super(jobId);
+        this.internalPool = taskExecutorPool;
+        this.config = config;
+        this.storageOperations = storageOps;
+        this.repairParams = repairParams;
+        this.currentStatus = OperationalJobStatus.CREATED;
+    }
+
+    /**
+     * {@inheritDoc}
+     * RepairJob allows parallel executions since multiple repairs can run 
concurrently
+     * on different tables or with different parameters.
+     */
+    @Override
+    public boolean hasConflict(@NotNull List<OperationalJob> sameOperationJobs)
+    {
+        // For now, we simply allow all repair jobs to run in parallel
+        // A more sophisticated implementation could check the repair 
parameters
+        // against currently running repairs to detect potential conflicts
+        return false;
+    }
+
+    @Override
+    protected Future<Void> executeInternal()
+    {
+        try
+        {
+            Map<String, String> options = 
generateRepairOptions(repairParams.requestPayload());
+            String keyspace = repairParams.keyspace().name();
+
+            LOGGER.info("Executing repair operation for keyspace {} jobId={}", 
keyspace, this.jobId());
+
+            try
+            {
+                commandId = storageOperations.repair(keyspace, options);
+            }
+            catch (Exception e)
+            {
+                LOGGER.error("Failed to initiate repair for keyspace {} 
jobId={}", keyspace, this.jobId(), e);
+                currentStatus = OperationalJobStatus.FAILED;
+                return Future.failedFuture(e);
+            }
+
+            if (commandId <= 0)
+            {
+                // When there are no relevant token ranges for the keyspace or 
RF is 1, the repair is inapplicable
+                LOGGER.info("Repair is not applicable for the provided options 
and keyspace '{}' jobId '{}'", keyspace, this.jobId());
+                currentStatus = OperationalJobStatus.SUCCEEDED;
+                return Future.succeededFuture();
+            }
+
+            // Create promise for job submission response - this completes 
when we have a definitive
+            // status to return to the client (either repair is running, 
completed, or failed)
+            final Promise<Void> jobSubmissionPromise = Promise.promise();
+            int maxAttempts = config.repairStatusMaxAttempts();
+            final AtomicInteger attemptCounter = new AtomicInteger(0);
+
+            // Periodic timer that checks for a valid repair status for a 
specified no. attempts to make a best-effort attempt
+            // to validate that the repair has been kicked-off before 
returning.
+            internalPool.setPeriodic(0, 
config.repairPollInterval().toIntMillis(), id -> {
+                try
+                {
+                    int currentAttempt = attemptCounter.incrementAndGet();
+                    if (currentAttempt > maxAttempts)
+                    {
+                        internalPool.cancelTimer(id);
+                        String msg = String.format("Failed to obtain repair 
status after %d attempts.", maxAttempts);
+                        LOGGER.warn(msg);
+                        // Set status to RUNNING and complete the promise to 
ensure the job is properly handled
+                        currentStatus = OperationalJobStatus.RUNNING;
+                        jobSubmissionPromise.tryComplete();
+                        return;
+                    }
+
+                    // Check the repair status
+                    List<String> status;
+                    try
+                    {
+                        status = 
storageOperations.getParentRepairStatus(commandId);
+                    }
+                    catch (Exception e)
+                    {
+                        LOGGER.warn("Failed to get repair status for cmd: {} 
(attempt {}/{})",
+                                    commandId, currentAttempt, maxAttempts, e);
+                        // Continue polling on exception
+                        return;
+                    }
+
+                    // If status is empty, continue polling
+                    if (status == null || status.isEmpty())
+                    {
+                        LOGGER.debug("No parent repair session status found 
for cmd: {} - repair may be initializing (attempt {}/{})",
+                                     commandId, currentAttempt, maxAttempts);
+                        return;
+                    }
+
+                    internalPool.cancelTimer(id);
+                    updateRepairJobStatus(jobSubmissionPromise, status);
+                }
+                catch (Exception e)
+                {
+                    LOGGER.error("Unexpected error in repair status check", e);
+                    internalPool.cancelTimer(id);
+                    currentStatus = OperationalJobStatus.FAILED;
+                    jobSubmissionPromise.tryFail(e);
+                }
+            });
+
+            return jobSubmissionPromise.future();
+        }
+        catch (Exception e)
+        {
+            // Catch any exceptions in the overall method
+            LOGGER.error("Failed to execute repair job", e);
+            currentStatus = OperationalJobStatus.FAILED;
+            return Future.failedFuture(e);
+        }
+    }
+
+
+    @Override
+    public OperationalJobStatus status()
+    {
+        // If we have a valid command ID, check the actual repair status
+        if (commandId > 0)
+        {
+            List<String> status = 
storageOperations.getParentRepairStatus(commandId);
+            if (status != null && !status.isEmpty())
+            {
+                try
+                {
+                    ParentRepairStatus parentRepairStatus = 
ParentRepairStatus.valueOf(status.get(0));
+                    switch (parentRepairStatus)
+                    {
+                        case COMPLETED:
+                            return OperationalJobStatus.SUCCEEDED;
+                        case FAILED:
+                            return OperationalJobStatus.FAILED;
+                        case IN_PROGRESS:
+                            return OperationalJobStatus.RUNNING;
+                        default:
+                            LOGGER.warn("Encountered unexpected repair status: 
{}", parentRepairStatus);
+                            // Don't update currentStatus here, fall back to 
parent implementation
+                    }
+                }
+                catch (IllegalArgumentException e)
+                {
+                    LOGGER.warn("Invalid parent repair status: {}", 
status.get(0), e);
+                    // Don't update currentStatus here, fall back to parent 
implementation
+                }
+            }
+        }
+        
+        // If we have a current status, return it
+        if (currentStatus != null)
+        {
+            return currentStatus;
+        }
+        
+        // Otherwise, fall back to the parent implementation
+        return super.status();
+    }
+
+    /**
+     * Updates the repair job status based on the parent repair status from 
Cassandra.
+     * <p>
+     * When the parent repair status is IN_PROGRESS, this method sets the 
currentStatus
+     * to RUNNING and completes the promise
+     * <p>
+     * This approach ensures that resources are properly managed while still 
providing
+     * accurate status reporting through the {@link #status()} method.
+     *
+     * @param jobSubmissionPromise the promise to complete
+     * @param status the parent repair status from Cassandra
+     */
+    private void updateRepairJobStatus(Promise<Void> jobSubmissionPromise, 
List<String> status)
+    {
+        ParentRepairStatus parentRepairStatus = 
ParentRepairStatus.valueOf(status.get(0));
+        List<String> messages = status.subList(1, status.size());
+
+        LOGGER.info("Parent repair session {} has {} status. Messages: {}",
+                    parentRepairStatus.name().toLowerCase(),
+                    parentRepairStatus,
+                    String.join("\n", messages));
+        switch (parentRepairStatus)
+        {
+            case COMPLETED:
+                currentStatus = OperationalJobStatus.SUCCEEDED;
+                jobSubmissionPromise.tryComplete();
+                break;
+            case FAILED:
+                currentStatus = OperationalJobStatus.FAILED;
+                String reason = !messages.isEmpty() ? messages.get(0) :
+                                "Repair failed with no error message";
+                jobSubmissionPromise.tryFail(new IOException(reason));
+                break;
+            case IN_PROGRESS:
+                currentStatus = OperationalJobStatus.RUNNING;
+                jobSubmissionPromise.tryComplete();
+                break;
+            default:
+                String message = String.format("Encountered unexpected repair 
status: %s Messages: %s",
+                                               parentRepairStatus, 
String.join("\n", messages));
+                LOGGER.error(message);
+                currentStatus = OperationalJobStatus.FAILED;
+                jobSubmissionPromise.tryFail(message);
+                break;
+        }
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public String name()
+    {
+        return OPERATION;
+    }
+
+    private Map<String, String> generateRepairOptions(RepairPayload 
repairPayload)
+    {
+        Map<String, String> options = new HashMap<>();
+
+        List<String> tables = repairPayload.tables();
+        if (tables != null && !tables.isEmpty())
+        {
+            options.put(RepairOptions.COLUMNFAMILIES.optionName(), 
String.join(",", tables));
+        }
+
+        Boolean isPrimaryRange = repairPayload.isPrimaryRange();
+        if (isPrimaryRange != null)
+        {
+            options.put(RepairOptions.PRIMARY_RANGE.optionName(), 
String.valueOf(isPrimaryRange));
+        }
+        // TODO: Verify use-cases involving multiple DCs
+
+        String dc = repairPayload.datacenter();
+        if (dc != null)
+        {
+            options.put(RepairOptions.DATACENTERS.optionName(), dc);
+        }
+
+        List<String> hosts = repairPayload.hosts();
+        if (hosts != null && !hosts.isEmpty())
+        {
+            options.put(RepairOptions.HOSTS.optionName(), String.join(",", 
hosts));
+        }
+
+        if (repairPayload.startToken() != null && repairPayload.endToken() != 
null)
+        {
+            // Use original string values for range construction
+            options.put(RepairOptions.RANGES.optionName(), 
repairPayload.startToken() + ":" + repairPayload.endToken());
+        }
+
+        if (repairPayload.repairType() == RepairPayload.RepairType.INCREMENTAL)
+        {
+            options.put(RepairOptions.INCREMENTAL.optionName(), 
Boolean.TRUE.toString());
+        }
+
+        if (repairPayload.force() != null)
+        {
+            options.put(RepairOptions.FORCE_REPAIR.optionName(), 
String.valueOf(repairPayload.force()));
+        }
+
+        if (repairPayload.shouldValidate() != null)

Review Comment:
   The condition should also check that `repairPayload.shouldValidate()` is 
`true`, e.g. `Boolean.TRUE.equals(repairPayload.shouldValidate())`. Otherwise, 
preview repair is enabled even when `shouldValidate()` returns `false`.



##########
server/src/main/java/org/apache/cassandra/sidecar/handlers/RepairHandler.java:
##########
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.handlers;
+
+import java.math.BigInteger;
+import java.util.Collections;
+import java.util.Set;
+
+import com.datastax.driver.core.utils.UUIDs;
+import com.google.inject.Inject;
+import io.netty.handler.codec.http.HttpResponseStatus;
+import io.vertx.core.http.HttpServerRequest;
+import io.vertx.core.json.DecodeException;
+import io.vertx.core.json.Json;
+import io.vertx.core.net.SocketAddress;
+import io.vertx.ext.auth.authorization.Authorization;
+import io.vertx.ext.web.RoutingContext;
+import org.apache.cassandra.sidecar.acl.authorization.BasicPermissions;
+import org.apache.cassandra.sidecar.common.request.data.RepairPayload;
+import org.apache.cassandra.sidecar.common.server.StorageOperations;
+import org.apache.cassandra.sidecar.common.server.data.Name;
+import org.apache.cassandra.sidecar.concurrent.ExecutorPools;
+import org.apache.cassandra.sidecar.config.ServiceConfiguration;
+import org.apache.cassandra.sidecar.handlers.data.RepairRequestParam;
+import org.apache.cassandra.sidecar.job.OperationalJobManager;
+import org.apache.cassandra.sidecar.job.RepairJob;
+import org.apache.cassandra.sidecar.utils.CassandraInputValidator;
+import org.apache.cassandra.sidecar.utils.InstanceMetadataFetcher;
+import org.apache.cassandra.sidecar.utils.OperationalJobUtils;
+import org.jetbrains.annotations.NotNull;
+
+import static 
org.apache.cassandra.sidecar.utils.HttpExceptions.wrapHttpException;
+
+/**
+ * Handler for triggering repair
+ */
+public class RepairHandler extends AbstractHandler<RepairRequestParam> 
implements AccessProtected
+{
+    private final ServiceConfiguration config;
+    private final OperationalJobManager jobManager;
+
+    /**
+     * Constructs a handler with the provided {@code metadataFetcher}
+     *
+     * @param metadataFetcher      the metadata fetcher
+     * @param executorPools        executor pools for blocking executions
+     * @param serviceConfiguration configuration object holding config details 
of Sidecar
+     * @param validator            a validator instance to validate 
Cassandra-specific input
+     * @param jobManager           manager for long-running operational jobs
+     */
+    @Inject
+    protected RepairHandler(InstanceMetadataFetcher metadataFetcher,
+                            ExecutorPools executorPools,
+                            ServiceConfiguration serviceConfiguration,
+                            CassandraInputValidator validator,
+                            OperationalJobManager jobManager)
+    {
+        super(metadataFetcher, executorPools, validator);
+        this.jobManager = jobManager;
+        this.config = serviceConfiguration;
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    protected RepairRequestParam extractParamsOrThrow(RoutingContext context)
+    {
+        Name keyspace = keyspace(context, true);
+        if (keyspace == null)
+        {
+            throw wrapHttpException(HttpResponseStatus.BAD_REQUEST, 
"'keyspace' is required but not supplied");
+        }
+
+        String bodyString = context.body().asString();
+        if (bodyString == null || bodyString.equalsIgnoreCase("null")) // json 
encoder writes null as "null"
+        {
+            logger.warn("Bad request to create repair job. Received null 
payload.");
+            throw wrapHttpException(HttpResponseStatus.BAD_REQUEST, 
"Unexpected null payload for request");
+        }
+
+        RepairPayload payload;
+        try
+        {
+            payload = Json.decodeValue(bodyString, RepairPayload.class);
+        }
+        catch (DecodeException decodeException)
+        {
+            logger.warn("Bad request to create repair job. Received invalid 
JSON payload.");
+            throw wrapHttpException(HttpResponseStatus.BAD_REQUEST,
+                                    "Invalid request payload",
+                                    decodeException);
+        }

Review Comment:
   nit: equivalent, but slightly less verbose. 
   
   ```suggestion
           RepairPayload payload;
           try
           {
               payload = context.body().asPojo(RepairPayload.class);
           }
           catch (DecodeException decodeException)
           {
               logger.warn("Bad request to create repair job. Received invalid 
JSON payload.");
               throw wrapHttpException(HttpResponseStatus.BAD_REQUEST,
                                       "Invalid request payload",
                                       decodeException);
           }
   
           if (payload == null)
           {
               logger.warn("Bad request to create repair job. Received null 
payload.");
               throw wrapHttpException(HttpResponseStatus.BAD_REQUEST, 
"Unexpected null payload for request");
           }
   ```



##########
server/src/main/java/org/apache/cassandra/sidecar/job/RepairJob.java:
##########
@@ -0,0 +1,360 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.job;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import io.vertx.core.Future;
+import io.vertx.core.Promise;
+import org.apache.cassandra.sidecar.adapters.base.RepairOptions;
+import org.apache.cassandra.sidecar.common.data.OperationalJobStatus;
+import org.apache.cassandra.sidecar.common.request.data.RepairPayload;
+import org.apache.cassandra.sidecar.common.server.StorageOperations;
+import org.apache.cassandra.sidecar.concurrent.TaskExecutorPool;
+import org.apache.cassandra.sidecar.config.RepairJobsConfiguration;
+import org.apache.cassandra.sidecar.handlers.data.RepairRequestParam;
+import org.jetbrains.annotations.NotNull;
+
+/**
+ * Implementation of {@link OperationalJob} to perform repair operation.
+ * <p>
+ * Cassandra repair operations are asynchronous and can run for extended 
periods. When a repair is
+ * detected to be IN_PROGRESS, this job sets its status to RUNNING and 
completes its promise.
+ * <p>
+ * The job's status can be checked at any time using the {@link #status()} 
method, which directly
+ * queries the current repair status from Cassandra, regardless of whether the 
promise has been completed.
+ * <p>
+ * This design ensures timely responses to clients while allowing accurate 
status reporting for
+ * long-running repair operations.
+ */
+public class RepairJob extends OperationalJob
+{
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(RepairJob.class);
+    private static final String OPERATION = "repair";
+    private static final String PREVIEW_KIND_REPAIRED = "REPAIRED";
+
+    private final RepairRequestParam repairParams;
+    private final RepairJobsConfiguration config;
+    private final TaskExecutorPool internalPool;
+    protected StorageOperations storageOperations;
+    private volatile OperationalJobStatus currentStatus;
+    private volatile int commandId = -1; // Store the command ID for status 
checks
+
+    /**
+     * Enum representing the status of a parent repair session
+     */
+    public enum ParentRepairStatus
+    {
+        IN_PROGRESS, COMPLETED, FAILED
+    }
+
+    /**
+     * Constructor for creation of RepairJob
+     *
+     * @param taskExecutorPool    TaskExecutorPool instance (for testing)
+     * @param config              Repair job configuration
+     * @param jobId               UUID representing the Job to be created
+     * @param storageOps          Reference to the storage operations interface
+     * @param repairParams        Repair request parameters
+     */
+    public RepairJob(TaskExecutorPool taskExecutorPool,
+                     RepairJobsConfiguration config,
+                     UUID jobId,
+                     StorageOperations storageOps,
+                     RepairRequestParam repairParams)
+    {
+        super(jobId);
+        this.internalPool = taskExecutorPool;
+        this.config = config;
+        this.storageOperations = storageOps;
+        this.repairParams = repairParams;
+        this.currentStatus = OperationalJobStatus.CREATED;
+    }
+
+    /**
+     * {@inheritDoc}
+     * RepairJob allows parallel executions since multiple repairs can run 
concurrently
+     * on different tables or with different parameters.
+     */
+    @Override
+    public boolean hasConflict(@NotNull List<OperationalJob> sameOperationJobs)
+    {
+        // For now, we simply allow all repair jobs to run in parallel
+        // A more sophisticated implementation could check the repair 
parameters
+        // against currently running repairs to detect potential conflicts
+        return false;
+    }
+
+    @Override
+    protected Future<Void> executeInternal()
+    {
+        try
+        {
+            Map<String, String> options = 
generateRepairOptions(repairParams.requestPayload());
+            String keyspace = repairParams.keyspace().name();
+
+            LOGGER.info("Executing repair operation for keyspace {} jobId={}", 
keyspace, this.jobId());
+
+            try
+            {
+                commandId = storageOperations.repair(keyspace, options);
+            }
+            catch (Exception e)
+            {
+                LOGGER.error("Failed to initiate repair for keyspace {} 
jobId={}", keyspace, this.jobId(), e);
+                currentStatus = OperationalJobStatus.FAILED;
+                return Future.failedFuture(e);
+            }
+
+            if (commandId <= 0)
+            {
+                // When there are no relevant token ranges for the keyspace or 
RF is 1, the repair is inapplicable
+                LOGGER.info("Repair is not applicable for the provided options 
and keyspace '{}' jobId '{}'", keyspace, this.jobId());
+                currentStatus = OperationalJobStatus.SUCCEEDED;
+                return Future.succeededFuture();
+            }
+
+            // Create promise for job submission response - this completes 
when we have a definitive
+            // status to return to the client (either repair is running, 
completed, or failed)
+            final Promise<Void> jobSubmissionPromise = Promise.promise();
+            int maxAttempts = config.repairStatusMaxAttempts();
+            final AtomicInteger attemptCounter = new AtomicInteger(0);
+
+            // Periodic timer that checks for a valid repair status for a 
specified no. attempts to make a best-effort attempt
+            // to validate that the repair has been kicked-off before 
returning.
+            internalPool.setPeriodic(0, 
config.repairPollInterval().toIntMillis(), id -> {
+                try
+                {
+                    int currentAttempt = attemptCounter.incrementAndGet();
+                    if (currentAttempt > maxAttempts)
+                    {
+                        internalPool.cancelTimer(id);
+                        String msg = String.format("Failed to obtain repair 
status after %d attempts.", maxAttempts);
+                        LOGGER.warn(msg);
+                        // Set status to RUNNING and complete the promise to 
ensure the job is properly handled
+                        currentStatus = OperationalJobStatus.RUNNING;
+                        jobSubmissionPromise.tryComplete();
+                        return;
+                    }
+
+                    // Check the repair status
+                    List<String> status;
+                    try
+                    {
+                        status = 
storageOperations.getParentRepairStatus(commandId);
+                    }
+                    catch (Exception e)
+                    {
+                        LOGGER.warn("Failed to get repair status for cmd: {} 
(attempt {}/{})",
+                                    commandId, currentAttempt, maxAttempts, e);
+                        // Continue polling on exception
+                        return;
+                    }
+
+                    // If status is empty, continue polling
+                    if (status == null || status.isEmpty())
+                    {
+                        LOGGER.debug("No parent repair session status found 
for cmd: {} - repair may be initializing (attempt {}/{})",
+                                     commandId, currentAttempt, maxAttempts);
+                        return;
+                    }
+
+                    internalPool.cancelTimer(id);
+                    updateRepairJobStatus(jobSubmissionPromise, status);
+                }
+                catch (Exception e)
+                {
+                    LOGGER.error("Unexpected error in repair status check", e);
+                    internalPool.cancelTimer(id);
+                    currentStatus = OperationalJobStatus.FAILED;
+                    jobSubmissionPromise.tryFail(e);
+                }
+            });
+
+            return jobSubmissionPromise.future();
+        }
+        catch (Exception e)
+        {
+            // Catch any exceptions in the overall method
+            LOGGER.error("Failed to execute repair job", e);
+            currentStatus = OperationalJobStatus.FAILED;
+            return Future.failedFuture(e);
+        }
+    }
+
+
+    @Override
+    public OperationalJobStatus status()
+    {
+        // If we have a valid command ID, check the actual repair status
+        if (commandId > 0)
+        {
+            List<String> status = 
storageOperations.getParentRepairStatus(commandId);
+            if (status != null && !status.isEmpty())
+            {
+                try
+                {
+                    ParentRepairStatus parentRepairStatus = 
ParentRepairStatus.valueOf(status.get(0));
+                    switch (parentRepairStatus)
+                    {
+                        case COMPLETED:
+                            return OperationalJobStatus.SUCCEEDED;
+                        case FAILED:
+                            return OperationalJobStatus.FAILED;
+                        case IN_PROGRESS:
+                            return OperationalJobStatus.RUNNING;
+                        default:
+                            LOGGER.warn("Encountered unexpected repair status: 
{}", parentRepairStatus);
+                            // Don't update currentStatus here, fall back to 
parent implementation
+                    }
+                }
+                catch (IllegalArgumentException e)
+                {
+                    LOGGER.warn("Invalid parent repair status: {}", 
status.get(0), e);
+                    // Don't update currentStatus here, fall back to parent 
implementation
+                }
+            }
+        }
+        
+        // If we have a current status, return it
+        if (currentStatus != null)
+        {
+            return currentStatus;
+        }
+        
+        // Otherwise, fall back to the parent implementation
+        return super.status();
+    }
+
+    /**
+     * Updates the repair job status based on the parent repair status from 
Cassandra.
+     * <p>
+     * When the parent repair status is IN_PROGRESS, this method sets the 
currentStatus
+     * to RUNNING and completes the promise
+     * <p>
+     * This approach ensures that resources are properly managed while still 
providing
+     * accurate status reporting through the {@link #status()} method.
+     *
+     * @param jobSubmissionPromise the promise to complete
+     * @param status the parent repair status from Cassandra
+     */
+    private void updateRepairJobStatus(Promise<Void> jobSubmissionPromise, 
List<String> status)
+    {
+        ParentRepairStatus parentRepairStatus = 
ParentRepairStatus.valueOf(status.get(0));
+        List<String> messages = status.subList(1, status.size());
+
+        LOGGER.info("Parent repair session {} has {} status. Messages: {}",
+                    parentRepairStatus.name().toLowerCase(),
+                    parentRepairStatus,
+                    String.join("\n", messages));
+        switch (parentRepairStatus)
+        {
+            case COMPLETED:
+                currentStatus = OperationalJobStatus.SUCCEEDED;
+                jobSubmissionPromise.tryComplete();
+                break;
+            case FAILED:
+                currentStatus = OperationalJobStatus.FAILED;
+                String reason = !messages.isEmpty() ? messages.get(0) :
+                                "Repair failed with no error message";
+                jobSubmissionPromise.tryFail(new IOException(reason));
+                break;
+            case IN_PROGRESS:
+                currentStatus = OperationalJobStatus.RUNNING;
+                jobSubmissionPromise.tryComplete();
+                break;
+            default:
+                String message = String.format("Encountered unexpected repair 
status: %s Messages: %s",
+                                               parentRepairStatus, 
String.join("\n", messages));
+                LOGGER.error(message);
+                currentStatus = OperationalJobStatus.FAILED;
+                jobSubmissionPromise.tryFail(message);
+                break;
+        }
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public String name()
+    {
+        return OPERATION;
+    }
+
+    private Map<String, String> generateRepairOptions(RepairPayload 
repairPayload)
+    {
+        Map<String, String> options = new HashMap<>();
+
+        List<String> tables = repairPayload.tables();
+        if (tables != null && !tables.isEmpty())
+        {
+            options.put(RepairOptions.COLUMNFAMILIES.optionName(), 
String.join(",", tables));
+        }
+
+        Boolean isPrimaryRange = repairPayload.isPrimaryRange();
+        if (isPrimaryRange != null)
+        {
+            options.put(RepairOptions.PRIMARY_RANGE.optionName(), 
String.valueOf(isPrimaryRange));
+        }
+        // TODO: Verify use-cases involving multiple DCs

Review Comment:
   Has this TODO been addressed? Presumably, the multi-DC use-cases can be verified in an integration test.



##########
server/src/main/java/org/apache/cassandra/sidecar/job/RepairJob.java:
##########
@@ -0,0 +1,360 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.job;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import io.vertx.core.Future;
+import io.vertx.core.Promise;
+import org.apache.cassandra.sidecar.adapters.base.RepairOptions;
+import org.apache.cassandra.sidecar.common.data.OperationalJobStatus;
+import org.apache.cassandra.sidecar.common.request.data.RepairPayload;
+import org.apache.cassandra.sidecar.common.server.StorageOperations;
+import org.apache.cassandra.sidecar.concurrent.TaskExecutorPool;
+import org.apache.cassandra.sidecar.config.RepairJobsConfiguration;
+import org.apache.cassandra.sidecar.handlers.data.RepairRequestParam;
+import org.jetbrains.annotations.NotNull;
+
+/**
+ * Implementation of {@link OperationalJob} to perform repair operation.
+ * <p>
+ * Cassandra repair operations are asynchronous and can run for extended 
periods. When a repair is
+ * detected to be IN_PROGRESS, this job sets its status to RUNNING and 
completes its promise.
+ * <p>
+ * The job's status can be checked at any time using the {@link #status()} 
method, which directly
+ * queries the current repair status from Cassandra, regardless of whether the 
promise has been completed.
+ * <p>
+ * This design ensures timely responses to clients while allowing accurate 
status reporting for
+ * long-running repair operations.
+ */
+public class RepairJob extends OperationalJob
+{
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(RepairJob.class);
+    private static final String OPERATION = "repair";
+    private static final String PREVIEW_KIND_REPAIRED = "REPAIRED";
+
+    private final RepairRequestParam repairParams;
+    private final RepairJobsConfiguration config;
+    private final TaskExecutorPool internalPool;
+    protected StorageOperations storageOperations;
+    private volatile OperationalJobStatus currentStatus;
+    private volatile int commandId = -1; // Store the command ID for status 
checks
+
+    /**
+     * Enum representing the status of a parent repair session
+     */
+    public enum ParentRepairStatus
+    {
+        IN_PROGRESS, COMPLETED, FAILED
+    }
+
+    /**
+     * Constructor for creation of RepairJob
+     *
+     * @param taskExecutorPool    TaskExecutorPool instance (for testing)
+     * @param config              Repair job configuration
+     * @param jobId               UUID representing the Job to be created
+     * @param storageOps          Reference to the storage operations interface
+     * @param repairParams        Repair request parameters
+     */
+    public RepairJob(TaskExecutorPool taskExecutorPool,
+                     RepairJobsConfiguration config,
+                     UUID jobId,
+                     StorageOperations storageOps,
+                     RepairRequestParam repairParams)
+    {
+        super(jobId);
+        this.internalPool = taskExecutorPool;
+        this.config = config;
+        this.storageOperations = storageOps;
+        this.repairParams = repairParams;
+        this.currentStatus = OperationalJobStatus.CREATED;
+    }
+
+    /**
+     * {@inheritDoc}
+     * RepairJob allows parallel executions since multiple repairs can run 
concurrently
+     * on different tables or with different parameters.
+     */
+    @Override
+    public boolean hasConflict(@NotNull List<OperationalJob> sameOperationJobs)
+    {
+        // For now, we simply allow all repair jobs to run in parallel
+        // A more sophisticated implementation could check the repair 
parameters
+        // against currently running repairs to detect potential conflicts
+        return false;
+    }
+
+    @Override
+    protected Future<Void> executeInternal()
+    {
+        try
+        {
+            Map<String, String> options = 
generateRepairOptions(repairParams.requestPayload());
+            String keyspace = repairParams.keyspace().name();
+
+            LOGGER.info("Executing repair operation for keyspace {} jobId={}", 
keyspace, this.jobId());
+
+            try
+            {
+                commandId = storageOperations.repair(keyspace, options);
+            }
+            catch (Exception e)
+            {
+                LOGGER.error("Failed to initiate repair for keyspace {} 
jobId={}", keyspace, this.jobId(), e);
+                currentStatus = OperationalJobStatus.FAILED;
+                return Future.failedFuture(e);
+            }
+
+            if (commandId <= 0)
+            {
+                // When there are no relevant token ranges for the keyspace or 
RF is 1, the repair is inapplicable
+                LOGGER.info("Repair is not applicable for the provided options 
and keyspace '{}' jobId '{}'", keyspace, this.jobId());
+                currentStatus = OperationalJobStatus.SUCCEEDED;
+                return Future.succeededFuture();
+            }
+
+            // Create promise for job submission response - this completes 
when we have a definitive
+            // status to return to the client (either repair is running, 
completed, or failed)
+            final Promise<Void> jobSubmissionPromise = Promise.promise();
+            int maxAttempts = config.repairStatusMaxAttempts();
+            final AtomicInteger attemptCounter = new AtomicInteger(0);
+
+            // Periodic timer that checks for a valid repair status for a 
specified no. attempts to make a best-effort attempt
+            // to validate that the repair has been kicked-off before 
returning.
+            internalPool.setPeriodic(0, 
config.repairPollInterval().toIntMillis(), id -> {

Review Comment:
   The periodic task scheduling is error-prone. 
   
   For example, if `getParentRepairStatus` takes longer than the poll interval 
to complete, two tasks will run in parallel, and the final state is then 
unpredictable. This is not the desired behavior. We want scheduling that adds a 
delay between the end of one run and the start of the next. Please look into 
`PeriodicTaskExecutor` and `PeriodicTask`.



##########
server/src/main/java/org/apache/cassandra/sidecar/job/RepairJob.java:
##########
@@ -0,0 +1,360 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.job;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import io.vertx.core.Future;
+import io.vertx.core.Promise;
+import org.apache.cassandra.sidecar.adapters.base.RepairOptions;
+import org.apache.cassandra.sidecar.common.data.OperationalJobStatus;
+import org.apache.cassandra.sidecar.common.request.data.RepairPayload;
+import org.apache.cassandra.sidecar.common.server.StorageOperations;
+import org.apache.cassandra.sidecar.concurrent.TaskExecutorPool;
+import org.apache.cassandra.sidecar.config.RepairJobsConfiguration;
+import org.apache.cassandra.sidecar.handlers.data.RepairRequestParam;
+import org.jetbrains.annotations.NotNull;
+
+/**
+ * Implementation of {@link OperationalJob} to perform repair operation.
+ * <p>
+ * Cassandra repair operations are asynchronous and can run for extended 
periods. When a repair is
+ * detected to be IN_PROGRESS, this job sets its status to RUNNING and 
completes its promise.
+ * <p>
+ * The job's status can be checked at any time using the {@link #status()} 
method, which directly
+ * queries the current repair status from Cassandra, regardless of whether the 
promise has been completed.
+ * <p>
+ * This design ensures timely responses to clients while allowing accurate 
status reporting for
+ * long-running repair operations.
+ */
+public class RepairJob extends OperationalJob
+{
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(RepairJob.class);
+    private static final String OPERATION = "repair";
+    private static final String PREVIEW_KIND_REPAIRED = "REPAIRED";
+
+    private final RepairRequestParam repairParams;
+    private final RepairJobsConfiguration config;
+    private final TaskExecutorPool internalPool;
+    protected StorageOperations storageOperations;
+    private volatile OperationalJobStatus currentStatus;
+    private volatile int commandId = -1; // Store the command ID for status 
checks
+
+    /**
+     * Enum representing the status of a parent repair session
+     */
+    public enum ParentRepairStatus
+    {
+        IN_PROGRESS, COMPLETED, FAILED
+    }
+
+    /**
+     * Constructor for creation of RepairJob
+     *
+     * @param taskExecutorPool    TaskExecutorPool instance (for testing)
+     * @param config              Repair job configuration
+     * @param jobId               UUID representing the Job to be created
+     * @param storageOps          Reference to the storage operations interface
+     * @param repairParams        Repair request parameters
+     */
+    public RepairJob(TaskExecutorPool taskExecutorPool,
+                     RepairJobsConfiguration config,
+                     UUID jobId,
+                     StorageOperations storageOps,
+                     RepairRequestParam repairParams)
+    {
+        super(jobId);
+        this.internalPool = taskExecutorPool;
+        this.config = config;
+        this.storageOperations = storageOps;
+        this.repairParams = repairParams;
+        this.currentStatus = OperationalJobStatus.CREATED;
+    }
+
+    /**
+     * {@inheritDoc}
+     * RepairJob allows parallel executions since multiple repairs can run 
concurrently
+     * on different tables or with different parameters.
+     */
+    @Override
+    public boolean hasConflict(@NotNull List<OperationalJob> sameOperationJobs)
+    {
+        // For now, we simply allow all repair jobs to run in parallel
+        // A more sophisticated implementation could check the repair 
parameters
+        // against currently running repairs to detect potential conflicts
+        return false;
+    }
+
+    @Override
+    protected Future<Void> executeInternal()

Review Comment:
   nit: break this method down into several smaller helper methods. 
   
   For example, move the polling logic into its own method. 
   
   



##########
server-common/src/main/java/org/apache/cassandra/sidecar/exceptions/KeyspaceNotFoundException.java:
##########
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.exceptions;
+
+/**
+ * Exception thrown when a keyspace does not exist in Cassandra.
+ * For instance, when attempting to access a keyspace that has been dropped
+ * or never existed in the connected Cassandra cluster.
+ */
+public class KeyspaceNotFoundException extends RuntimeException

Review Comment:
   👍 



##########
server/src/main/java/org/apache/cassandra/sidecar/job/RepairJob.java:
##########
@@ -0,0 +1,360 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.job;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import io.vertx.core.Future;
+import io.vertx.core.Promise;
+import org.apache.cassandra.sidecar.adapters.base.RepairOptions;
+import org.apache.cassandra.sidecar.common.data.OperationalJobStatus;
+import org.apache.cassandra.sidecar.common.request.data.RepairPayload;
+import org.apache.cassandra.sidecar.common.server.StorageOperations;
+import org.apache.cassandra.sidecar.concurrent.TaskExecutorPool;
+import org.apache.cassandra.sidecar.config.RepairJobsConfiguration;
+import org.apache.cassandra.sidecar.handlers.data.RepairRequestParam;
+import org.jetbrains.annotations.NotNull;
+
+/**
+ * Implementation of {@link OperationalJob} to perform repair operation.
+ * <p>
+ * Cassandra repair operations are asynchronous and can run for extended 
periods. When a repair is
+ * detected to be IN_PROGRESS, this job sets its status to RUNNING and 
completes its promise.
+ * <p>
+ * The job's status can be checked at any time using the {@link #status()} 
method, which directly
+ * queries the current repair status from Cassandra, regardless of whether the 
promise has been completed.
+ * <p>
+ * This design ensures timely responses to clients while allowing accurate 
status reporting for
+ * long-running repair operations.
+ */
+public class RepairJob extends OperationalJob
+{
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(RepairJob.class);
+    private static final String OPERATION = "repair";
+    private static final String PREVIEW_KIND_REPAIRED = "REPAIRED";
+
+    private final RepairRequestParam repairParams;
+    private final RepairJobsConfiguration config;
+    private final TaskExecutorPool internalPool;
+    protected StorageOperations storageOperations;
+    private volatile OperationalJobStatus currentStatus;
+    private volatile int commandId = -1; // Store the command ID for status 
checks
+
+    /**
+     * Enum representing the status of a parent repair session
+     */
+    public enum ParentRepairStatus
+    {
+        IN_PROGRESS, COMPLETED, FAILED
+    }
+
+    /**
+     * Constructor for creation of RepairJob
+     *
+     * @param taskExecutorPool    TaskExecutorPool instance (for testing)
+     * @param config              Repair job configuration
+     * @param jobId               UUID representing the Job to be created
+     * @param storageOps          Reference to the storage operations interface
+     * @param repairParams        Repair request parameters
+     */
+    public RepairJob(TaskExecutorPool taskExecutorPool,
+                     RepairJobsConfiguration config,
+                     UUID jobId,
+                     StorageOperations storageOps,
+                     RepairRequestParam repairParams)
+    {
+        super(jobId);
+        this.internalPool = taskExecutorPool;
+        this.config = config;
+        this.storageOperations = storageOps;
+        this.repairParams = repairParams;
+        this.currentStatus = OperationalJobStatus.CREATED;
+    }
+
+    /**
+     * {@inheritDoc}
+     * RepairJob allows parallel executions since multiple repairs can run 
concurrently
+     * on different tables or with different parameters.
+     */
+    @Override
+    public boolean hasConflict(@NotNull List<OperationalJob> sameOperationJobs)
+    {
+        // For now, we simply allow all repair jobs to run in parallel
+        // A more sophisticated implementation could check the repair 
parameters
+        // against currently running repairs to detect potential conflicts
+        return false;
+    }
+
+    @Override
+    protected Future<Void> executeInternal()
+    {
+        try
+        {
+            Map<String, String> options = 
generateRepairOptions(repairParams.requestPayload());
+            String keyspace = repairParams.keyspace().name();
+
+            LOGGER.info("Executing repair operation for keyspace {} jobId={}", 
keyspace, this.jobId());
+
+            try
+            {
+                commandId = storageOperations.repair(keyspace, options);
+            }
+            catch (Exception e)
+            {
+                LOGGER.error("Failed to initiate repair for keyspace {} 
jobId={}", keyspace, this.jobId(), e);
+                currentStatus = OperationalJobStatus.FAILED;
+                return Future.failedFuture(e);
+            }
+
+            if (commandId <= 0)
+            {
+                // When there are no relevant token ranges for the keyspace or 
RF is 1, the repair is inapplicable
+                LOGGER.info("Repair is not applicable for the provided options 
and keyspace '{}' jobId '{}'", keyspace, this.jobId());
+                currentStatus = OperationalJobStatus.SUCCEEDED;
+                return Future.succeededFuture();
+            }
+
+            // Create promise for job submission response - this completes 
when we have a definitive
+            // status to return to the client (either repair is running, 
completed, or failed)
+            final Promise<Void> jobSubmissionPromise = Promise.promise();
+            int maxAttempts = config.repairStatusMaxAttempts();
+            final AtomicInteger attemptCounter = new AtomicInteger(0);
+
+            // Periodic timer that checks for a valid repair status for a 
specified no. attempts to make a best-effort attempt
+            // to validate that the repair has been kicked-off before 
returning.
+            internalPool.setPeriodic(0, 
config.repairPollInterval().toIntMillis(), id -> {
+                try
+                {
+                    int currentAttempt = attemptCounter.incrementAndGet();
+                    if (currentAttempt > maxAttempts)
+                    {
+                        internalPool.cancelTimer(id);
+                        String msg = String.format("Failed to obtain repair 
status after %d attempts.", maxAttempts);
+                        LOGGER.warn(msg);
+                        // Set status to RUNNING and complete the promise to 
ensure the job is properly handled
+                        currentStatus = OperationalJobStatus.RUNNING;
+                        jobSubmissionPromise.tryComplete();
+                        return;
+                    }
+
+                    // Check the repair status
+                    List<String> status;
+                    try
+                    {
+                        status = 
storageOperations.getParentRepairStatus(commandId);
+                    }
+                    catch (Exception e)
+                    {
+                        LOGGER.warn("Failed to get repair status for cmd: {} 
(attempt {}/{})",
+                                    commandId, currentAttempt, maxAttempts, e);
+                        // Continue polling on exception
+                        return;
+                    }
+
+                    // If status is empty, continue polling
+                    if (status == null || status.isEmpty())
+                    {
+                        LOGGER.debug("No parent repair session status found 
for cmd: {} - repair may be initializing (attempt {}/{})",
+                                     commandId, currentAttempt, maxAttempts);
+                        return;
+                    }
+
+                    internalPool.cancelTimer(id);
+                    updateRepairJobStatus(jobSubmissionPromise, status);
+                }
+                catch (Exception e)
+                {
+                    LOGGER.error("Unexpected error in repair status check", e);
+                    internalPool.cancelTimer(id);
+                    currentStatus = OperationalJobStatus.FAILED;
+                    jobSubmissionPromise.tryFail(e);
+                }
+            });
+
+            return jobSubmissionPromise.future();
+        }
+        catch (Exception e)
+        {
+            // Catch any exceptions in the overall method
+            LOGGER.error("Failed to execute repair job", e);
+            currentStatus = OperationalJobStatus.FAILED;
+            return Future.failedFuture(e);
+        }
+    }
+
+
+    @Override
+    public OperationalJobStatus status()
+    {
+        // If we have a valid command ID, check the actual repair status
+        if (commandId > 0)
+        {
+            List<String> status = 
storageOperations.getParentRepairStatus(commandId);

Review Comment:
   Not to be addressed in this patch, but consider shielding the call behind a TTL-based cache to avoid querying Cassandra on every `status()` method invocation. 



##########
server/src/main/java/org/apache/cassandra/sidecar/job/RepairJob.java:
##########
@@ -0,0 +1,360 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.job;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import io.vertx.core.Future;
+import io.vertx.core.Promise;
+import org.apache.cassandra.sidecar.adapters.base.RepairOptions;
+import org.apache.cassandra.sidecar.common.data.OperationalJobStatus;
+import org.apache.cassandra.sidecar.common.request.data.RepairPayload;
+import org.apache.cassandra.sidecar.common.server.StorageOperations;
+import org.apache.cassandra.sidecar.concurrent.TaskExecutorPool;
+import org.apache.cassandra.sidecar.config.RepairJobsConfiguration;
+import org.apache.cassandra.sidecar.handlers.data.RepairRequestParam;
+import org.jetbrains.annotations.NotNull;
+
+/**
+ * Implementation of {@link OperationalJob} to perform repair operation.
+ * <p>
+ * Cassandra repair operations are asynchronous and can run for extended 
periods. When a repair is
+ * detected to be IN_PROGRESS, this job sets its status to RUNNING and 
completes its promise.
+ * <p>
+ * The job's status can be checked at any time using the {@link #status()} 
method, which directly
+ * queries the current repair status from Cassandra, regardless of whether the 
promise has been completed.
+ * <p>
+ * This design ensures timely responses to clients while allowing accurate 
status reporting for
+ * long-running repair operations.
+ */
+public class RepairJob extends OperationalJob
+{
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(RepairJob.class);
+    private static final String OPERATION = "repair";
+    private static final String PREVIEW_KIND_REPAIRED = "REPAIRED";
+
+    private final RepairRequestParam repairParams;
+    private final RepairJobsConfiguration config;
+    private final TaskExecutorPool internalPool;
+    protected StorageOperations storageOperations;
+    private volatile OperationalJobStatus currentStatus;
+    private volatile int commandId = -1; // Store the command ID for status 
checks
+
+    /**
+     * Enum representing the status of a parent repair session
+     */
+    public enum ParentRepairStatus
+    {
+        IN_PROGRESS, COMPLETED, FAILED
+    }
+
+    /**
+     * Constructor for creation of RepairJob
+     *
+     * @param taskExecutorPool    TaskExecutorPool instance (for testing)
+     * @param config              Repair job configuration
+     * @param jobId               UUID representing the Job to be created
+     * @param storageOps          Reference to the storage operations interface
+     * @param repairParams        Repair request parameters
+     */
+    public RepairJob(TaskExecutorPool taskExecutorPool,
+                     RepairJobsConfiguration config,
+                     UUID jobId,
+                     StorageOperations storageOps,
+                     RepairRequestParam repairParams)
+    {
+        super(jobId);
+        this.internalPool = taskExecutorPool;
+        this.config = config;
+        this.storageOperations = storageOps;
+        this.repairParams = repairParams;
+        this.currentStatus = OperationalJobStatus.CREATED;
+    }
+
+    /**
+     * {@inheritDoc}
+     * RepairJob allows parallel executions since multiple repairs can run 
concurrently
+     * on different tables or with different parameters.
+     */
+    @Override
+    public boolean hasConflict(@NotNull List<OperationalJob> sameOperationJobs)
+    {
+        // For now, we simply allow all repair jobs to run in parallel
+        // A more sophisticated implementation could check the repair 
parameters
+        // against currently running repairs to detect potential conflicts
+        return false;
+    }
+
+    @Override
+    protected Future<Void> executeInternal()
+    {
+        try
+        {
+            Map<String, String> options = 
generateRepairOptions(repairParams.requestPayload());
+            String keyspace = repairParams.keyspace().name();
+
+            LOGGER.info("Executing repair operation for keyspace {} jobId={}", 
keyspace, this.jobId());
+
+            try
+            {
+                commandId = storageOperations.repair(keyspace, options);
+            }
+            catch (Exception e)
+            {
+                LOGGER.error("Failed to initiate repair for keyspace {} 
jobId={}", keyspace, this.jobId(), e);
+                currentStatus = OperationalJobStatus.FAILED;
+                return Future.failedFuture(e);
+            }
+
+            if (commandId <= 0)
+            {
+                // When there are no relevant token ranges for the keyspace or 
RF is 1, the repair is inapplicable
+                LOGGER.info("Repair is not applicable for the provided options 
and keyspace '{}' jobId '{}'", keyspace, this.jobId());
+                currentStatus = OperationalJobStatus.SUCCEEDED;
+                return Future.succeededFuture();
+            }
+
+            // Create promise for job submission response - this completes 
when we have a definitive
+            // status to return to the client (either repair is running, 
completed, or failed)
+            final Promise<Void> jobSubmissionPromise = Promise.promise();
+            int maxAttempts = config.repairStatusMaxAttempts();
+            final AtomicInteger attemptCounter = new AtomicInteger(0);
+
+            // Periodic timer that checks for a valid repair status for a 
specified no. attempts to make a best-effort attempt
+            // to validate that the repair has been kicked-off before 
returning.
+            internalPool.setPeriodic(0, 
config.repairPollInterval().toIntMillis(), id -> {
+                try
+                {
+                    int currentAttempt = attemptCounter.incrementAndGet();
+                    if (currentAttempt > maxAttempts)
+                    {
+                        internalPool.cancelTimer(id);
+                        String msg = String.format("Failed to obtain repair 
status after %d attempts.", maxAttempts);
+                        LOGGER.warn(msg);
+                        // Set status to RUNNING and complete the promise to 
ensure the job is properly handled
+                        currentStatus = OperationalJobStatus.RUNNING;
+                        jobSubmissionPromise.tryComplete();
+                        return;
+                    }
+
+                    // Check the repair status
+                    List<String> status;
+                    try
+                    {
+                        status = 
storageOperations.getParentRepairStatus(commandId);
+                    }
+                    catch (Exception e)
+                    {
+                        LOGGER.warn("Failed to get repair status for cmd: {} 
(attempt {}/{})",
+                                    commandId, currentAttempt, maxAttempts, e);
+                        // Continue polling on exception
+                        return;
+                    }
+
+                    // If status is empty, continue polling
+                    if (status == null || status.isEmpty())
+                    {
+                        LOGGER.debug("No parent repair session status found 
for cmd: {} - repair may be initializing (attempt {}/{})",
+                                     commandId, currentAttempt, maxAttempts);
+                        return;
+                    }
+
+                    internalPool.cancelTimer(id);
+                    updateRepairJobStatus(jobSubmissionPromise, status);
+                }
+                catch (Exception e)
+                {
+                    LOGGER.error("Unexpected error in repair status check", e);
+                    internalPool.cancelTimer(id);
+                    currentStatus = OperationalJobStatus.FAILED;
+                    jobSubmissionPromise.tryFail(e);
+                }
+            });
+
+            return jobSubmissionPromise.future();
+        }
+        catch (Exception e)
+        {
+            // Catch any exceptions in the overall method
+            LOGGER.error("Failed to execute repair job", e);
+            currentStatus = OperationalJobStatus.FAILED;
+            return Future.failedFuture(e);
+        }
+    }
+
+
+    @Override
+    public OperationalJobStatus status()
+    {
+        // If we have a valid command ID, check the actual repair status
+        if (commandId > 0)
+        {
+            List<String> status = 
storageOperations.getParentRepairStatus(commandId);
+            if (status != null && !status.isEmpty())
+            {
+                try
+                {
+                    ParentRepairStatus parentRepairStatus = 
ParentRepairStatus.valueOf(status.get(0));
+                    switch (parentRepairStatus)
+                    {
+                        case COMPLETED:
+                            return OperationalJobStatus.SUCCEEDED;
+                        case FAILED:
+                            return OperationalJobStatus.FAILED;
+                        case IN_PROGRESS:
+                            return OperationalJobStatus.RUNNING;

Review Comment:
   Why not update `currentStatus` here? Also, this `status()` method and `updateRepairJobStatus` share common logic, which makes the code a bit annoying to read. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to